Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions packages/jobs/src/registry/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,7 @@ export function toRuntimeRegistry<
string,
StoredContinuousDefinition
>,
debounce: registry.debounce as Record<
string,
StoredDebounceDefinition
>,
debounce: registry.debounce as Record<string, StoredDebounceDefinition>,
workerPool: registry.workerPool as Record<
string,
StoredWorkerPoolDefinition
Expand Down
171 changes: 57 additions & 114 deletions packages/jobs/test/handlers/continuous.test.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
// packages/jobs/test/handlers/continuous.test.ts

import { describe, it, expect, beforeEach } from "vitest";
import { Effect, Layer, Schema, Duration } from "effect";
import { createTestRuntime, NoopTrackerLayer } from "@durable-effect/core";
import {
ContinuousHandler,
ContinuousHandlerLayer,
type ContinuousResponse,
} from "../../src/handlers/continuous";
import { MetadataService, MetadataServiceLayer } from "../../src/services/metadata";
import { AlarmService, AlarmServiceLayer } from "../../src/services/alarm";
import { RegistryService, RegistryServiceLayer } from "../../src/services/registry";
import { JobExecutionServiceLayer } from "../../src/services/execution";
import { CleanupServiceLayer } from "../../src/services/cleanup";
import { RetryExecutorLayer } from "../../src/retry";
import { Effect, Schema, Duration } from "effect";
import { ContinuousHandler } from "../../src/handlers/continuous";
import { Continuous } from "../../src/definitions/continuous";
import type { RuntimeJobRegistry } from "../../src/registry/typed";
import {
createTestRegistry,
createContinuousTestLayer,
runWithLayer,
runExitWithLayer,
} from "./test-utils";

// =============================================================================
// Test Fixtures
Expand Down Expand Up @@ -98,86 +92,15 @@ const terminatingPrimitive = Continuous.make({
}),
});

// Create test registry using object types (RuntimeJobRegistry)
const createTestRegistry = (): RuntimeJobRegistry => ({
continuous: {
"counter": { ...counterPrimitive, name: "counter" },
"failing": { ...failingPrimitive, name: "failing" },
"no-immediate": { ...noImmediateStartPrimitive, name: "no-immediate" },
"terminating": { ...terminatingPrimitive, name: "terminating" },
} as Record<string, any>,
debounce: {} as Record<string, any>,
workerPool: {} as Record<string, any>,
task: {} as Record<string, any>,
});

// =============================================================================
// Test Helpers
// =============================================================================

// Helper to run Effect with layer, bypassing strict R parameter checking
// This is needed because the test registry uses `as Record<string, any>` which
// causes `any` to leak into Effect R parameters
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const runWithLayer = <A, E>(
effect: Effect.Effect<A, E, any>,
layer: Layer.Layer<ContinuousHandler>
): Promise<A> =>
Effect.runPromise(
effect.pipe(Effect.provide(layer)) as Effect.Effect<A, E, never>
);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const runExitWithLayer = <A, E>(
effect: Effect.Effect<A, E, any>,
layer: Layer.Layer<ContinuousHandler>
) =>
Effect.runPromiseExit(
effect.pipe(Effect.provide(layer)) as Effect.Effect<A, E, never>
);

// =============================================================================
// Test Setup
// =============================================================================

const createTestLayer = (initialTime = 1000000) => {
const { layer: coreLayer, time, handles } = createTestRuntime("test-instance", initialTime);
const registry = createTestRegistry();

const servicesLayer = Layer.mergeAll(
MetadataServiceLayer,
AlarmServiceLayer
).pipe(
Layer.provideMerge(NoopTrackerLayer),
Layer.provideMerge(coreLayer)
);

// RetryExecutor depends on AlarmService from servicesLayer
const retryLayer = RetryExecutorLayer.pipe(
Layer.provideMerge(servicesLayer)
);

// CleanupService depends on AlarmService and StorageAdapter
const cleanupLayer = CleanupServiceLayer.pipe(
Layer.provideMerge(servicesLayer)
);

// JobExecutionService depends on RetryExecutor, CleanupService, and core adapters
const executionLayer = JobExecutionServiceLayer.pipe(
Layer.provideMerge(retryLayer),
Layer.provideMerge(cleanupLayer),
Layer.provideMerge(coreLayer)
);

const handlerLayer = ContinuousHandlerLayer.pipe(
Layer.provideMerge(RegistryServiceLayer(registry)),
Layer.provideMerge(servicesLayer),
Layer.provideMerge(retryLayer),
Layer.provideMerge(executionLayer)
) as Layer.Layer<ContinuousHandler>;

return { layer: handlerLayer, time, handles, coreLayer };
};
const createRegistry = () =>
createTestRegistry({
continuous: {
counter: { ...counterPrimitive, name: "counter" },
failing: { ...failingPrimitive, name: "failing" },
"no-immediate": { ...noImmediateStartPrimitive, name: "no-immediate" },
terminating: { ...terminatingPrimitive, name: "terminating" },
},
});

// =============================================================================
// Tests
Expand All @@ -192,7 +115,8 @@ describe("ContinuousHandler", () => {

describe("start action", () => {
it("creates a new instance and executes immediately by default", async () => {
const { layer } = createTestLayer(1000000);
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry, 1000000);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -222,7 +146,8 @@ describe("ContinuousHandler", () => {
});

it("returns existing instance if already started", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -258,7 +183,8 @@ describe("ContinuousHandler", () => {
});

it("respects startImmediately: false", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand All @@ -284,7 +210,8 @@ describe("ContinuousHandler", () => {

describe("terminate action", () => {
it("terminates a running instance and deletes all storage", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -330,7 +257,8 @@ describe("ContinuousHandler", () => {
});

it("returns not_found for non-existent instance", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand All @@ -352,7 +280,8 @@ describe("ContinuousHandler", () => {
});

it("returns not_found when called twice (since first terminate deletes all storage)", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -396,7 +325,8 @@ describe("ContinuousHandler", () => {

describe("trigger action", () => {
it("triggers immediate execution", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -431,7 +361,8 @@ describe("ContinuousHandler", () => {
});

it("returns triggered: false for non-existent instance", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand All @@ -451,7 +382,8 @@ describe("ContinuousHandler", () => {
});

it("returns triggered: false for terminated instance", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -490,7 +422,8 @@ describe("ContinuousHandler", () => {

describe("status action", () => {
it("returns status for running instance", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -520,7 +453,8 @@ describe("ContinuousHandler", () => {
});

it("returns not_found for non-existent instance", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand All @@ -542,7 +476,8 @@ describe("ContinuousHandler", () => {

describe("getState action", () => {
it("returns current state", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -574,7 +509,8 @@ describe("ContinuousHandler", () => {

describe("handleAlarm", () => {
it("executes on alarm and schedules next", async () => {
const { layer, time, handles } = createTestLayer(1000000);
const registry = createRegistry();
const { layer, time, handles } = createContinuousTestLayer(registry, 1000000);

await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -609,7 +545,8 @@ describe("ContinuousHandler", () => {
});

it("does nothing when terminated", async () => {
const { layer, time } = createTestLayer();
const registry = createRegistry();
const { layer, time } = createContinuousTestLayer(registry);

await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -646,7 +583,8 @@ describe("ContinuousHandler", () => {

describe("error handling", () => {
it("fails with ExecutionError when execute fails without retry config", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const resultExit = await runExitWithLayer(
Effect.gen(function* () {
Expand All @@ -670,7 +608,8 @@ describe("ContinuousHandler", () => {

describe("ctx.terminate", () => {
it("terminates on first run when condition met (purges state by default)", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -713,7 +652,8 @@ describe("ContinuousHandler", () => {
});

it("terminates during alarm and stops further alarms", async () => {
const { layer, time, handles } = createTestLayer(1000000);
const registry = createRegistry();
const { layer, time, handles } = createContinuousTestLayer(registry, 1000000);

await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -763,7 +703,8 @@ describe("ContinuousHandler", () => {
});

it("trigger action returns terminated: true when terminate called", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -797,7 +738,8 @@ describe("ContinuousHandler", () => {
});

it("trigger action returns triggered: false for terminated instance", async () => {
const { layer } = createTestLayer();
const registry = createRegistry();
const { layer } = createContinuousTestLayer(registry);

const result = await runWithLayer(
Effect.gen(function* () {
Expand Down Expand Up @@ -830,7 +772,8 @@ describe("ContinuousHandler", () => {
});

it("handleAlarm does nothing for terminated instance", async () => {
const { layer, time } = createTestLayer();
const registry = createRegistry();
const { layer, time } = createContinuousTestLayer(registry);

await runWithLayer(
Effect.gen(function* () {
Expand Down
Loading