Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/effect-worker-v2/src/jobs/basic-debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const debounceExample = Debounce.make({

// Flush after 5 seconds of inactivity
flushAfter: "5 seconds",
logging: true,

// Or flush immediately when 10 events accumulated
maxEvents: 10,
Expand Down
1 change: 1 addition & 0 deletions examples/effect-worker-v2/src/jobs/basic-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const basicTask = Task.make({
eventSchema: TaskEvent,
stateSchema: TaskState,

logging: true,
onEvent: (event, ctx) =>
Effect.gen(function* () {
yield* Effect.log("Handling event");
Expand Down
5 changes: 3 additions & 2 deletions examples/effect-worker-v2/src/jobs/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export type HeartbeatState = typeof HeartbeatState.Type;
*/
export const heartbeat = Continuous.make({
stateSchema: HeartbeatState,

logging: true,
// Run every 10 seconds
schedule: Continuous.every("4 seconds"),

Expand All @@ -56,6 +56,7 @@ export const heartbeat = Continuous.make({
Effect.gen(function* () {
// Get current state (Effect-based)
const currentState = yield* ctx.state;
const runCount = ctx.runCount;

yield* Effect.log(
`Heartbeat #${ctx.runCount}: ${currentState.name} - count=${currentState.count}`,
Expand All @@ -70,7 +71,7 @@ export const heartbeat = Continuous.make({
}));

// Example: auto-terminate after 10 heartbeats
if (currentState.count >= 2000) {
if (runCount >= 20) {
yield* Effect.log(
`Heartbeat ${currentState.name} reached max count, terminating`,
);
Expand Down
4 changes: 0 additions & 4 deletions packages/core/src/tracker/tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ export const emitEvent = <E extends BaseTrackingEvent>(
): Effect.Effect<void> =>
Effect.flatMap(Effect.serviceOption(EventTracker), (option) => {
if (option._tag === "Some") {
console.log(`[Tracker] emitEvent: type=${event.type}, eventId=${event.eventId}`);
return option.value.emit(event);
} else {
console.log(`[Tracker] emitEvent: tracker not available, event type=${event.type} dropped`);
return Effect.void;
}
});
Expand All @@ -91,10 +89,8 @@ export const flushEvents: Effect.Effect<void> = Effect.flatMap(
Effect.serviceOption(EventTracker),
(option) => {
if (option._tag === "Some") {
console.log("[Tracker] flushEvents: flushing...");
return option.value.flush();
} else {
console.log("[Tracker] flushEvents: tracker not available, nothing to flush");
return Effect.void;
}
},
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@durable-effect/jobs",
"version": "0.0.1-next.3",
"version": "0.0.1-next.4",
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down
24 changes: 23 additions & 1 deletion packages/jobs/src/definitions/continuous.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
UnregisteredContinuousDefinition,
ContinuousSchedule,
ContinuousContext,
LoggingOption,
} from "../registry/types";
import type { JobRetryConfig } from "../retry/types";

Expand Down Expand Up @@ -49,11 +50,31 @@ export interface ContinuousMakeConfig<S, E, R> {
*/
readonly retry?: JobRetryConfig;

/**
* Control logging for this job.
*
* - `false` (default): Only log errors (LogLevel.Error)
* - `true`: Enable all logs (LogLevel.Debug)
* - `LogLevel.*`: Use a specific log level
* - `LogLevel.None`: Suppress all logs
*
* @example
* ```ts
* import { LogLevel } from "effect";
*
* // Enable debug logging
* logging: true,
*
* // Only warnings and above
* logging: LogLevel.Warning,
* ```
*/
readonly logging?: LoggingOption;

/**
* The function to execute on schedule.
*/
execute(ctx: ContinuousContext<S>): Effect.Effect<void, E, R>;

}

/**
Expand Down Expand Up @@ -103,6 +124,7 @@ export const Continuous = {
schedule: config.schedule,
startImmediately: config.startImmediately,
retry: config.retry,
logging: config.logging,
execute: config.execute,
}),

Expand Down
23 changes: 23 additions & 0 deletions packages/jobs/src/definitions/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
UnregisteredDebounceDefinition,
DebounceEventContext,
DebounceExecuteContext,
LoggingOption,
} from "../registry/types";
import type { JobRetryConfig } from "../retry/types";

Expand Down Expand Up @@ -52,6 +53,27 @@ export interface DebounceMakeConfig<I, S, E, R> {
*/
readonly retry?: JobRetryConfig;

/**
* Control logging for this job.
*
* - `false` (default): Only log errors (LogLevel.Error)
* - `true`: Enable all logs (LogLevel.Debug)
* - `LogLevel.*`: Use a specific log level
* - `LogLevel.None`: Suppress all logs
*
* @example
* ```ts
* import { LogLevel } from "effect";
*
* // Enable debug logging
* logging: true,
*
* // Only warnings and above
* logging: LogLevel.Warning,
* ```
*/
readonly logging?: LoggingOption;

/**
* Reducer for each incoming event. Defaults to returning the latest event.
*/
Expand Down Expand Up @@ -90,6 +112,7 @@ export const Debounce = {
flushAfter: config.flushAfter,
maxEvents: config.maxEvents,
retry: config.retry,
logging: config.logging,
onEvent:
config.onEvent ??
((ctx: DebounceEventContext<I, S>) =>
Expand Down
23 changes: 23 additions & 0 deletions packages/jobs/src/definitions/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
TaskExecuteContext,
TaskIdleContext,
TaskErrorContext,
LoggingOption,
} from "../registry/types";

// =============================================================================
Expand Down Expand Up @@ -151,6 +152,27 @@ export interface TaskMakeConfig<S, E, Err, R> {
error: Err,
ctx: TaskErrorContext<S>
) => Effect.Effect<void, never, R>;

/**
* Control logging for this job.
*
* - `false` (default): Only log errors (LogLevel.Error)
* - `true`: Enable all logs (LogLevel.Debug)
* - `LogLevel.*`: Use a specific log level
* - `LogLevel.None`: Suppress all logs
*
* @example
* ```ts
* import { LogLevel } from "effect";
*
* // Enable debug logging
* logging: true,
*
* // Only warnings and above
* logging: LogLevel.Warning,
* ```
*/
readonly logging?: LoggingOption;
}

/**
Expand Down Expand Up @@ -257,6 +279,7 @@ export const Task = {
execute: config.execute,
onIdle: config.onIdle,
onError: config.onError,
logging: config.logging,
}),
} as const;

Expand Down
97 changes: 58 additions & 39 deletions packages/jobs/src/handlers/continuous/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
type JobError,
} from "../../errors";
import { RetryExecutor } from "../../retry";
import { withJobLogging } from "../../services/job-logging";
import type { ContinuousRequest } from "../../runtime/types";
import type { ContinuousDefinition, ContinuousContext } from "../../registry/types";
import type { ContinuousHandlerI, ContinuousResponse } from "./types";
Expand Down Expand Up @@ -345,18 +346,27 @@ export const ContinuousHandlerLayer = Layer.effect(
Effect.gen(function* () {
const def = yield* getDefinition(request.name);

switch (request.action) {
case "start":
return yield* handleStart(def, request);
case "terminate":
return yield* handleTerminate(request);
case "trigger":
return yield* handleTrigger(def);
case "status":
return yield* handleStatus();
case "getState":
return yield* handleGetState(def);
}
const handlerEffect = Effect.gen(function* () {
switch (request.action) {
case "start":
return yield* handleStart(def, request);
case "terminate":
return yield* handleTerminate(request);
case "trigger":
return yield* handleTrigger(def);
case "status":
return yield* handleStatus();
case "getState":
return yield* handleGetState(def);
}
});

return yield* withJobLogging(handlerEffect, {
logging: def.logging,
jobType: "continuous",
jobName: def.name,
instanceId: runtime.instanceId,
});
}).pipe(
Effect.catchTag("StorageError", (e) =>
Effect.fail(
Expand Down Expand Up @@ -393,33 +403,42 @@ export const ContinuousHandlerLayer = Layer.effect(

const def = yield* getDefinition(meta.name);

const isRetrying = yield* retryExecutor.isRetrying().pipe(
Effect.catchAll(() => Effect.succeed(false)),
);

let runCount: number;
if (isRetrying) {
runCount = yield* getRunCount();
} else {
runCount = yield* incrementRunCount();
}

const result = yield* runExecution(def, runCount, meta.id);

if (result.retryScheduled) {
return;
}

if (result.terminated) {
// Terminated - CleanupService has already purged everything
return;
}

const currentMeta = yield* metadata.get();
if (currentMeta && currentMeta.status === "running") {
yield* updateLastExecutedAt();
yield* scheduleNext(def);
}
const alarmEffect = Effect.gen(function* () {
const isRetrying = yield* retryExecutor.isRetrying().pipe(
Effect.catchAll(() => Effect.succeed(false)),
);

let runCount: number;
if (isRetrying) {
runCount = yield* getRunCount();
} else {
runCount = yield* incrementRunCount();
}

const result = yield* runExecution(def, runCount, meta.id);

if (result.retryScheduled) {
return;
}

if (result.terminated) {
// Terminated - CleanupService has already purged everything
return;
}

const currentMeta = yield* metadata.get();
if (currentMeta && currentMeta.status === "running") {
yield* updateLastExecutedAt();
yield* scheduleNext(def);
}
});

yield* withJobLogging(alarmEffect, {
logging: def.logging,
jobType: "continuous",
jobName: def.name,
instanceId: runtime.instanceId,
});
}).pipe(
Effect.catchTag("StorageError", (e) =>
Effect.fail(
Expand Down
Loading