diff --git a/pkgs/core/schemas/0059_function_ensure_workers.sql b/pkgs/core/schemas/0059_function_ensure_workers.sql new file mode 100644 index 000000000..ceb44a7b8 --- /dev/null +++ b/pkgs/core/schemas/0059_function_ensure_workers.sql @@ -0,0 +1,99 @@ +-- Ensure Workers +-- Returns which worker functions should be invoked, makes HTTP requests, and updates last_invoked_at +-- Called by cron job to keep workers running + +drop function if exists pgflow.ensure_workers(); + +create or replace function pgflow.ensure_workers() +returns table (function_name text, invoked boolean, request_id bigint) +language sql +as $$ + with + -- Detect environment + env as ( + select pgflow.is_local() as is_local + ), + + -- Get credentials: Vault secrets with local fallback for base_url only + credentials as ( + select + (select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key, + coalesce( + (select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'), + case when (select is_local from env) then 'http://kong:8000/functions/v1' end + ) as base_url + ), + + -- Find functions that pass the debounce check + debounce_passed as ( + select wf.function_name, wf.heartbeat_timeout_seconds + from pgflow.worker_functions as wf + where wf.enabled = true + and ( + wf.last_invoked_at is null + or wf.last_invoked_at < now() - (wf.heartbeat_timeout_seconds || ' seconds')::interval + ) + ), + + -- Find functions that have at least one alive worker + functions_with_alive_workers as ( + select distinct w.function_name + from pgflow.workers as w + inner join debounce_passed as dp on w.function_name = dp.function_name + where w.stopped_at is null + and w.deprecated_at is null + and w.last_heartbeat_at > now() - (dp.heartbeat_timeout_seconds || ' seconds')::interval + ), + + -- Determine which functions should be invoked + functions_to_invoke as ( + select dp.function_name + from debounce_passed as dp + where + pgflow.is_local() = true + or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw) + ), + + -- Make HTTP requests and capture request_ids + http_requests as ( + select + fti.function_name, + net.http_post( + url => c.base_url || '/' || fti.function_name, + headers => case + when e.is_local then '{}'::jsonb + else jsonb_build_object( + 'Content-Type', 'application/json', + 'Authorization', 'Bearer ' || c.service_role_key + ) + end, + body => '{}'::jsonb + ) as request_id + from functions_to_invoke as fti + cross join credentials as c + cross join env as e + where c.base_url is not null + and (e.is_local or c.service_role_key is not null) + ), + + -- Update last_invoked_at for invoked functions + updated as ( + update pgflow.worker_functions as wf + set last_invoked_at = clock_timestamp() + from http_requests as hr + where wf.function_name = hr.function_name + returning wf.function_name + ) + + select u.function_name, true as invoked, hr.request_id + from updated as u + inner join http_requests as hr on u.function_name = hr.function_name +$$; + +comment on function pgflow.ensure_workers() is +'Ensures worker functions are running by pinging them via HTTP when needed. +In local mode: always pings all enabled functions (for fast restart after code changes). +In production mode: only pings functions that have no alive workers. +Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window. +Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks. +Returns request_id from pg_net for each HTTP request made.'; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 5ea84c94c..b235e0187 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -482,6 +482,14 @@ export type Database = { Args: { flow_slug: string; shape: Json } Returns: Json } + ensure_workers: { + Args: never + Returns: { + function_name: string + invoked: boolean + request_id: number + }[] + } fail_task: { Args: { error_message: string diff --git a/pkgs/core/supabase/migrations/20251205133446_pgflow_temp_ensure_workers.sql b/pkgs/core/supabase/migrations/20251205133446_pgflow_temp_ensure_workers.sql new file mode 100644 index 000000000..62301446d --- /dev/null +++ b/pkgs/core/supabase/migrations/20251205133446_pgflow_temp_ensure_workers.sql @@ -0,0 +1,90 @@ +-- Create "ensure_workers" function +CREATE FUNCTION "pgflow"."ensure_workers" () RETURNS TABLE ("function_name" text, "invoked" boolean, "request_id" bigint) LANGUAGE sql AS $$ +with + -- Detect environment + env as ( + select pgflow.is_local() as is_local + ), + + -- Get credentials: Vault secrets with local fallback for base_url only + credentials as ( + select + (select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key, + coalesce( + (select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'), + case when (select is_local from env) then 'http://kong:8000/functions/v1' end + ) as base_url + ), + + -- Find functions that pass the debounce check + debounce_passed as ( + select wf.function_name, wf.heartbeat_timeout_seconds + from pgflow.worker_functions as wf + where wf.enabled = true + and ( + wf.last_invoked_at is null + or wf.last_invoked_at < now() - (wf.heartbeat_timeout_seconds || ' seconds')::interval + ) + ), + + -- Find functions that have at least one alive worker + functions_with_alive_workers as ( + select distinct w.function_name + from pgflow.workers as w + inner join debounce_passed as dp on w.function_name = dp.function_name + where w.stopped_at is null + and w.deprecated_at is null + and w.last_heartbeat_at > now() - (dp.heartbeat_timeout_seconds || ' seconds')::interval + ), + + -- Determine which functions should be invoked + functions_to_invoke as ( + select dp.function_name + from debounce_passed as dp + where + pgflow.is_local() = true + or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw) + ), + + -- Make HTTP requests and capture request_ids + http_requests as ( + select + fti.function_name, + net.http_post( + url => c.base_url || '/' || fti.function_name, + headers => case + when e.is_local then '{}'::jsonb + else jsonb_build_object( + 'Content-Type', 'application/json', + 'Authorization', 'Bearer ' || c.service_role_key + ) + end, + body => '{}'::jsonb + ) as request_id + from functions_to_invoke as fti + cross join credentials as c + cross join env as e + where c.base_url is not null + and (e.is_local or c.service_role_key is not null) + ), + + -- Update last_invoked_at for invoked functions + updated as ( + update pgflow.worker_functions as wf + set last_invoked_at = clock_timestamp() + from http_requests as hr + where wf.function_name = hr.function_name + returning wf.function_name + ) + + select u.function_name, true as invoked, hr.request_id + from updated as u + inner join http_requests as hr on u.function_name = hr.function_name +$$; +-- Set comment to function: "ensure_workers" +COMMENT ON FUNCTION "pgflow"."ensure_workers" IS 'Ensures worker functions are running by pinging them via HTTP when needed. +In local mode: always pings all enabled functions (for fast restart after code changes). +In production mode: only pings functions that have no alive workers. +Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window. +Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks. +Returns request_id from pg_net for each HTTP request made.'; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 49bd12ea4..e0cc253ea 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:ITAzSq+m8k27LdS5wU7dFgdolSQl5pHgOD5VgpL3zyU= +h1:G7RvNGNjnwVtYZTuceBwoaHEXa9bd/xH6hC8x5MNxZk= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -18,3 +18,4 @@ h1:ITAzSq+m8k27LdS5wU7dFgdolSQl5pHgOD5VgpL3zyU= 20251204164612_pgflow_temp_track_worker_function.sql h1:3Ht8wUx3saKPo98osuTG/nxD/tKR48qe8jHNVwuu2lY= 20251204165231_pgflow_temp_mark_worker_stopped.sql h1:zI2FijK429oae4OpLbjU4eSaZtYhlsbvN3buWH9FKLw= 20251205103442_pgflow_temp_add_extensions.sql h1:IBHG1vBdXu8wDEJzqpJUFmuPhVaX0mAmDUkngLgdaMg= +20251205133446_pgflow_temp_ensure_workers.sql h1:EQzE75uaMSXeU1sdjO7MK1ipCwepxlWSVzlKegLpr48= diff --git a/pkgs/core/supabase/tests/ensure_workers/credentials_from_vault.test.sql b/pkgs/core/supabase/tests/ensure_workers/credentials_from_vault.test.sql new file mode 100644 index 000000000..9fc63cde4 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/credentials_from_vault.test.sql @@ -0,0 +1,47 @@ +-- Test: ensure_workers() retrieves credentials from Vault +begin; +select plan(2); +select pgflow_tests.reset_db(); + +-- Setup: Create Vault secrets +select vault.create_secret( + 'test-service-role-key-from-vault', + 'pgflow_service_role_key' +); +select vault.create_secret( + 'http://vault-configured-url.example.com/functions/v1', + 'pgflow_function_base_url' +); + +-- Setup: Register a worker function +select pgflow.track_worker_function('my-function'); +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Simulate production mode (non-local jwt_secret) +set local app.settings.jwt_secret = 'production-secret-different-from-local'; + +-- TEST: In production mode WITH Vault secrets, function IS invoked +with result as ( + select * from pgflow.ensure_workers() +) +select is( + (select count(*) from result), + 1::bigint, + 'Production mode with Vault secrets invokes functions' +); + +-- TEST: request_id is returned (proves Vault credentials were used for HTTP call) +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +with result as ( + select * from pgflow.ensure_workers() +) +select ok( + (select request_id is not null from result limit 1), + 'Vault credentials allow HTTP invocation in production mode' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/credentials_local_fallback.test.sql b/pkgs/core/supabase/tests/ensure_workers/credentials_local_fallback.test.sql new file mode 100644 index 000000000..8c73911fb --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/credentials_local_fallback.test.sql @@ -0,0 +1,40 @@ +-- Test: ensure_workers() uses local fallback credentials when Vault is empty +begin; +select plan(2); +select pgflow_tests.reset_db(); + +-- Ensure no Vault secrets exist +delete from vault.secrets where name in ('pgflow_service_role_key', 'pgflow_function_base_url'); + +-- Setup: Register a worker function +select pgflow.track_worker_function('my-function'); +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Simulate local mode +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; + +-- TEST: In local mode without Vault secrets, function IS invoked (uses fallback) +with result as ( + select * from pgflow.ensure_workers() +) +select is( + (select count(*) from result), + 1::bigint, + 'Local mode uses fallback credentials when Vault is empty' +); + +-- TEST: request_id is returned (proves HTTP call was made) +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +with result as ( + select * from pgflow.ensure_workers() +) +select ok( + (select request_id is not null from result limit 1), + 'Local fallback credentials allow HTTP invocation' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/debounce.test.sql b/pkgs/core/supabase/tests/ensure_workers/debounce.test.sql new file mode 100644 index 000000000..488e83df0 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/debounce.test.sql @@ -0,0 +1,53 @@ +-- Test: ensure_workers() respects debounce window +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Setup: Register a worker function with 6 second heartbeat timeout +select pgflow.track_worker_function('my-function'); + +-- TEST: Function with recent last_invoked_at is NOT returned (debounce active) +-- Note: track_worker_function sets last_invoked_at to now() +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; +select is( + (select count(*) from pgflow.ensure_workers()), + 0::bigint, + 'Function with recent last_invoked_at is NOT returned (debounce active)' +); + +-- TEST: Function with last_invoked_at beyond heartbeat_timeout IS returned +update pgflow.worker_functions +set last_invoked_at = now() - interval '7 seconds' +where function_name = 'my-function'; + +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Function with last_invoked_at beyond timeout IS returned' +); + +-- TEST: Function with NULL last_invoked_at IS returned +update pgflow.worker_functions +set last_invoked_at = null +where function_name = 'my-function'; + +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Function with NULL last_invoked_at IS returned' +); + +-- TEST: Debounce uses heartbeat_timeout_seconds from function config +update pgflow.worker_functions +set heartbeat_timeout_seconds = 3, + last_invoked_at = now() - interval '4 seconds' +where function_name = 'my-function'; + +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Debounce respects function-specific heartbeat_timeout_seconds' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/disabled_functions.test.sql b/pkgs/core/supabase/tests/ensure_workers/disabled_functions.test.sql new file mode 100644 index 000000000..a71bb0c74 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/disabled_functions.test.sql @@ -0,0 +1,43 @@ +-- Test: ensure_workers() skips disabled functions +begin; +select plan(2); +select pgflow_tests.reset_db(); + +-- Setup: Register worker functions +select pgflow.track_worker_function('enabled-function'); +select pgflow.track_worker_function('disabled-function'); + +-- Disable one function +update pgflow.worker_functions +set enabled = false +where function_name = 'disabled-function'; + +-- Set last_invoked_at to past (beyond debounce window) for both +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- TEST: Only enabled functions are returned +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; + +-- Use a CTE to capture results in one call (avoid debounce between assertions) +with results as ( + select * from pgflow.ensure_workers() +) +select is( + (select count(*) from results), + 1::bigint, + 'Only enabled functions are returned' +); + +-- Reset debounce for second test +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +select is( + (select count(*) from pgflow.ensure_workers() where ensure_workers.function_name = 'enabled-function'), + 1::bigint, + 'Enabled function is returned' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/http_request_queued.test.sql b/pkgs/core/supabase/tests/ensure_workers/http_request_queued.test.sql new file mode 100644 index 000000000..fe0cedc6a --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/http_request_queued.test.sql @@ -0,0 +1,77 @@ +-- Test: ensure_workers() queues HTTP request via pg_net +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Clear any existing HTTP requests +delete from net._http_response; + +-- Setup: Register a worker function +select pgflow.track_worker_function('my-function'); + +-- Set last_invoked_at to past (beyond debounce window) +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Simulate local mode (will use local fallback credentials) +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; + +-- Execute ensure_workers() and capture request_id +with result as ( + select * from pgflow.ensure_workers() +) +select is( + (select count(*) from result), + 1::bigint, + 'One function was invoked' +); + +-- Check that an HTTP request was queued +-- Note: pg_net queues requests but doesn't execute until transaction commits +-- We check the internal request table to verify the request was created + +-- TEST: Request was queued in net._http_response +-- Note: The request may be in a pending state in net's internal tables +-- For now, we verify the request_id returned is valid +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +with result as ( + select * from pgflow.ensure_workers() +) +select ok( + (select request_id is not null from result limit 1), + 'HTTP request was queued (request_id returned)' +); + +-- TEST: Multiple functions each get their own request +select pgflow.track_worker_function('function-two'); +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +with result as ( + select * from pgflow.ensure_workers() +) +select is( + (select count(distinct request_id) from result), + 2::bigint, + 'Each function gets its own request_id' +); + +-- TEST: request_ids are unique +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +with result as ( + select * from pgflow.ensure_workers() +), +ids as ( + select request_id from result +) +select ok( + (select count(*) = count(distinct request_id) from ids), + 'All request_ids are unique' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/local_mode_always_pings.test.sql b/pkgs/core/supabase/tests/ensure_workers/local_mode_always_pings.test.sql new file mode 100644 index 000000000..1b0144ab4 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/local_mode_always_pings.test.sql @@ -0,0 +1,52 @@ +-- Test: ensure_workers() in local mode always returns functions (ignores worker state) +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Register worker function +select pgflow.track_worker_function('my-function'); +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Setup: Create an alive worker +insert into pgflow.workers (worker_id, queue_name, function_name, started_at, last_heartbeat_at) +values ('11111111-1111-1111-1111-111111111111', 'test_queue', 'my-function', now(), now()); + +-- Simulate local mode +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; + +-- TEST: Local mode returns function even with alive worker +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Local mode returns function even with alive worker' +); + +-- TEST: Local mode returns function even when worker has recent heartbeat +update pgflow.workers +set last_heartbeat_at = now() +where worker_id = '11111111-1111-1111-1111-111111111111'; + +-- Reset last_invoked_at to past again (it was updated by previous call) +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Local mode returns function even when worker has fresh heartbeat' +); + +-- TEST: Debounce still applies in local mode +-- Reset last_invoked_at to now (will be within debounce window) +update pgflow.worker_functions +set last_invoked_at = now(); + +select is( + (select count(*) from pgflow.ensure_workers()), + 0::bigint, + 'Debounce still applies in local mode' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/no_credentials_skips_invocation.test.sql b/pkgs/core/supabase/tests/ensure_workers/no_credentials_skips_invocation.test.sql new file mode 100644 index 000000000..1f647fc0e --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/no_credentials_skips_invocation.test.sql @@ -0,0 +1,37 @@ +-- Test: ensure_workers() safely skips invocation when no credentials available (production) +begin; +select plan(2); +select pgflow_tests.reset_db(); + +-- Ensure no Vault secrets exist +delete from vault.secrets where name in ('pgflow_service_role_key', 'pgflow_function_base_url'); + +-- Setup: Register a worker function +select pgflow.track_worker_function('my-function'); +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Simulate production mode (non-local jwt_secret, NO Vault secrets) +set local app.settings.jwt_secret = 'production-secret-different-from-local'; + +-- TEST: In production mode WITHOUT Vault secrets, returns empty (safe failure) +with result as ( + select * from pgflow.ensure_workers() +) +select is( + (select count(*) from result), + 0::bigint, + 'Production mode without credentials returns empty (misconfigured - safe failure)' +); + +-- TEST: No HTTP requests were queued +-- Note: We check that last_invoked_at was NOT updated (since no invocation happened) +select ok( + (select last_invoked_at < now() - interval '5 seconds' + from pgflow.worker_functions + where function_name = 'my-function'), + 'last_invoked_at not updated when no credentials available' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/request_id_returned.test.sql b/pkgs/core/supabase/tests/ensure_workers/request_id_returned.test.sql new file mode 100644 index 000000000..bdc6eec96 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/request_id_returned.test.sql @@ -0,0 +1,52 @@ +-- Test: ensure_workers() returns request_id in result +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Register a worker function +select pgflow.track_worker_function('my-function'); + +-- Set last_invoked_at to past (beyond debounce window) +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Simulate local mode (will use local fallback credentials) +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; + +-- TEST: Result includes request_id column (verify by selecting it) +with result as ( + select function_name, invoked, request_id from pgflow.ensure_workers() +) +select ok( + (select count(*) = 1 from result where request_id is not null), + 'ensure_workers() returns request_id column' +); + +-- Reset debounce for next test +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- TEST: request_id is NOT NULL when function is invoked +with result as ( + select * from pgflow.ensure_workers() +) +select ok( + (select request_id is not null from result where function_name = 'my-function'), + 'request_id is NOT NULL when function is invoked' +); + +-- TEST: request_id is a valid bigint (positive number from pg_net) +-- Reset debounce for next call +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +with result as ( + select * from pgflow.ensure_workers() +) +select ok( + (select request_id > 0 from result where function_name = 'my-function'), + 'request_id is a positive bigint from pg_net' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/returns_functions_to_invoke.test.sql b/pkgs/core/supabase/tests/ensure_workers/returns_functions_to_invoke.test.sql new file mode 100644 index 000000000..462d5232c --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/returns_functions_to_invoke.test.sql @@ -0,0 +1,70 @@ +-- Test: ensure_workers() returns which functions should be invoked +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Setup: Create Vault secrets for production mode tests +select vault.create_secret('test-service-role-key', 'pgflow_service_role_key'); +select vault.create_secret('http://test.example.com/functions/v1', 'pgflow_function_base_url'); + +-- Setup: Register two worker functions +select pgflow.track_worker_function('function-a'); +select pgflow.track_worker_function('function-b'); + +-- Set last_invoked_at to past (beyond debounce window) for both +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- TEST: In local mode, all enabled functions are returned for invocation +-- (We mock is_local() by setting jwt_secret) +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; + +select is( + (select count(*) from pgflow.ensure_workers()), + 2::bigint, + 'In local mode, returns all enabled worker functions' +); + +-- Reset last_invoked_at (was updated by previous ensure_workers call) +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Simulate production mode by setting jwt_secret to a different value +set local app.settings.jwt_secret = 'production-secret-different-from-local'; + +-- TEST: In production mode with no workers, functions are returned for invocation +select is( + (select count(*) from pgflow.ensure_workers()), + 2::bigint, + 'In production mode with no workers, returns all functions' +); + +-- Reset last_invoked_at again +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Setup: Create an alive worker for function-a +insert into pgflow.workers (worker_id, queue_name, function_name, started_at, last_heartbeat_at) +values ('11111111-1111-1111-1111-111111111111', 'test_queue', 'function-a', now(), now()); + +-- TEST: In production mode, function with alive worker is NOT returned +select is( + (select count(*) from pgflow.ensure_workers() where ensure_workers.function_name = 'function-a'), + 0::bigint, + 'In production mode, function with alive worker is NOT returned' +); + +-- Reset last_invoked_at for function-b only (function-a shouldn't be returned anyway) +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds' +where function_name = 'function-b'; + +-- TEST: In production mode, function without alive worker IS returned +select is( + (select count(*) from pgflow.ensure_workers() where ensure_workers.function_name = 'function-b'), + 1::bigint, + 'In production mode, function without alive worker IS returned' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/updates_last_invoked_at.test.sql b/pkgs/core/supabase/tests/ensure_workers/updates_last_invoked_at.test.sql new file mode 100644 index 000000000..7761a1495 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/updates_last_invoked_at.test.sql @@ -0,0 +1,70 @@ +-- Test: ensure_workers() updates last_invoked_at for returned functions +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Register worker functions +select pgflow.track_worker_function('function-a'); +select pgflow.track_worker_function('function-b'); + +-- Set last_invoked_at to past for both +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Store original last_invoked_at values +select last_invoked_at as original_ts +into temporary original_timestamps +from pgflow.worker_functions +where function_name = 'function-a'; + +-- Small delay to ensure timestamp difference +select pg_sleep(0.01); + +-- Simulate local mode to ensure both functions are returned +set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long'; + +-- Execute ensure_workers() +select * from pgflow.ensure_workers(); + +-- TEST: last_invoked_at is updated for invoked functions +select ok( + (select last_invoked_at > (select original_ts from original_timestamps) + from pgflow.worker_functions + where function_name = 'function-a'), + 'last_invoked_at is updated after ensure_workers() returns function' +); + +-- TEST: last_invoked_at is recent (within last second) +select ok( + (select last_invoked_at > now() - interval '1 second' + from pgflow.worker_functions + where function_name = 'function-a'), + 'last_invoked_at is updated to approximately now' +); + +-- Setup: Disable function-b to ensure it is NOT returned +update pgflow.worker_functions +set enabled = false, last_invoked_at = now() - interval '10 seconds' +where function_name = 'function-b'; + +-- Store function-b's last_invoked_at +update original_timestamps +set original_ts = (select last_invoked_at from pgflow.worker_functions where function_name = 'function-b'); + +select pg_sleep(0.01); + +-- Re-run ensure_workers +select * from pgflow.ensure_workers(); + +-- TEST: last_invoked_at is NOT updated for functions that were NOT returned +select is( + (select last_invoked_at from pgflow.worker_functions where function_name = 'function-b'), + (select original_ts from original_timestamps), + 'last_invoked_at is NOT updated for disabled functions' +); + +-- Cleanup +drop table if exists original_timestamps; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_workers/worker_death_detection.test.sql b/pkgs/core/supabase/tests/ensure_workers/worker_death_detection.test.sql new file mode 100644 index 000000000..98246651a --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/worker_death_detection.test.sql @@ -0,0 +1,98 @@ +-- Test: ensure_workers() detects dead workers in production mode +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Setup: Create Vault secrets for production mode tests +select vault.create_secret('test-service-role-key', 'pgflow_service_role_key'); +select vault.create_secret('http://test.example.com/functions/v1', 'pgflow_function_base_url'); + +-- Setup: Register a worker function +select pgflow.track_worker_function('my-function'); +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- Simulate production mode by setting jwt_secret to a different value +set local app.settings.jwt_secret = 'production-secret-different-from-local'; + +-- Setup: Create a worker +insert into pgflow.workers (worker_id, queue_name, function_name, started_at, last_heartbeat_at) +values ('11111111-1111-1111-1111-111111111111', 'test_queue', 'my-function', now(), now()); + +-- TEST: Function with alive worker is NOT returned in production +select is( + (select count(*) from pgflow.ensure_workers()), + 0::bigint, + 'Function with alive worker is NOT returned in production' +); + +-- TEST: Function with stopped worker IS returned +update pgflow.workers +set stopped_at = now() +where worker_id = '11111111-1111-1111-1111-111111111111'; + +-- Reset debounce +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Function with stopped worker IS returned' +); + +-- Reset stopped_at, set deprecated_at instead +update pgflow.workers +set stopped_at = null, deprecated_at = now() +where worker_id = '11111111-1111-1111-1111-111111111111'; + +-- Reset debounce +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- TEST: Function with deprecated worker IS returned +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Function with deprecated worker IS returned' +); + +-- Reset deprecated_at, make heartbeat stale +update pgflow.workers +set deprecated_at = null, + last_heartbeat_at = now() - interval '10 seconds' +where worker_id = '11111111-1111-1111-1111-111111111111'; + +-- Reset debounce +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +-- TEST: Function with stale heartbeat worker IS returned +select is( + (select count(*) from pgflow.ensure_workers()), + 1::bigint, + 'Function with stale heartbeat worker IS returned' +); + +-- TEST: Mix of alive and dead workers - function with at least one alive is NOT returned +-- Reset first worker to dead +update pgflow.workers +set stopped_at = now() +where worker_id = '11111111-1111-1111-1111-111111111111'; + +-- Add an alive worker for same function +insert into pgflow.workers (worker_id, queue_name, function_name, started_at, last_heartbeat_at) +values ('22222222-2222-2222-2222-222222222222', 'test_queue', 'my-function', now(), now()); + +-- Reset debounce +update pgflow.worker_functions +set last_invoked_at = now() - interval '10 seconds'; + +select is( + (select count(*) from pgflow.ensure_workers()), + 0::bigint, + 'Function with at least one alive worker is NOT returned (even if other workers are dead)' +); + +select finish(); +rollback;