Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions pkgs/core/schemas/0059_function_ensure_workers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
-- Ensure Workers
-- Returns which worker functions should be invoked, makes HTTP requests, and updates last_invoked_at
-- Called by cron job to keep workers running

drop function if exists pgflow.ensure_workers();

create or replace function pgflow.ensure_workers()
returns table (function_name text, invoked boolean, request_id bigint)
language sql
as $$
with
-- Detect environment
env as (
select pgflow.is_local() as is_local
),

-- Get credentials: Vault secrets with local fallback for base_url only
credentials as (
select
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key,
coalesce(
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'),
case when (select is_local from env) then 'http://kong:8000/functions/v1' end
) as base_url
),

-- Find functions that pass the debounce check
debounce_passed as (
select wf.function_name, wf.heartbeat_timeout_seconds
from pgflow.worker_functions as wf
where wf.enabled = true
and (
wf.last_invoked_at is null
or wf.last_invoked_at < now() - (wf.heartbeat_timeout_seconds || ' seconds')::interval
)
),

-- Find functions that have at least one alive worker
functions_with_alive_workers as (
select distinct w.function_name
from pgflow.workers as w
inner join debounce_passed as dp on w.function_name = dp.function_name
where w.stopped_at is null
and w.deprecated_at is null
and w.last_heartbeat_at > now() - (dp.heartbeat_timeout_seconds || ' seconds')::interval
),

-- Determine which functions should be invoked
functions_to_invoke as (
select dp.function_name
from debounce_passed as dp
where
pgflow.is_local() = true
or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
),

-- Make HTTP requests and capture request_ids
http_requests as (
select
fti.function_name,
net.http_post(
url => c.base_url || '/' || fti.function_name,
headers => case
when e.is_local then '{}'::jsonb
else jsonb_build_object(
'Content-Type', 'application/json',
'Authorization', 'Bearer ' || c.service_role_key
)
end,
body => '{}'::jsonb
) as request_id
from functions_to_invoke as fti
cross join credentials as c
cross join env as e
where c.base_url is not null
and (e.is_local or c.service_role_key is not null)
),

-- Update last_invoked_at for invoked functions
updated as (
update pgflow.worker_functions as wf
set last_invoked_at = clock_timestamp()
from http_requests as hr
where wf.function_name = hr.function_name
returning wf.function_name
)

select u.function_name, true as invoked, hr.request_id
from updated as u
inner join http_requests as hr on u.function_name = hr.function_name
$$;

comment on function pgflow.ensure_workers() is
'Ensures worker functions are running by pinging them via HTTP when needed.
In local mode: always pings all enabled functions (for fast restart after code changes).
In production mode: only pings functions that have no alive workers.
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks.
Returns request_id from pg_net for each HTTP request made.';
8 changes: 8 additions & 0 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,14 @@ export type Database = {
Args: { flow_slug: string; shape: Json }
Returns: Json
}
ensure_workers: {
Args: never
Returns: {
function_name: string
invoked: boolean
request_id: number
}[]
}
fail_task: {
Args: {
error_message: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
-- Create "ensure_workers" function
CREATE FUNCTION "pgflow"."ensure_workers" () RETURNS TABLE ("function_name" text, "invoked" boolean, "request_id" bigint) LANGUAGE sql AS $$
with
-- Detect environment
env as (
select pgflow.is_local() as is_local
),

-- Get credentials: Vault secrets with local fallback for base_url only
credentials as (
select
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key,
coalesce(
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'),
case when (select is_local from env) then 'http://kong:8000/functions/v1' end
) as base_url
),

-- Find functions that pass the debounce check
debounce_passed as (
select wf.function_name, wf.heartbeat_timeout_seconds
from pgflow.worker_functions as wf
where wf.enabled = true
and (
wf.last_invoked_at is null
or wf.last_invoked_at < now() - (wf.heartbeat_timeout_seconds || ' seconds')::interval
)
),

-- Find functions that have at least one alive worker
functions_with_alive_workers as (
select distinct w.function_name
from pgflow.workers as w
inner join debounce_passed as dp on w.function_name = dp.function_name
where w.stopped_at is null
and w.deprecated_at is null
and w.last_heartbeat_at > now() - (dp.heartbeat_timeout_seconds || ' seconds')::interval
),

-- Determine which functions should be invoked
functions_to_invoke as (
select dp.function_name
from debounce_passed as dp
where
pgflow.is_local() = true
or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
),

-- Make HTTP requests and capture request_ids
http_requests as (
select
fti.function_name,
net.http_post(
url => c.base_url || '/' || fti.function_name,
headers => case
when e.is_local then '{}'::jsonb
else jsonb_build_object(
'Content-Type', 'application/json',
'Authorization', 'Bearer ' || c.service_role_key
)
end,
body => '{}'::jsonb
) as request_id
from functions_to_invoke as fti
cross join credentials as c
cross join env as e
where c.base_url is not null
and (e.is_local or c.service_role_key is not null)
),

-- Update last_invoked_at for invoked functions
updated as (
update pgflow.worker_functions as wf
set last_invoked_at = clock_timestamp()
from http_requests as hr
where wf.function_name = hr.function_name
returning wf.function_name
)

select u.function_name, true as invoked, hr.request_id
from updated as u
inner join http_requests as hr on u.function_name = hr.function_name
$$;
-- Set comment to function: "ensure_workers"
COMMENT ON FUNCTION "pgflow"."ensure_workers" IS 'Ensures worker functions are running by pinging them via HTTP when needed.
In local mode: always pings all enabled functions (for fast restart after code changes).
In production mode: only pings functions that have no alive workers.
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks.
Returns request_id from pg_net for each HTTP request made.';
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:ITAzSq+m8k27LdS5wU7dFgdolSQl5pHgOD5VgpL3zyU=
h1:G7RvNGNjnwVtYZTuceBwoaHEXa9bd/xH6hC8x5MNxZk=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -18,3 +18,4 @@ h1:ITAzSq+m8k27LdS5wU7dFgdolSQl5pHgOD5VgpL3zyU=
20251204164612_pgflow_temp_track_worker_function.sql h1:3Ht8wUx3saKPo98osuTG/nxD/tKR48qe8jHNVwuu2lY=
20251204165231_pgflow_temp_mark_worker_stopped.sql h1:zI2FijK429oae4OpLbjU4eSaZtYhlsbvN3buWH9FKLw=
20251205103442_pgflow_temp_add_extensions.sql h1:IBHG1vBdXu8wDEJzqpJUFmuPhVaX0mAmDUkngLgdaMg=
20251205133446_pgflow_temp_ensure_workers.sql h1:EQzE75uaMSXeU1sdjO7MK1ipCwepxlWSVzlKegLpr48=
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- Test: ensure_workers() retrieves credentials from Vault
begin;
select plan(2);
select pgflow_tests.reset_db();

-- Setup: Create Vault secrets
select vault.create_secret(
'test-service-role-key-from-vault',
'pgflow_service_role_key'
);
select vault.create_secret(
'http://vault-configured-url.example.com/functions/v1',
'pgflow_function_base_url'
);

-- Setup: Register a worker function
select pgflow.track_worker_function('my-function');
update pgflow.worker_functions
set last_invoked_at = now() - interval '10 seconds';

-- Simulate production mode (non-local jwt_secret)
set local app.settings.jwt_secret = 'production-secret-different-from-local';

-- TEST: In production mode WITH Vault secrets, function IS invoked
with result as (
select * from pgflow.ensure_workers()
)
select is(
(select count(*) from result),
1::bigint,
'Production mode with Vault secrets invokes functions'
);

-- TEST: request_id is returned (proves Vault credentials were used for HTTP call)
update pgflow.worker_functions
set last_invoked_at = now() - interval '10 seconds';

with result as (
select * from pgflow.ensure_workers()
)
select ok(
(select request_id is not null from result limit 1),
'Vault credentials allow HTTP invocation in production mode'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Test: ensure_workers() uses local fallback credentials when Vault is empty
begin;
select plan(2);
select pgflow_tests.reset_db();

-- Ensure no Vault secrets exist
delete from vault.secrets where name in ('pgflow_service_role_key', 'pgflow_function_base_url');

-- Setup: Register a worker function
select pgflow.track_worker_function('my-function');
update pgflow.worker_functions
set last_invoked_at = now() - interval '10 seconds';

-- Simulate local mode
set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';

-- TEST: In local mode without Vault secrets, function IS invoked (uses fallback)
with result as (
select * from pgflow.ensure_workers()
)
select is(
(select count(*) from result),
1::bigint,
'Local mode uses fallback credentials when Vault is empty'
);

-- TEST: request_id is returned (proves HTTP call was made)
update pgflow.worker_functions
set last_invoked_at = now() - interval '10 seconds';

with result as (
select * from pgflow.ensure_workers()
)
select ok(
(select request_id is not null from result limit 1),
'Local fallback credentials allow HTTP invocation'
);

select finish();
rollback;
53 changes: 53 additions & 0 deletions pkgs/core/supabase/tests/ensure_workers/debounce.test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
-- Test: ensure_workers() respects debounce window
begin;
select plan(4);
select pgflow_tests.reset_db();

-- Setup: Register a worker function with 6 second heartbeat timeout
select pgflow.track_worker_function('my-function');

-- TEST: Function with recent last_invoked_at is NOT returned (debounce active)
-- Note: track_worker_function sets last_invoked_at to now()
set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';
select is(
(select count(*) from pgflow.ensure_workers()),
0::bigint,
'Function with recent last_invoked_at is NOT returned (debounce active)'
);

-- TEST: Function with last_invoked_at beyond heartbeat_timeout IS returned
update pgflow.worker_functions
set last_invoked_at = now() - interval '7 seconds'
where function_name = 'my-function';

select is(
(select count(*) from pgflow.ensure_workers()),
1::bigint,
'Function with last_invoked_at beyond timeout IS returned'
);

-- TEST: Function with NULL last_invoked_at IS returned
update pgflow.worker_functions
set last_invoked_at = null
where function_name = 'my-function';

select is(
(select count(*) from pgflow.ensure_workers()),
1::bigint,
'Function with NULL last_invoked_at IS returned'
);

-- TEST: Debounce uses heartbeat_timeout_seconds from function config
update pgflow.worker_functions
set heartbeat_timeout_seconds = 3,
last_invoked_at = now() - interval '4 seconds'
where function_name = 'my-function';

select is(
(select count(*) from pgflow.ensure_workers()),
1::bigint,
'Debounce respects function-specific heartbeat_timeout_seconds'
);

select finish();
rollback;
Loading
Loading