Skip to content

Commit d43f53b

Browse files
committed
add setup_ensure_workers_cron() SQL function for cron job scheduling
1 parent 19991aa commit d43f53b

9 files changed

+245
-1
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
-- Setup Ensure Workers Cron
2+
-- Sets up cron jobs for worker management: ensure_workers (configurable interval) and cleanup_logs (hourly)
3+
4+
drop function if exists pgflow.setup_ensure_workers_cron(text);
5+
6+
create or replace function pgflow.setup_ensure_workers_cron(
7+
cron_interval text default '1 second'
8+
)
9+
returns text
10+
language plpgsql
11+
security definer
12+
set search_path = pgflow, cron, pg_temp
13+
as $$
14+
declare
15+
ensure_workers_job_id bigint;
16+
cleanup_job_id bigint;
17+
begin
18+
-- Remove existing jobs if they exist (ignore errors if not found)
19+
begin
20+
perform cron.unschedule('pgflow_ensure_workers');
21+
exception when others then
22+
-- Job doesn't exist, continue
23+
end;
24+
25+
begin
26+
perform cron.unschedule('pgflow_cleanup_logs');
27+
exception when others then
28+
-- Job doesn't exist, continue
29+
end;
30+
31+
-- Schedule ensure_workers job with the specified interval
32+
ensure_workers_job_id := cron.schedule(
33+
job_name => 'pgflow_ensure_workers',
34+
schedule => setup_ensure_workers_cron.cron_interval,
35+
command => 'select pgflow.ensure_workers()'
36+
);
37+
38+
-- Schedule cleanup job to run hourly
39+
cleanup_job_id := cron.schedule(
40+
job_name => 'pgflow_cleanup_logs',
41+
schedule => '0 * * * *',
42+
command => 'select pgflow.cleanup_ensure_workers_logs()'
43+
);
44+
45+
return format(
46+
'Scheduled pgflow_ensure_workers (every %s, job_id=%s) and pgflow_cleanup_logs (hourly, job_id=%s)',
47+
setup_ensure_workers_cron.cron_interval,
48+
ensure_workers_job_id,
49+
cleanup_job_id
50+
);
51+
end;
52+
$$;
53+
54+
comment on function pgflow.setup_ensure_workers_cron(text) is
55+
'Sets up cron jobs for worker management.
56+
Schedules pgflow_ensure_workers at the specified cron_interval (default: 1 second) to keep workers running.
57+
Schedules pgflow_cleanup_logs hourly to clean up old cron job logs.
58+
Replaces existing jobs if they exist (idempotent).
59+
Returns a confirmation message with job IDs.';

pkgs/core/src/database-types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,10 @@ export type Database = {
558558
vt: string
559559
}[]
560560
}
561+
setup_ensure_workers_cron: {
562+
Args: { cron_interval?: string }
563+
Returns: string
564+
}
561565
start_flow: {
562566
Args: { flow_slug: string; input: Json; run_id?: string }
563567
Returns: {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
-- Create "setup_ensure_workers_cron" function
2+
CREATE FUNCTION "pgflow"."setup_ensure_workers_cron" ("cron_interval" text DEFAULT '1 second') RETURNS text LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$
3+
declare
4+
ensure_workers_job_id bigint;
5+
cleanup_job_id bigint;
6+
begin
7+
-- Remove existing jobs if they exist (ignore errors if not found)
8+
begin
9+
perform cron.unschedule('pgflow_ensure_workers');
10+
exception when others then
11+
-- Job doesn't exist, continue
12+
end;
13+
14+
begin
15+
perform cron.unschedule('pgflow_cleanup_logs');
16+
exception when others then
17+
-- Job doesn't exist, continue
18+
end;
19+
20+
-- Schedule ensure_workers job with the specified interval
21+
ensure_workers_job_id := cron.schedule(
22+
job_name => 'pgflow_ensure_workers',
23+
schedule => setup_ensure_workers_cron.cron_interval,
24+
command => 'select pgflow.ensure_workers()'
25+
);
26+
27+
-- Schedule cleanup job to run hourly
28+
cleanup_job_id := cron.schedule(
29+
job_name => 'pgflow_cleanup_logs',
30+
schedule => '0 * * * *',
31+
command => 'select pgflow.cleanup_ensure_workers_logs()'
32+
);
33+
34+
return format(
35+
'Scheduled pgflow_ensure_workers (every %s, job_id=%s) and pgflow_cleanup_logs (hourly, job_id=%s)',
36+
setup_ensure_workers_cron.cron_interval,
37+
ensure_workers_job_id,
38+
cleanup_job_id
39+
);
40+
end;
41+
$$;
42+
-- Set comment to function: "setup_ensure_workers_cron"
43+
COMMENT ON FUNCTION "pgflow"."setup_ensure_workers_cron" IS 'Sets up cron jobs for worker management.
44+
Schedules pgflow_ensure_workers at the specified cron_interval (default: 1 second) to keep workers running.
45+
Schedules pgflow_cleanup_logs hourly to clean up old cron job logs.
46+
Replaces existing jobs if they exist (idempotent).
47+
Returns a confirmation message with job IDs.';

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:UPs/YAMNWH5FdnHa1GJRQZ4yiv6kOHpfBNfs6telwgI=
1+
h1:ptkKThTquu9rJzpsRUMDDHbe4OgnCEYGATLSRy0F1l0=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -20,3 +20,4 @@ h1:UPs/YAMNWH5FdnHa1GJRQZ4yiv6kOHpfBNfs6telwgI=
2020
20251205103442_pgflow_temp_add_extensions.sql h1:IBHG1vBdXu8wDEJzqpJUFmuPhVaX0mAmDUkngLgdaMg=
2121
20251205133446_pgflow_temp_ensure_workers.sql h1:EQzE75uaMSXeU1sdjO7MK1ipCwepxlWSVzlKegLpr48=
2222
20251205140756_pgflow_temp_cleanup_ensure_workers_logs.sql h1:wNsRFg6bZKwItg3qdNx/dP8zaK4Qe7ufOZwxO+yx/F8=
23+
20251205162618_pgflow_temp_setup_ensure_workers_cron.sql h1:Kq80xDBLQc60yYlrntZEFefQ4kfeiycMbauU7LKljCY=
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- Test: setup_ensure_workers_cron() accepts custom interval
2+
begin;
3+
select plan(2);
4+
5+
-- Execute function with custom interval (handles existing jobs internally)
6+
select pgflow.setup_ensure_workers_cron(cron_interval => '5 seconds');
7+
8+
-- Test: Job is created with custom interval
9+
select ok(
10+
exists(select 1 from cron.job where jobname = 'pgflow_ensure_workers'),
11+
'Should create pgflow_ensure_workers job'
12+
);
13+
14+
-- Test: Job has the custom schedule
15+
select is(
16+
(select schedule from cron.job where jobname = 'pgflow_ensure_workers'),
17+
'5 seconds',
18+
'Job should have 5 seconds schedule as specified'
19+
);
20+
21+
select finish();
22+
rollback;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- Test: setup_ensure_workers_cron() replaces existing job when called twice
2+
begin;
3+
select plan(3);
4+
5+
-- Setup first job with 1 second interval
6+
select pgflow.setup_ensure_workers_cron(cron_interval => '1 second');
7+
8+
-- Get job count after first call
9+
select is(
10+
(select count(*)::int from cron.job where jobname = 'pgflow_ensure_workers'),
11+
1,
12+
'Should have exactly one ensure_workers job after first call'
13+
);
14+
15+
-- Call again with different interval (should replace, not duplicate)
16+
select pgflow.setup_ensure_workers_cron(cron_interval => '2 seconds');
17+
18+
-- Test: Still only one job (no duplicates)
19+
select is(
20+
(select count(*)::int from cron.job where jobname = 'pgflow_ensure_workers'),
21+
1,
22+
'Should still have exactly one ensure_workers job after second call'
23+
);
24+
25+
-- Test: Job has updated schedule
26+
select is(
27+
(select schedule from cron.job where jobname = 'pgflow_ensure_workers'),
28+
'2 seconds',
29+
'Job should have updated schedule after second call'
30+
);
31+
32+
select finish();
33+
rollback;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- Test: setup_ensure_workers_cron() returns confirmation message
2+
begin;
3+
select plan(2);
4+
5+
-- Execute function and capture result (handles existing jobs internally)
6+
select pgflow.setup_ensure_workers_cron() as message into temporary result;
7+
8+
-- Test: Returns a non-null message
9+
select ok(
10+
(select message from result) is not null,
11+
'Should return a confirmation message'
12+
);
13+
14+
-- Test: Message mentions the scheduled jobs
15+
select ok(
16+
(select message from result) like '%pgflow_ensure_workers%',
17+
'Confirmation message should mention the scheduled job'
18+
);
19+
20+
drop table result;
21+
select finish();
22+
rollback;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-- Test: setup_ensure_workers_cron() schedules the cleanup job
2+
begin;
3+
select plan(3);
4+
5+
-- Execute function (should be idempotent - handles existing jobs internally)
6+
select pgflow.setup_ensure_workers_cron();
7+
8+
-- Test: Cleanup job is created
9+
select ok(
10+
exists(select 1 from cron.job where jobname = 'pgflow_cleanup_logs'),
11+
'Should create pgflow_cleanup_logs job'
12+
);
13+
14+
-- Test: Cleanup job runs hourly
15+
select is(
16+
(select schedule from cron.job where jobname = 'pgflow_cleanup_logs'),
17+
'0 * * * *',
18+
'Cleanup job should run hourly (every hour at minute 0)'
19+
);
20+
21+
-- Test: Cleanup job runs the correct command
22+
select ok(
23+
(select command from cron.job where jobname = 'pgflow_cleanup_logs') like '%pgflow.cleanup_ensure_workers_logs()%',
24+
'Cleanup job should call pgflow.cleanup_ensure_workers_logs()'
25+
);
26+
27+
select finish();
28+
rollback;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-- Test: setup_ensure_workers_cron() schedules the ensure_workers job
2+
begin;
3+
select plan(3);
4+
5+
-- Execute function (should be idempotent - handles existing jobs internally)
6+
select pgflow.setup_ensure_workers_cron();
7+
8+
-- Test: Job is created
9+
select ok(
10+
exists(select 1 from cron.job where jobname = 'pgflow_ensure_workers'),
11+
'Should create pgflow_ensure_workers job'
12+
);
13+
14+
-- Test: Job has correct schedule (1 second default)
15+
select is(
16+
(select schedule from cron.job where jobname = 'pgflow_ensure_workers'),
17+
'1 second',
18+
'Job should have 1 second schedule by default'
19+
);
20+
21+
-- Test: Job runs the correct command
22+
select ok(
23+
(select command from cron.job where jobname = 'pgflow_ensure_workers') like '%pgflow.ensure_workers()%',
24+
'Job should call pgflow.ensure_workers()'
25+
);
26+
27+
select finish();
28+
rollback;

0 commit comments

Comments
 (0)