Skip to content

Commit 389d7b8

Browse files
committed
add mark_worker_stopped() SQL function for graceful shutdown signaling
1 parent aedf84f commit 389d7b8

File tree

5 files changed

+63
-1
lines changed

5 files changed

+63
-1
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-- Mark Worker Stopped
2+
-- Sets stopped_at timestamp on a worker row for graceful shutdown signaling
3+
4+
drop function if exists pgflow.mark_worker_stopped(uuid);
5+
6+
create or replace function pgflow.mark_worker_stopped(
7+
worker_id uuid
8+
) returns void
9+
language sql
10+
as $$
11+
update pgflow.workers
12+
set stopped_at = clock_timestamp()
13+
where workers.worker_id = mark_worker_stopped.worker_id;
14+
$$;
15+
16+
comment on function pgflow.mark_worker_stopped(uuid) is
17+
'Marks a worker as stopped for graceful shutdown. Called by workers on beforeunload.';

pkgs/core/src/database-types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ export type Database = {
515515
get_run_with_states: { Args: { run_id: string }; Returns: Json }
516516
is_local: { Args: never; Returns: boolean }
517517
is_valid_slug: { Args: { slug: string }; Returns: boolean }
518+
mark_worker_stopped: { Args: { worker_id: string }; Returns: undefined }
518519
maybe_complete_run: { Args: { run_id: string }; Returns: undefined }
519520
poll_for_tasks: {
520521
Args: {
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Create "mark_worker_stopped" function
2+
CREATE FUNCTION "pgflow"."mark_worker_stopped" ("worker_id" uuid) RETURNS void LANGUAGE sql AS $$
3+
update pgflow.workers
4+
set stopped_at = clock_timestamp()
5+
where workers.worker_id = mark_worker_stopped.worker_id;
6+
$$;
7+
-- Set comment to function: "mark_worker_stopped"
8+
COMMENT ON FUNCTION "pgflow"."mark_worker_stopped" IS 'Marks a worker as stopped for graceful shutdown. Called by workers on beforeunload.';

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:eMTY2CZPFm2BibVAHjtIMhCWIFcmTcqQq9VRXfs1/1A=
1+
h1:lFHz8CiL5ustrpb+hKVK2Ae5/jVpsOq1AX5vVgxj9Yo=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -20,3 +20,4 @@ h1:eMTY2CZPFm2BibVAHjtIMhCWIFcmTcqQq9VRXfs1/1A=
2020
20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:VwqZiOcVaCahb6BZ918ioFLgwQcF/sy1TR9a4lSnVvs=
2121
20251204145037_pgflow_temp_worker_functions_schema.sql h1:5DJJEP0jcg7yapTe7t6FX2ypZIj92lGvJ12AX8g5fz4=
2222
20251204164612_pgflow_temp_track_worker_function.sql h1:SC8Z4Un37A2XVaIIvyZJ0eQ/7r0JENSi6RcBx4GaUCE=
23+
20251204165231_pgflow_temp_mark_worker_stopped.sql h1:5SQ353rHkcRMZ/EPTnm4+ZkC7NSQ4YhftU+Wyq00Cr0=
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
-- Test: mark_worker_stopped() sets stopped_at on the worker row
2+
begin;
3+
select plan(3);
4+
5+
-- Setup: Create a worker entry
6+
insert into pgflow.workers (worker_id, queue_name, function_name, started_at, last_heartbeat_at)
7+
values ('11111111-1111-1111-1111-111111111111', 'test_queue', 'test_function', now(), now());
8+
9+
-- Verify worker exists and stopped_at is NULL
10+
select is(
11+
(select stopped_at from pgflow.workers where worker_id = '11111111-1111-1111-1111-111111111111'),
12+
null,
13+
'Worker should have NULL stopped_at initially'
14+
);
15+
16+
-- Execute: Mark worker as stopped
17+
select pgflow.mark_worker_stopped('11111111-1111-1111-1111-111111111111'::uuid);
18+
19+
-- Test: stopped_at should be set
20+
select isnt(
21+
(select stopped_at from pgflow.workers where worker_id = '11111111-1111-1111-1111-111111111111'),
22+
null,
23+
'Worker should have stopped_at set after marking stopped'
24+
);
25+
26+
-- Test: stopped_at should be recent (within last second)
27+
select ok(
28+
(select stopped_at > clock_timestamp() - interval '1 second'
29+
from pgflow.workers
30+
where worker_id = '11111111-1111-1111-1111-111111111111'),
31+
'stopped_at should be recent (within last second)'
32+
);
33+
34+
select finish();
35+
rollback;

0 commit comments

Comments
 (0)