Skip to content

Commit f2d0658

Browse files
committed
remove p_mode param from ensure_flow_compiled, auto-detect via is_local()
1 parent 5166c57 commit f2d0658

14 files changed

+145
-90
lines changed

pkgs/core/schemas/0100_function_ensure_flow_compiled.sql

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
-- Ensure a flow is compiled in the database
2-
-- Handles both development (auto-recompile) and production (fail on mismatch) modes
2+
-- Auto-detects environment via is_local(): local -> auto-recompile, production -> fail on mismatch
33
-- Returns: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', differences: text[] }
44
create or replace function pgflow.ensure_flow_compiled(
5-
p_flow_slug text,
6-
p_shape jsonb,
7-
p_mode text default 'production' -- 'development' | 'production'
5+
flow_slug text,
6+
shape jsonb
87
)
98
returns jsonb
109
language plpgsql
@@ -16,43 +15,46 @@ DECLARE
1615
v_flow_exists boolean;
1716
v_db_shape jsonb;
1817
v_differences text[];
18+
v_is_local boolean;
1919
BEGIN
2020
-- Generate lock key from flow_slug (deterministic hash)
21-
v_lock_key := hashtext(p_flow_slug);
21+
v_lock_key := hashtext(ensure_flow_compiled.flow_slug);
2222

2323
-- Acquire transaction-level advisory lock
2424
-- Serializes concurrent compilation attempts for same flow
2525
PERFORM pg_advisory_xact_lock(1, v_lock_key);
2626

2727
-- 1. Check if flow exists
28-
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
28+
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = ensure_flow_compiled.flow_slug)
2929
INTO v_flow_exists;
3030

31-
-- 2. If flow missing: compile (both modes)
31+
-- 2. If flow missing: compile (both environments)
3232
IF NOT v_flow_exists THEN
33-
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
33+
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
3434
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
3535
END IF;
3636

3737
-- 3. Get current shape from DB
38-
v_db_shape := pgflow._get_flow_shape(p_flow_slug);
38+
v_db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug);
3939

4040
-- 4. Compare shapes
41-
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);
41+
v_differences := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, v_db_shape);
4242

4343
-- 5. If shapes match: return verified
4444
IF array_length(v_differences, 1) IS NULL THEN
4545
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
4646
END IF;
4747

48-
-- 6. Shapes differ - handle by mode
49-
IF p_mode = 'development' THEN
50-
-- Recompile in dev mode: full deletion + fresh compile
51-
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
52-
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
48+
-- 6. Shapes differ - auto-detect environment via is_local()
49+
v_is_local := pgflow.is_local();
50+
51+
IF v_is_local THEN
52+
-- Recompile in local/dev: full deletion + fresh compile
53+
PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
54+
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
5355
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
5456
ELSE
55-
-- Fail in production mode
57+
-- Fail in production
5658
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
5759
END IF;
5860
END;

pkgs/core/src/database-types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ export type Database = {
449449
Returns: undefined
450450
}
451451
ensure_flow_compiled: {
452-
Args: { p_flow_slug: string; p_mode?: string; p_shape: Json }
452+
Args: { flow_slug: string; shape: Json }
453453
Returns: Json
454454
}
455455
fail_task: {
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
-- Create "ensure_flow_compiled" function
2+
CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("flow_slug" text, "shape" jsonb) RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
3+
DECLARE
4+
v_lock_key int;
5+
v_flow_exists boolean;
6+
v_db_shape jsonb;
7+
v_differences text[];
8+
v_is_local boolean;
9+
BEGIN
10+
-- Generate lock key from flow_slug (deterministic hash)
11+
v_lock_key := hashtext(ensure_flow_compiled.flow_slug);
12+
13+
-- Acquire transaction-level advisory lock
14+
-- Serializes concurrent compilation attempts for same flow
15+
PERFORM pg_advisory_xact_lock(1, v_lock_key);
16+
17+
-- 1. Check if flow exists
18+
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = ensure_flow_compiled.flow_slug)
19+
INTO v_flow_exists;
20+
21+
-- 2. If flow missing: compile (both environments)
22+
IF NOT v_flow_exists THEN
23+
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
24+
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
25+
END IF;
26+
27+
-- 3. Get current shape from DB
28+
v_db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug);
29+
30+
-- 4. Compare shapes
31+
v_differences := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, v_db_shape);
32+
33+
-- 5. If shapes match: return verified
34+
IF array_length(v_differences, 1) IS NULL THEN
35+
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
36+
END IF;
37+
38+
-- 6. Shapes differ - auto-detect environment via is_local()
39+
v_is_local := pgflow.is_local();
40+
41+
IF v_is_local THEN
42+
-- Recompile in local/dev: full deletion + fresh compile
43+
PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
44+
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
45+
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
46+
ELSE
47+
-- Fail in production
48+
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
49+
END IF;
50+
END;
51+
$$;
52+
-- Drop "ensure_flow_compiled" function
53+
DROP FUNCTION "pgflow"."ensure_flow_compiled" (text, jsonb, text);

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:RyA/4WAtZP28l0qVRPF/DF2ly/POIsQa6ZTCzP/f/NA=
1+
h1:uE4Gy3n0TTiHs83gNYMJ63aaTl7OFO5tGkFZy6a1xD0=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -17,3 +17,4 @@ h1:RyA/4WAtZP28l0qVRPF/DF2ly/POIsQa6ZTCzP/f/NA=
1717
20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0=
1818
20251201105311_pgflow_temp_advisory_lock_for_compilation.sql h1:OmRtiaPYjPuq9P87Px2PH06gdKhHZ0Ro6GfjjS0G+Rs=
1919
20251204115929_pgflow_temp_is_local.sql h1:pjOFO6k8FCmbxp6S7U3fPImsqW81WwdLwq/UZK74BG4=
20+
20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:VwqZiOcVaCahb6BZ918ioFLgwQcF/sy1TR9a4lSnVvs=
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
begin;
2+
select plan(3);
3+
select pgflow_tests.reset_db();
4+
5+
-- Setup: Simulate local environment
6+
select set_config('app.settings.jwt_secret', 'super-secret-jwt-token-with-at-least-32-characters-long', true);
7+
8+
-- Setup: Create flow with different shape
9+
select pgflow.create_flow('local_flow');
10+
select pgflow.add_step('local_flow', 'old_step');
11+
12+
-- Test: Different shape should auto-recompile when is_local()=true (no p_mode param)
13+
select is(
14+
(
15+
select result->>'status'
16+
from pgflow.ensure_flow_compiled(
17+
'local_flow',
18+
'{
19+
"steps": [
20+
{"slug": "new_step", "stepType": "single", "dependencies": []}
21+
]
22+
}'::jsonb
23+
) as result
24+
),
25+
'recompiled',
26+
'Should auto-recompile when is_local()=true'
27+
);
28+
29+
-- Verify old step is gone
30+
select is(
31+
(select count(*)::int from pgflow.steps where flow_slug = 'local_flow' and step_slug = 'old_step'),
32+
0,
33+
'Old step should be deleted'
34+
);
35+
36+
-- Verify new step exists
37+
select is(
38+
(select count(*)::int from pgflow.steps where flow_slug = 'local_flow' and step_slug = 'new_step'),
39+
1,
40+
'New step should be created'
41+
);
42+
43+
select finish();
44+
rollback;

pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql renamed to pkgs/core/supabase/tests/ensure_flow_compiled/fails_mismatch_when_not_local.test.sql

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,49 @@ begin;
22
select plan(3);
33
select pgflow_tests.reset_db();
44

5+
-- Setup: Simulate production environment (not local)
6+
select set_config('app.settings.jwt_secret', 'production-jwt-secret-that-differs-from-local', true);
7+
58
-- Setup: Create flow with different shape
6-
select pgflow.create_flow('prod_flow');
7-
select pgflow.add_step('prod_flow', 'old_step');
9+
select pgflow.create_flow('prod_flow_auto');
10+
select pgflow.add_step('prod_flow_auto', 'old_step');
811

9-
-- Test: Different shape in production mode should return mismatch
12+
-- Test: Different shape should return mismatch when is_local()=false (no p_mode param)
1013
select is(
1114
(
1215
select result->>'status'
1316
from pgflow.ensure_flow_compiled(
14-
'prod_flow',
17+
'prod_flow_auto',
1518
'{
1619
"steps": [
1720
{"slug": "new_step", "stepType": "single", "dependencies": []}
1821
]
19-
}'::jsonb,
20-
'production'
22+
}'::jsonb
2123
) as result
2224
),
2325
'mismatch',
24-
'Should return mismatch status in production mode'
26+
'Should return mismatch status when is_local()=false'
2527
);
2628

2729
-- Verify differences are returned
2830
select ok(
2931
(
3032
select jsonb_array_length(result->'differences') > 0
3133
from pgflow.ensure_flow_compiled(
32-
'prod_flow',
34+
'prod_flow_auto',
3335
'{
3436
"steps": [
3537
{"slug": "new_step", "stepType": "single", "dependencies": []}
3638
]
37-
}'::jsonb,
38-
'production'
39+
}'::jsonb
3940
) as result
4041
),
4142
'Should return differences for production mismatch'
4243
);
4344

4445
-- Verify database was NOT modified
4546
select is(
46-
(select step_slug from pgflow.steps where flow_slug = 'prod_flow'),
47+
(select step_slug from pgflow.steps where flow_slug = 'prod_flow_auto'),
4748
'old_step',
4849
'Database should not be modified on production mismatch'
4950
);

pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql

Lines changed: 0 additions & 42 deletions
This file was deleted.

pkgs/edge-worker/scripts/concatenate-migrations.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,8 @@ echo "-- From file: seed.sql" >> "$target_file"
3434
cat "../core/supabase/seed.sql" >> "$target_file"
3535
echo "" >> "$target_file"
3636
echo "" >> "$target_file"
37+
38+
# Configure local JWT secret for is_local() detection in tests
39+
echo "-- Configure local JWT secret for is_local() detection" >> "$target_file"
40+
echo "ALTER DATABASE postgres SET app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';" >> "$target_file"
41+
echo "" >> "$target_file"

pkgs/edge-worker/src/core/Queries.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ export class Queries {
5454

5555
async ensureFlowCompiled(
5656
flowSlug: string,
57-
shape: FlowShape,
58-
mode: 'development' | 'production'
57+
shape: FlowShape
5958
): Promise<EnsureFlowCompiledResult> {
6059
// SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers,
6160
// arrays, and plain objects), but TypeScript can't prove this because FlowShape
@@ -69,8 +68,7 @@ export class Queries {
6968
const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>`
7069
SELECT pgflow.ensure_flow_compiled(
7170
${flowSlug},
72-
${shapeJson}::jsonb,
73-
${mode}
71+
${shapeJson}::jsonb
7472
) as result
7573
`;
7674
return result.result;

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { FlowShapeMismatchError } from './errors.js';
88

99
export interface FlowLifecycleConfig {
1010
heartbeatInterval?: number;
11-
isLocalEnvironment?: boolean;
1211
ensureCompiledOnStartup?: boolean;
1312
}
1413

@@ -25,7 +24,6 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2524
private _workerId?: string;
2625
private heartbeatInterval: number;
2726
private lastHeartbeat = 0;
28-
private isLocalEnvironment: boolean;
2927
private ensureCompiledOnStartup: boolean;
3028

3129
constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
@@ -34,7 +32,6 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
3432
this.logger = logger;
3533
this.workerState = new WorkerState(logger);
3634
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
37-
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
3835
this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true;
3936
}
4037

@@ -61,15 +58,13 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
6158
}
6259

6360
private async ensureFlowCompiled(): Promise<void> {
64-
const mode = this.isLocalEnvironment ? 'development' : 'production';
65-
this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`);
61+
this.logger.info(`Compiling flow '${this.flow.slug}'...`);
6662

6763
const shape = extractFlowShape(this.flow);
6864

6965
const result = await this.queries.ensureFlowCompiled(
7066
this.flow.slug,
71-
shape,
72-
mode
67+
shape
7368
);
7469

7570
if (result.status === 'mismatch') {

0 commit comments

Comments
 (0)