diff --git a/doc/ChangeLog.md b/doc/ChangeLog.md index b0563ba4d..bfc144e24 100644 --- a/doc/ChangeLog.md +++ b/doc/ChangeLog.md @@ -16,6 +16,8 @@ All notable changes to the project are documented in this file. production environments. See the documentation for usage examples - Add support for "routing interfaces", issue #647. Lists interfaces with IP forwarding. Inspect from CLI using `show interface`, look for `⇅` flag +- Add operational data journal to statd with hierarchical time-based retention + policy, keeping snapshots from every 5 minutes (recent) to yearly (historical) ### Fixes diff --git a/src/statd/Makefile.am b/src/statd/Makefile.am index 38d2be05a..9dd4fa4d7 100644 --- a/src/statd/Makefile.am +++ b/src/statd/Makefile.am @@ -2,10 +2,17 @@ DISTCLEANFILES = *~ *.d ACLOCAL_AMFLAGS = -I m4 sbin_PROGRAMS = statd -statd_SOURCES = statd.c shared.c shared.h +statd_SOURCES = statd.c shared.c shared.h journal.c journal_retention.c journal.h statd_CPPFLAGS = -D_DEFAULT_SOURCE -D_GNU_SOURCE statd_CFLAGS = -W -Wall -Wextra statd_CFLAGS += $(jansson_CFLAGS) $(libyang_CFLAGS) $(sysrepo_CFLAGS) statd_CFLAGS += $(libsrx_CFLAGS) $(libite_CFLAGS) statd_LDADD = $(jansson_LIBS) $(libyang_LIBS) $(sysrepo_LIBS) -statd_LDADD += $(libsrx_LIBS) $(libite_LIBS) +statd_LDADD += $(libsrx_LIBS) $(libite_LIBS) $(EV_LIBS) -lz + +# Test stub for journal retention policy (no dependencies, standalone) +noinst_PROGRAMS = journal_retention_stub +journal_retention_stub_SOURCES = journal_retention_stub.c journal_retention.c journal.h +journal_retention_stub_CPPFLAGS = -D_DEFAULT_SOURCE -D_GNU_SOURCE -DJOURNAL_RETENTION_STUB +journal_retention_stub_CFLAGS = -W -Wall -Wextra +journal_retention_stub_LDFLAGS = -static diff --git a/src/statd/configure.ac b/src/statd/configure.ac index 375fe8fc9..f01346a06 100644 --- a/src/statd/configure.ac +++ b/src/statd/configure.ac @@ -43,11 +43,14 @@ PKG_CHECK_MODULES([sysrepo], [sysrepo >= 2.2.36]) PKG_CHECK_MODULES([libsrx], [libsrx >= 1.0.0]) AC_CHECK_HEADER([ev.h], - [AC_CHECK_LIB([ev], [ev_loop_new], - [], - [AC_MSG_ERROR("libev not found")] )], + [saved_LIBS="$LIBS" + AC_CHECK_LIB([ev], [ev_loop_new], + [EV_LIBS="-lev"], + [AC_MSG_ERROR("libev not found")] ) + LIBS="$saved_LIBS"], [AC_MSG_ERROR("ev.h not found")] ) +AC_SUBST([EV_LIBS]) test "x$prefix" = xNONE && prefix=$ac_default_prefix test "x$exec_prefix" = xNONE && exec_prefix='${prefix}' diff --git a/src/statd/journal.c b/src/statd/journal.c new file mode 100644 index 000000000..a869e13d0 --- /dev/null +++ b/src/statd/journal.c @@ -0,0 +1,232 @@ +/* SPDX-License-Identifier: BSD-3-Clause */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "journal.h" + +#define JOURNAL_DIR "/var/lib/statd" +#define DUMP_FILE "/var/lib/statd/operational.json" +#define DUMP_INTERVAL 300.0 /* 5 minutes in seconds */ + +static void journal_stop_cb(struct ev_loop *loop, struct ev_async *, int) +{ + DEBUG("Journal thread stop signal received"); + ev_break(loop, EVBREAK_ALL); +} + +static void get_timestamp_filename(char *buf, size_t len, time_t ts) +{ + struct tm *tm = gmtime(&ts); + + snprintf(buf, len, "%04d%02d%02d-%02d%02d%02d.json.gz", + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec); +} + +/* Compress a file using gzip */ +static int gzip_file(const char *src, const char *dst) +{ + FILE *in; + gzFile gz; + char buf[4096]; + size_t n; + + in = fopen(src, "r"); + if (!in) { + ERROR("Error, opening %s: %s", src, strerror(errno)); + return -1; + } + + gz = gzopen(dst, "wb"); + if (!gz) { + ERROR("Error, opening %s: %s", dst, strerror(errno)); + fclose(in); + return -1; + } + + while ((n = fread(buf, 1, sizeof(buf), in)) > 0) { + if (gzwrite(gz, buf, n) != (int)n) { + ERROR("Error, writing to %s", dst); + gzclose(gz); + fclose(in); + unlink(dst); + return -1; + } + } + + gzclose(gz); + fclose(in); + return 0; +} + +/* Create timestamped snapshot and update operational.json */ +static int create_snapshot(const struct lyd_node *tree) +{ + char timestamp_file[300]; + char timestamp_path[512]; + time_t now; + int ret; + + /* Write latest snapshot as uncompressed operational.json for easy access */ + ret = lyd_print_path(DUMP_FILE, tree, LYD_JSON, LYD_PRINT_WITHSIBLINGS); + if (ret != LY_SUCCESS) { + ERROR("Error, writing operational.json: %d", ret); + return -1; + } + + /* Compress operational.json to timestamped archive */ + now = time(NULL); + get_timestamp_filename(timestamp_file, sizeof(timestamp_file), now); + snprintf(timestamp_path, sizeof(timestamp_path), "%s/%s", + JOURNAL_DIR, timestamp_file); + + if (gzip_file(DUMP_FILE, timestamp_path) != 0) { + ERROR("Error, compressing snapshot to %s", timestamp_file); + return -1; + } + + DEBUG("Created snapshot %s", timestamp_file); + return 0; +} + +static void journal_timer_cb(struct ev_loop *, struct ev_timer *w, int) +{ + struct journal_ctx *jctx = (struct journal_ctx *)w->data; + struct timespec start, end; + struct snapshot *snapshots = NULL; + sr_conn_ctx_t *con; + const struct ly_ctx *ctx; + sr_data_t *sr_data = NULL; + sr_error_t err; + int snapshot_count = 0; + long duration_ms; + + clock_gettime(CLOCK_MONOTONIC, &start); + DEBUG("Starting operational datastore dump"); + + con = sr_session_get_connection(jctx->sr_query_ses); + if (!con) { + ERROR("Error, getting sr connection for dump"); + return; + } + + ctx = sr_acquire_context(con); + if (!ctx) { + ERROR("Error, acquiring context for dump"); + return; + } + + /* Query ALL operational data via second session + * This triggers our own operational callbacks running in main thread + */ + DEBUG("Calling sr_get_data on session %p", jctx->sr_query_ses); + err = sr_get_data(jctx->sr_query_ses, "/*", 0, 0, 0, &sr_data); + if (err != SR_ERR_OK) { + ERROR("Error, getting operational data: %s", sr_strerror(err)); + sr_release_context(con); + return; + } + DEBUG("sr_get_data succeeded, got data tree: %p", sr_data ? sr_data->tree : NULL); + + /* Create timestamped snapshot */ + if (sr_data && sr_data->tree) { + if (create_snapshot(sr_data->tree) != 0) { + sr_release_data(sr_data); + sr_release_context(con); + return; + } + } else { + DEBUG("No operational data to dump"); + } + + sr_release_data(sr_data); + sr_release_context(con); + + /* Apply retention policy */ + if (journal_scan_snapshots(JOURNAL_DIR, &snapshots, &snapshot_count) == 0) { + DEBUG("Applying retention policy to %d snapshots", snapshot_count); + journal_apply_retention_policy(JOURNAL_DIR, snapshots, snapshot_count, time(NULL)); + free(snapshots); + } + + clock_gettime(CLOCK_MONOTONIC, &end); + duration_ms = (end.tv_sec - start.tv_sec) * 1000 + + (end.tv_nsec - start.tv_nsec) / 1000000; + + INFO("Journal snapshot created and retention applied (took %ld ms)", duration_ms); +} + +static void *journal_thread_fn(void *arg) +{ + struct journal_ctx *jctx = (struct journal_ctx *)arg; + struct ev_timer journal_timer; + + INFO("Journal thread started"); + + if (mkdir("/var/lib/statd", 0755) != 0 && errno != EEXIST) { + ERROR("Error, creating directory /var/lib/statd: %s", strerror(errno)); + } + + jctx->journal_loop = ev_loop_new(EVFLAG_AUTO); + if (!jctx->journal_loop) { + ERROR("Error, creating journal thread event loop"); + return NULL; + } + + /* Setup async watcher for stop signal */ + ev_async_init(&jctx->journal_stop, journal_stop_cb); + ev_async_start(jctx->journal_loop, &jctx->journal_stop); + + /* Setup timer for periodic dumps */ + ev_timer_init(&journal_timer, journal_timer_cb, DUMP_INTERVAL, DUMP_INTERVAL); + journal_timer.data = jctx; + ev_timer_start(jctx->journal_loop, &journal_timer); + + DEBUG("Journal thread entering event loop"); + ev_run(jctx->journal_loop, 0); + + ev_timer_stop(jctx->journal_loop, &journal_timer); + ev_async_stop(jctx->journal_loop, &jctx->journal_stop); + ev_loop_destroy(jctx->journal_loop); + + INFO("Journal thread exiting"); + return NULL; +} + +int journal_start(struct journal_ctx *jctx, sr_session_ctx_t *sr_query_ses) +{ + int err; + + jctx->sr_query_ses = sr_query_ses; + jctx->journal_thread_running = 1; + + err = pthread_create(&jctx->journal_thread, NULL, journal_thread_fn, jctx); + if (err) { + ERROR("Error, creating journal thread: %s", strerror(err)); + return err; + } + + INFO("Periodic operational dump enabled (every %.0f seconds)", DUMP_INTERVAL); + return 0; +} + +void journal_stop(struct journal_ctx *jctx) +{ + /* Signal thread to exit immediately via async watcher */ + jctx->journal_thread_running = 0; + ev_async_send(jctx->journal_loop, &jctx->journal_stop); + pthread_join(jctx->journal_thread, NULL); +} diff --git a/src/statd/journal.h b/src/statd/journal.h new file mode 100644 index 000000000..dc5784fa7 --- /dev/null +++ b/src/statd/journal.h @@ -0,0 +1,31 @@ +/* SPDX-License-Identifier: BSD-3-Clause */ + +#ifndef STATD_JOURNAL_H_ +#define STATD_JOURNAL_H_ + +#include +#include +#include +#include + +/* Snapshot structure for tracking journal files */ +struct snapshot { + char filename[256]; + time_t timestamp; +}; + +struct journal_ctx { + sr_session_ctx_t *sr_query_ses; /* Consumer session for queries */ + struct ev_loop *journal_loop; /* Event loop for journal thread */ + pthread_t journal_thread; /* Thread for periodic dumps */ + struct ev_async journal_stop; /* Signal to stop journal thread */ + volatile int journal_thread_running; /* Flag to stop journal thread */ +}; + +int journal_start(struct journal_ctx *jctx, sr_session_ctx_t *sr_query_ses); +void journal_stop(struct journal_ctx *jctx); + +int journal_scan_snapshots(const char *dir, struct snapshot **snapshots, int *count); +void journal_apply_retention_policy(const char *dir, struct snapshot *snapshots, int count, time_t now); + +#endif diff --git a/src/statd/journal_retention.c b/src/statd/journal_retention.c new file mode 100644 index 000000000..313bf9cf0 --- /dev/null +++ b/src/statd/journal_retention.c @@ -0,0 +1,291 @@ +/* SPDX-License-Identifier: BSD-3-Clause */ + +#include +#include +#include +#include +#include +#include +#include + +#ifndef JOURNAL_RETENTION_STUB +#include +#else +/* Simple logging for test stub without srx dependency */ +#define ERROR(fmt, ...) fprintf(stderr, "ERROR: " fmt "\n", ##__VA_ARGS__) +#define DEBUG(fmt, ...) do { } while (0) +#endif + +#include "journal.h" + +/* Retention policy age thresholds */ +#define AGE_1_HOUR (60 * 60) +#define AGE_1_DAY (24 * AGE_1_HOUR) +#define AGE_1_WEEK (7 * AGE_1_DAY) +#define AGE_1_MONTH (30 * AGE_1_DAY) +#define AGE_1_YEAR (365 * AGE_1_DAY) + +static int parse_timestamp_filename(const char *filename, time_t *ts) +{ + struct tm tm = {0}; + int year, mon, day, hour, min, sec; + + if (sscanf(filename, "%4d%2d%2d-%2d%2d%2d.json.gz", + &year, &mon, &day, &hour, &min, &sec) != 6) + return -1; + + tm.tm_year = year - 1900; + tm.tm_mon = mon - 1; + tm.tm_mday = day; + tm.tm_hour = hour; + tm.tm_min = min; + tm.tm_sec = sec; + + *ts = timegm(&tm); + return 0; +} + +/* Comparison function for qsort */ +static int snapshot_compare(const void *a, const void *b) +{ + const struct snapshot *sa = (const struct snapshot *)a; + const struct snapshot *sb = (const struct snapshot *)b; + + if (sa->timestamp < sb->timestamp) + return -1; + if (sa->timestamp > sb->timestamp) + return 1; + return 0; +} + +int journal_scan_snapshots(const char *dir, struct snapshot **out_snapshots, int *out_count) +{ + DIR *d = opendir(dir); + struct dirent *entry; + struct snapshot *snapshots = NULL; + int count = 0; + int capacity = 0; + + if (!d) { + ERROR("Failed to open directory %s: %s", dir, strerror(errno)); + return -1; + } + + while ((entry = readdir(d)) != NULL) { + time_t ts; + + if (strcmp(entry->d_name, "operational.json") == 0) + continue; + if (!strstr(entry->d_name, ".json.gz")) + continue; + + if (parse_timestamp_filename(entry->d_name, &ts) != 0) + continue; + + if (count >= capacity) { + struct snapshot *new_snapshots; + + capacity = capacity ? capacity * 2 : 32; + new_snapshots = realloc(snapshots, capacity * sizeof(struct snapshot)); + if (!new_snapshots) { + ERROR("Failed to allocate memory for snapshots"); + free(snapshots); + closedir(d); + return -1; + } + snapshots = new_snapshots; + } + + /* Add snapshot */ + snprintf(snapshots[count].filename, sizeof(snapshots[count].filename), + "%s", entry->d_name); + snapshots[count].timestamp = ts; + count++; + } + + closedir(d); + + /* Sort by timestamp (oldest first) */ + if (count > 0) + qsort(snapshots, count, sizeof(struct snapshot), snapshot_compare); + + *out_snapshots = snapshots; + *out_count = count; + return 0; +} + +static void delete_snapshot(const char *dir, const char *filename) +{ + char path[512]; + + snprintf(path, sizeof(path), "%s/%s", dir, filename); + if (unlink(path) != 0) + ERROR("Failed to delete snapshot %s: %s", filename, strerror(errno)); + else + DEBUG("Deleted snapshot %s", filename); +} + +/* Check if this is the first snapshot in the given period by checking if there's + * an earlier snapshot in the same period */ +static int is_first_in_period(struct snapshot *snapshots, int count, int idx, + int (*same_period)(time_t a, time_t b)) +{ + int i; + + for (i = 0; i < idx; i++) { + if (same_period(snapshots[i].timestamp, snapshots[idx].timestamp)) + return 0; /* Found earlier snapshot in same period */ + } + + return 1; /* This is the first */ +} + +/* Check if two timestamps are in the same hour */ +static int same_hour(time_t a, time_t b) +{ + struct tm tm_a, tm_b; + + gmtime_r(&a, &tm_a); + gmtime_r(&b, &tm_b); + + return tm_a.tm_year == tm_b.tm_year && + tm_a.tm_yday == tm_b.tm_yday && + tm_a.tm_hour == tm_b.tm_hour; +} + +/* Check if two timestamps are in the same day */ +static int same_day(time_t a, time_t b) +{ + struct tm tm_a, tm_b; + + gmtime_r(&a, &tm_a); + gmtime_r(&b, &tm_b); + + return tm_a.tm_year == tm_b.tm_year && + tm_a.tm_yday == tm_b.tm_yday; +} + +/* Check if two timestamps are in the same week (Sunday-based) */ +static int same_week(time_t a, time_t b) +{ + struct tm tm_a, tm_b; + time_t sunday_a, sunday_b; + + gmtime_r(&a, &tm_a); + gmtime_r(&b, &tm_b); + + /* Calculate Sunday midnight for each timestamp */ + tm_a.tm_mday -= tm_a.tm_wday; + tm_a.tm_hour = 0; + tm_a.tm_min = 0; + tm_a.tm_sec = 0; + sunday_a = timegm(&tm_a); + + tm_b.tm_mday -= tm_b.tm_wday; + tm_b.tm_hour = 0; + tm_b.tm_min = 0; + tm_b.tm_sec = 0; + sunday_b = timegm(&tm_b); + + return sunday_a == sunday_b; +} + +/* Check if two timestamps are in the same month */ +static int same_month(time_t a, time_t b) +{ + struct tm tm_a, tm_b; + + gmtime_r(&a, &tm_a); + gmtime_r(&b, &tm_b); + + return tm_a.tm_year == tm_b.tm_year && + tm_a.tm_mon == tm_b.tm_mon; +} + +/* Check if two timestamps are in the same year */ +static int same_year(time_t a, time_t b) +{ + struct tm tm_a, tm_b; + + gmtime_r(&a, &tm_a); + gmtime_r(&b, &tm_b); + + return tm_a.tm_year == tm_b.tm_year; +} + +/* Keep all snapshots in the 5-minute bucket */ +static int should_keep_5min(struct snapshot *snapshots, int count, int idx) +{ + (void)snapshots; + (void)count; + (void)idx; + return 1; /* Keep all */ +} + +static int should_keep_hourly(struct snapshot *snapshots, int count, int idx) +{ + return is_first_in_period(snapshots, count, idx, same_hour); +} + +static int should_keep_daily(struct snapshot *snapshots, int count, int idx) +{ + return is_first_in_period(snapshots, count, idx, same_day); +} + +static int should_keep_weekly(struct snapshot *snapshots, int count, int idx) +{ + return is_first_in_period(snapshots, count, idx, same_week); +} + +static int should_keep_monthly(struct snapshot *snapshots, int count, int idx) +{ + return is_first_in_period(snapshots, count, idx, same_month); +} + +static int should_keep_yearly(struct snapshot *snapshots, int count, int idx) +{ + return is_first_in_period(snapshots, count, idx, same_year); +} + +/* Apply retention policy to snapshots */ +void journal_apply_retention_policy(const char *dir, struct snapshot *snapshots, int count, time_t now) +{ + int *keep; + int i; + + if (count == 0) + return; + + /* Mark snapshots to keep (1) or delete (0) */ + keep = calloc(count, sizeof(int)); + if (!keep) { + ERROR("Failed to allocate memory for retention policy"); + return; + } + + for (i = 0; i < count; i++) { + time_t age = now - snapshots[i].timestamp; + + if (age <= AGE_1_HOUR) { + keep[i] = should_keep_5min(snapshots, count, i); + } else if (age <= AGE_1_DAY) { + keep[i] = should_keep_hourly(snapshots, count, i); + } else if (age <= AGE_1_WEEK) { + keep[i] = should_keep_daily(snapshots, count, i); + } else if (age <= AGE_1_MONTH) { + keep[i] = should_keep_weekly(snapshots, count, i); + } else if (age <= AGE_1_YEAR) { + keep[i] = should_keep_monthly(snapshots, count, i); + } else { + keep[i] = should_keep_yearly(snapshots, count, i); + } + } + + /* Delete snapshots not marked for keeping */ + for (i = 0; i < count; i++) { + if (!keep[i]) + delete_snapshot(dir, snapshots[i].filename); + } + + free(keep); +} diff --git a/src/statd/journal_retention_stub.c b/src/statd/journal_retention_stub.c new file mode 100644 index 000000000..9bcde53db --- /dev/null +++ b/src/statd/journal_retention_stub.c @@ -0,0 +1,44 @@ +/* SPDX-License-Identifier: BSD-3-Clause */ + +/* + * Test stub for journal retention policy + * + * This program applies the retention policy to a directory of timestamped + * JSON snapshots. It's used by the Python unit tests. + * + * Usage: journal_retention_stub + * directory: Path to directory containing timestamped JSON snapshots + * now: Unix timestamp (seconds since epoch) representing "current time" + * + * Example: + * journal_retention_stub /tmp/testdir 1704067200 + */ + +#include +#include + +#include "journal.h" + +int main(int argc, char *argv[]) +{ + struct snapshot *snapshots; + int snapshot_count; + time_t now; + + if (argc != 3) { + fprintf(stderr, "Usage: %s \n", argv[0]); + fprintf(stderr, " directory: Path to snapshots directory\n"); + fprintf(stderr, " unix_timestamp: Current time as seconds since epoch\n"); + return 1; + } + + now = (time_t)atol(argv[2]); + + if (journal_scan_snapshots(argv[1], &snapshots, &snapshot_count) != 0) + return 1; + + journal_apply_retention_policy(argv[1], snapshots, snapshot_count, now); + + free(snapshots); + return 0; +} diff --git a/src/statd/statd.c b/src/statd/statd.c index 816670d66..bdd0024ec 100644 --- a/src/statd/statd.c +++ b/src/statd/statd.c @@ -7,6 +7,9 @@ #include #include #include +#include +#include +#include #include #include @@ -26,6 +29,7 @@ #include #include "shared.h" +#include "journal.h" /* New kernel feature, not in sys/mman.h yet */ #ifndef MFD_NOEXEC_SEAL @@ -59,8 +63,11 @@ struct sub { struct statd { struct sub_head subs; - sr_session_ctx_t *sr_ses; + sr_session_ctx_t *sr_ses; /* Provider session with callbacks */ + sr_session_ctx_t *sr_query_ses; /* Consumer session for queries */ + sr_conn_ctx_t *sr_conn; /* Connection (owns YANG context) */ struct ev_loop *ev_loop; + struct journal_ctx journal; /* Journal thread context */ }; static int ly_add_yanger_data(const struct ly_ctx *ctx, struct lyd_node **parent, @@ -447,7 +454,6 @@ int main(int argc, char *argv[]) struct ev_signal sigint_watcher, sigusr1_watcher; int log_opts = LOG_PID | LOG_NDELAY; struct statd statd = {}; - sr_conn_ctx_t *sr_conn; const char *env; int err; @@ -464,24 +470,37 @@ int main(int argc, char *argv[]) INFO("Status daemon starting"); - err = sr_connect(SR_CONN_DEFAULT, &sr_conn); + err = sr_connect(SR_CONN_DEFAULT, &statd.sr_conn); if (err) { ERROR("Error, connecting to sysrepo: %s", sr_strerror(err)); return EXIT_FAILURE; } DEBUG("Connected to sysrepo"); - err = sr_session_start(sr_conn, SR_DS_OPERATIONAL, &statd.sr_ses); + /* Session 1: Provider with operational callbacks */ + err = sr_session_start(statd.sr_conn, SR_DS_OPERATIONAL, &statd.sr_ses); if (err) { - ERROR("Error, start sysrepo session: %s", sr_strerror(err)); - sr_disconnect(sr_conn); + ERROR("Error, start provider session: %s", sr_strerror(err)); + sr_disconnect(statd.sr_conn); return EXIT_FAILURE; } - DEBUG("Session started (%p)", statd.sr_ses); + DEBUG("Provider session started (%p)", statd.sr_ses); + + /* Session 2: Consumer for querying operational data */ + err = sr_session_start(statd.sr_conn, SR_DS_OPERATIONAL, &statd.sr_query_ses); + if (err) { + ERROR("Error, start query session: %s", sr_strerror(err)); + sr_session_stop(statd.sr_ses); + sr_disconnect(statd.sr_conn); + return EXIT_FAILURE; + } + DEBUG("Query session started (%p)", statd.sr_query_ses); err = subscribe_to_all(&statd); if (err) { - sr_disconnect(sr_conn); + sr_session_stop(statd.sr_query_ses); + sr_session_stop(statd.sr_ses); + sr_disconnect(statd.sr_conn); return EXIT_FAILURE; } @@ -493,6 +512,14 @@ int main(int argc, char *argv[]) sigusr1_watcher.data = &statd; ev_signal_start(statd.ev_loop, &sigusr1_watcher); + err = journal_start(&statd.journal, statd.sr_query_ses); + if (err) { + sr_session_stop(statd.sr_query_ses); + sr_session_stop(statd.sr_ses); + sr_disconnect(statd.sr_conn); + return EXIT_FAILURE; + } + /* Signal readiness to Finit */ pidfile(NULL); @@ -501,9 +528,13 @@ int main(int argc, char *argv[]) /* We should never get here during normal operation */ INFO("Status daemon shutting down"); + + journal_stop(&statd.journal); + unsub_to_all(&statd); + sr_session_stop(statd.sr_query_ses); sr_session_stop(statd.sr_ses); - sr_disconnect(sr_conn); + sr_disconnect(statd.sr_conn); return EXIT_SUCCESS; } diff --git a/test/case/statd/all.yaml b/test/case/statd/all.yaml index b7993e159..ee1d006bb 100644 --- a/test/case/statd/all.yaml +++ b/test/case/statd/all.yaml @@ -5,5 +5,7 @@ name: "containers" - case: interfaces-all/test name: "interfaces-all" +- case: journal-retention/test.py + name: "journal-retention" - case: system/test name: "system" diff --git a/test/case/statd/journal-retention/test.py b/test/case/statd/journal-retention/test.py new file mode 100755 index 000000000..8888bc450 --- /dev/null +++ b/test/case/statd/journal-retention/test.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +Test journal retention policy + +This test simulates creating snapshots over a 13-month period and verifies +that the retention policy keeps the first snapshot of each period: +- Every 5 minutes for the last hour (13 snapshots) +- First snapshot of each hour for the last day (23 snapshots) +- First snapshot of each day for the last week (6 snapshots) +- First snapshot of each week for the last month (3 snapshots) +- First snapshot of each month for the last year (11 snapshots) +- First snapshot of each year forever (1 snapshot per year) +""" + +import os +import sys +import tempfile +import subprocess +from datetime import datetime + +from infamy.tap import Test + +def create_snapshot(test_dir, timestamp): + """Create an empty snapshot file with the given timestamp""" + dt = datetime.utcfromtimestamp(timestamp) + filename = dt.strftime("%Y%m%d-%H%M%S.json.gz") + path = os.path.join(test_dir, filename) + open(path, 'w').close() + return filename + +def count_snapshots(test_dir): + """Count compressed snapshot files in directory""" + return len([f for f in os.listdir(test_dir) if f.endswith('.json.gz')]) + +def count_snapshots_by_age(test_dir, now): + """Count snapshots by age bucket""" + AGE_1_HOUR = 60 * 60 + AGE_1_DAY = 24 * AGE_1_HOUR + AGE_1_WEEK = 7 * AGE_1_DAY + AGE_1_MONTH = 30 * AGE_1_DAY + AGE_1_YEAR = 365 * AGE_1_DAY + + counts = { + 'hour': 0, + 'day': 0, + 'week': 0, + 'month': 0, + 'year': 0, + 'older': 0 + } + + for filename in os.listdir(test_dir): + if not filename.endswith('.json.gz'): + continue + + # Parse timestamp from filename (YYYYMMDD-HHMMSS.json.gz) + try: + ts_str = filename.replace('.json.gz', '') + dt = datetime.strptime(ts_str, "%Y%m%d-%H%M%S") + ts = int(dt.timestamp()) + age = now - ts + + if age <= AGE_1_HOUR: + counts['hour'] += 1 + elif age <= AGE_1_DAY: + counts['day'] += 1 + elif age <= AGE_1_WEEK: + counts['week'] += 1 + elif age <= AGE_1_MONTH: + counts['month'] += 1 + elif age <= AGE_1_YEAR: + counts['year'] += 1 + else: + counts['older'] += 1 + except ValueError: + pass + + return counts + +def run_retention_stub(stub_path, test_dir, now): + """Run the retention policy stub""" + result = subprocess.run( + [stub_path, test_dir, str(now)], + capture_output=True, + text=True + ) + if result.returncode != 0: + print(f"Stub failed with exit code {result.returncode}") + print(f"Stderr: {result.stderr}") + print(f"Stdout: {result.stdout}") + raise Exception(f"Retention stub failed with exit code {result.returncode}") + +with Test() as test: + with test.step("Find journal_retention_stub binary"): + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.abspath(os.path.join(script_dir, "../../../..")) + stub_path = os.path.join(repo_root, "output/build/statd-1.0/journal_retention_stub") + + if not os.path.exists(stub_path): + stub_path = os.path.join(repo_root, "src/statd/journal_retention_stub") + if not os.path.exists(stub_path): + test.skip() + + print(f"Using stub binary: {stub_path}") + + with tempfile.TemporaryDirectory() as test_dir: + print(f"Using test directory: {test_dir}") + + with test.step("Create 13 months of snapshots every 5 minutes"): + base_time = datetime(2024, 1, 1, 0, 0, 0) + start_time = int(base_time.timestamp()) + interval_5min = 5 * 60 + interval_1day = 24 * 3600 + total_duration = 13 * 30 * 24 * 3600 + + t = start_time + last_retention_run = start_time + while t < start_time + total_duration: + create_snapshot(test_dir, t) + simulated_now = t + + # Apply retention policy once per simulated day + if t - last_retention_run >= interval_1day: + run_retention_stub(stub_path, test_dir, simulated_now) + last_retention_run = t + + t += interval_5min + + # Final retention run + run_retention_stub(stub_path, test_dir, simulated_now) + print(f"Final simulated time: {datetime.utcfromtimestamp(simulated_now)}") + + # Count remaining snapshots + counts = count_snapshots_by_age(test_dir, simulated_now) + print(f"Snapshot counts: hour={counts['hour']}, day={counts['day']}, " + f"week={counts['week']}, month={counts['month']}, " + f"year={counts['year']}, older={counts['older']}") + + with test.step("Verify last hour retention (13 snapshots)"): + assert counts['hour'] == 13, f"Expected 13, got {counts['hour']}" + + with test.step("Verify last day retention (23 snapshots)"): + assert counts['day'] == 23, f"Expected 23, got {counts['day']}" + + with test.step("Verify last week retention (6 snapshots)"): + assert counts['week'] == 6, f"Expected 6, got {counts['week']}" + + with test.step("Verify last month retention (3 snapshots)"): + assert counts['month'] == 3, f"Expected 3, got {counts['month']}" + + with test.step("Verify last year retention (11 snapshots)"): + assert counts['year'] == 11, f"Expected 11, got {counts['year']}" + + with test.step("Verify older than 1 year retention (1 snapshot)"): + assert counts['older'] == 1, f"Expected 1, got {counts['older']}" + + test.succeed()