Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pkgs/core/schemas/0060_function_cleanup_ensure_workers_logs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Cleanup Ensure Workers Logs
-- Cleans up old cron job run details to prevent the table from growing indefinitely.
-- Note: net._http_response is automatically cleaned by pg_net (6 hour TTL), so we only clean cron logs.

drop function if exists pgflow.cleanup_ensure_workers_logs(integer);

-- pgflow.cleanup_ensure_workers_logs(retention_hours)
--
-- Deletes rows from cron.job_run_details whose end_time is more than
-- retention_hours hours in the past and reports how many rows were removed.
--
-- Parameters:
--   retention_hours  Number of hours of history to keep (default 24).
--                    Must be positive. Zero, negative, or NULL values delete
--                    nothing and yield cron_deleted = 0, so a bad argument can
--                    never wipe recent history.
--
-- Returns:
--   A single row with cron_deleted = number of deleted rows.
--
-- Notes:
--   * Rows with a NULL end_time never satisfy the cutoff comparison and are
--     therefore left in place.
--   * Declared SECURITY DEFINER with a pinned search_path, so the DELETE runs
--     with the function owner's privileges.
create or replace function pgflow.cleanup_ensure_workers_logs(
  retention_hours integer default 24
)
returns table (cron_deleted bigint)
language sql
security definer
set search_path = pgflow, cron, pg_temp
as $$
  with deleted as (
    delete from cron.job_run_details
    where
      -- Guard: a non-positive retention would put the cutoff at or after now()
      -- and remove every finished run, so treat it as "delete nothing".
      cleanup_ensure_workers_logs.retention_hours > 0
      -- make_interval builds the interval from the integer directly instead of
      -- concatenating text and casting it.
      and job_run_details.end_time
        < now() - make_interval(hours => cleanup_ensure_workers_logs.retention_hours)
    returning 1
  )
  select count(*)::bigint as cron_deleted from deleted
$$;
Comment on lines +7 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function lacks validation for the retention_hours parameter. Passing a negative value or zero could cause catastrophic data loss:

  • Negative value: retention_hours => -5 creates the condition end_time < now() + 5 hours, deleting records including those with recent or future end times
  • Zero: retention_hours => 0 creates end_time < now(), deleting ALL completed job records

Fix by adding validation at the start:

-- Proposed variant of pgflow.cleanup_ensure_workers_logs: identical to the
-- version under review except for one extra predicate that makes a
-- non-positive retention_hours delete nothing instead of everything.
-- Returns one row whose cron_deleted column is the number of rows removed.
create or replace function pgflow.cleanup_ensure_workers_logs(
  retention_hours integer default 24
)
returns table (cron_deleted bigint)
language sql
security definer
set search_path = pgflow, cron, pg_temp
as $$
  with deleted as (
    delete from cron.job_run_details
    where job_run_details.end_time < now() - (cleanup_ensure_workers_logs.retention_hours || ' hours')::interval
      and cleanup_ensure_workers_logs.retention_hours > 0  -- guard: zero or negative retention matches no rows
    returning 1
  )
  select count(*)::bigint as cron_deleted from deleted
$$;

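Or raise an exception for invalid input before the DELETE. Note that a `language sql` function body cannot execute RAISE, so this option means rewriting the function in PL/pgSQL.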

Suggested change
create or replace function pgflow.cleanup_ensure_workers_logs(
retention_hours integer default 24
)
returns table (cron_deleted bigint)
language sql
security definer
set search_path = pgflow, cron, pg_temp
as $$
with deleted as (
delete from cron.job_run_details
where job_run_details.end_time < now() - (cleanup_ensure_workers_logs.retention_hours || ' hours')::interval
returning 1
)
select count(*)::bigint as cron_deleted from deleted
$$;
create or replace function pgflow.cleanup_ensure_workers_logs(
retention_hours integer default 24
)
returns table (cron_deleted bigint)
language sql
security definer
set search_path = pgflow, cron, pg_temp
as $$
with deleted as (
delete from cron.job_run_details
where job_run_details.end_time < now() - (cleanup_ensure_workers_logs.retention_hours || ' hours')::interval
and cleanup_ensure_workers_logs.retention_hours > 0
returning 1
)
select count(*)::bigint as cron_deleted from deleted
$$;

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.


-- Store a catalog description for the function so its purpose and default
-- retention are visible from the database itself, not only from this file.
comment on function pgflow.cleanup_ensure_workers_logs(integer) is
'Cleans up old cron job run details to prevent table growth.
Default retention is 24 hours. HTTP response logs (net._http_response) are
automatically cleaned by pg_net with a 6-hour TTL, so they are not cleaned here.
This function follows the standard pg_cron maintenance pattern recommended by
AWS RDS, Neon, and Supabase documentation.';
6 changes: 6 additions & 0 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ export type Database = {
Args: { run_id: string }
Returns: number
}
cleanup_ensure_workers_logs: {
Args: { retention_hours?: number }
Returns: {
cron_deleted: number
}[]
}
complete_task: {
Args: {
output: Json
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Create "cleanup_ensure_workers_logs" function
-- Deletes cron.job_run_details rows whose end_time is more than retention_hours
-- (default 24) hours in the past and returns the number of removed rows as cron_deleted.
-- NOTE(review): this looks like the migration file whose checksum is recorded in
-- atlas.sum; if so, any edit here (including comments) needs the checksum
-- regenerated. Confirm before changing.
CREATE FUNCTION "pgflow"."cleanup_ensure_workers_logs" ("retention_hours" integer DEFAULT 24) RETURNS TABLE ("cron_deleted" bigint) LANGUAGE sql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$
with deleted as (
delete from cron.job_run_details
where job_run_details.end_time < now() - (cleanup_ensure_workers_logs.retention_hours || ' hours')::interval
returning 1
)
select count(*)::bigint as cron_deleted from deleted
$$;
-- Set comment to function: "cleanup_ensure_workers_logs"
COMMENT ON FUNCTION "pgflow"."cleanup_ensure_workers_logs" IS 'Cleans up old cron job run details to prevent table growth.
Default retention is 24 hours. HTTP response logs (net._http_response) are
automatically cleaned by pg_net with a 6-hour TTL, so they are not cleaned here.
This function follows the standard pg_cron maintenance pattern recommended by
AWS RDS, Neon, and Supabase documentation.';
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:G7RvNGNjnwVtYZTuceBwoaHEXa9bd/xH6hC8x5MNxZk=
h1:UPs/YAMNWH5FdnHa1GJRQZ4yiv6kOHpfBNfs6telwgI=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -19,3 +19,4 @@ h1:G7RvNGNjnwVtYZTuceBwoaHEXa9bd/xH6hC8x5MNxZk=
20251204165231_pgflow_temp_mark_worker_stopped.sql h1:zI2FijK429oae4OpLbjU4eSaZtYhlsbvN3buWH9FKLw=
20251205103442_pgflow_temp_add_extensions.sql h1:IBHG1vBdXu8wDEJzqpJUFmuPhVaX0mAmDUkngLgdaMg=
20251205133446_pgflow_temp_ensure_workers.sql h1:EQzE75uaMSXeU1sdjO7MK1ipCwepxlWSVzlKegLpr48=
20251205140756_pgflow_temp_cleanup_ensure_workers_logs.sql h1:wNsRFg6bZKwItg3qdNx/dP8zaK4Qe7ufOZwxO+yx/F8=
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- Test: cleanup_ensure_workers_logs() removes rows past the retention window
-- and reports how many it removed.
-- Everything runs inside one transaction that is rolled back at the end.
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Start from an empty run-details table so the counts below are exact.
delete from cron.job_run_details;

-- Fixtures: four finished runs of job 1, two on each side of the 4-hour
-- retention used below.
insert into cron.job_run_details (runid, jobid, command, status, end_time)
select
  fixture.runid,
  1,
  'select some_function()',
  'succeeded',
  now() - fixture.age
from (
  values
    (1, interval '5 hours'),  -- past retention: expected to be deleted
    (2, interval '6 hours'),  -- past retention: expected to be deleted
    (3, interval '3 hours'),  -- within retention: expected to survive
    (4, interval '1 hour')    -- within retention: expected to survive
) as fixture (runid, age);

-- Assertion 1: run the cleanup with a 4-hour retention and check the reported count.
select is(
  (
    select cron_deleted
    from pgflow.cleanup_ensure_workers_logs(retention_hours => 4)
  ),
  cast(2 as bigint),
  'Should return count of 2 deleted entries'
);

-- Assertion 2: the two old rows are gone.
select is(
  (
    select count(*)
    from cron.job_run_details as d
    where d.runid = 1 or d.runid = 2
  ),
  cast(0 as bigint),
  'Entries older than retention should be deleted'
);

-- Assertion 3: the two recent rows are still present.
select is(
  (
    select count(*)
    from cron.job_run_details as d
    where d.runid = 3 or d.runid = 4
  ),
  cast(2 as bigint),
  'Entries newer than retention should be kept'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Test: cleanup_ensure_workers_logs() reports 0 when every row is still
-- inside the default retention window.
-- Everything runs inside one transaction that is rolled back at the end.
begin;
select plan(1);
select pgflow_tests.reset_db();

-- Start from an empty run-details table.
delete from cron.job_run_details;

-- Fixtures: two finished runs of job 1, both only a couple of hours old.
insert into cron.job_run_details (runid, jobid, command, status, end_time)
select
  fixture.runid,
  1,
  'select some_function()',
  'succeeded',
  now() - fixture.age
from (
  values
    (1, interval '1 hour'),
    (2, interval '2 hours')
) as fixture (runid, age);

-- Call with no arguments so the default retention applies; nothing qualifies.
select is(
  (
    select cron_deleted
    from pgflow.cleanup_ensure_workers_logs()
  ),
  cast(0 as bigint),
  'Should return 0 when no entries exceed retention period'
);

select finish();
rollback;
Loading