From 2a849291a97c998c7b458a600abb2d4cd3fda9e0 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Thu, 11 Dec 2025 20:53:03 +0200 Subject: [PATCH 1/6] mongodb: Add indexes for events Events are awfully slow even on an ordinary request like: https://staging.kernelci.org:9000/latest/events?kind=job&limit=1000&from=2025-12-11T13:00:00 This index should help with that. Signed-off-by: Denys Fedoryshchenko --- api/pubsub_mongo.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/pubsub_mongo.py b/api/pubsub_mongo.py index 1b3da313..3a8df692 100644 --- a/api/pubsub_mongo.py +++ b/api/pubsub_mongo.py @@ -201,6 +201,11 @@ async def _ensure_indexes(self): [('channel', ASCENDING), ('sequence_id', ASCENDING)], name='channel_sequence_id' ) + # Compound index for filtered event queries (kind + timestamp) + await event_col.create_index( + [('data.kind', ASCENDING), ('timestamp', ASCENDING)], + name='kind_timestamp' + ) # Subscriber state indexes # Unique index on subscriber_id From b3b02ee402156bb831bed28e327fe1b91eb9bd67 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Thu, 11 Dec 2025 21:21:58 +0200 Subject: [PATCH 2/6] fixup: Fix failing mongodb indexes on startup Signed-off-by: Denys Fedoryshchenko --- api/pubsub_mongo.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/api/pubsub_mongo.py b/api/pubsub_mongo.py index 3a8df692..32c0ed4a 100644 --- a/api/pubsub_mongo.py +++ b/api/pubsub_mongo.py @@ -188,19 +188,10 @@ async def _ensure_indexes(self): sub_col = self._mongo_db[self.SUBSCRIBER_STATE_COLLECTION] # Event history indexes - # TTL index for auto-cleanup (7 days = 604800 seconds) - # Note: If index already exists with different TTL, this is a no-op. - # Migration handles dropping the old index first. 
- await event_col.create_index( - 'timestamp', - expireAfterSeconds=604800, - name='ttl_timestamp' - ) - # Compound index for efficient pub/sub catch-up queries - await event_col.create_index( - [('channel', ASCENDING), ('sequence_id', ASCENDING)], - name='channel_sequence_id' - ) + # Note: Standard indexes (TTL on timestamp, channel+sequence_id) are + # managed by the EventHistory model and created via db.create_indexes(). + # We only need to add the custom index for filtered event queries. + # Compound index for filtered event queries (kind + timestamp) await event_col.create_index( [('data.kind', ASCENDING), ('timestamp', ASCENDING)], From b29f970cb85e5ee5534ff6e15ed2fc567d6a6067 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Thu, 11 Dec 2025 23:06:23 +0200 Subject: [PATCH 3/6] events: Add filtering by id and ids Signed-off-by: Denys Fedoryshchenko --- api/main.py | 34 ++++++++++++++++++++++++++++++++++ doc/events-migration.md | 13 +++++++++++++ 2 files changed, 47 insertions(+) diff --git a/api/main.py b/api/main.py index 3b464a86..7f012450 100644 --- a/api/main.py +++ b/api/main.py @@ -435,6 +435,7 @@ async def get_events(request: Request): - kind: Event kind to filter events - state: Event state to filter events - result: Event result to filter events + - id / ids: Event document id(s) to filter events - recursive: Retrieve node together with event This API endpoint is under development and may change in future. """ @@ -446,6 +447,39 @@ async def get_events(request: Request): state = query_params.pop('state', None) result = query_params.pop('result', None) from_ts = query_params.pop('from', None) + # Support filtering by MongoDB _id. + # Accept `id=` for a single id or `ids=a,b,c` for multiple ids. + # Using `id` as query param is safe here because we remove it from the + # filter before passing to Mongo. 
+ event_id = query_params.pop('id', None) + event_ids = query_params.pop('ids', None) + if event_id and event_ids: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Provide either id or ids, not both" + ) + if event_id: + try: + query_params['_id'] = ObjectId(event_id) + except (errors.InvalidId, TypeError) as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid id format" + ) from exc + elif event_ids: + try: + ids_list = [ObjectId(x.strip()) for x in event_ids.split(',') if x.strip()] + except (errors.InvalidId, TypeError) as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid ids format" + ) from exc + if not ids_list: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="ids must contain at least one id" + ) + query_params['_id'] = {'$in': ids_list} if from_ts: if isinstance(from_ts, str): from_ts = datetime.fromisoformat(from_ts) diff --git a/doc/events-migration.md b/doc/events-migration.md index 3218549f..8f469372 100644 --- a/doc/events-migration.md +++ b/doc/events-migration.md @@ -102,6 +102,19 @@ each event document but do not affect existing queries: Existing queries filtering on `timestamp`, `data.kind`, `data.state`, etc. continue to work unchanged. +Supported query parameters for `/events`: + +| Parameter | Type | Description | +|-----------|------|-------------| +| `from` | string (ISO timestamp) | Return events with `timestamp` greater than this value. | +| `kind` | string | Filter by `data.kind` (e.g., `job`, `node`). | +| `state` | string | Filter by `data.state`. | +| `result` | string | Filter by `data.result`. | +| `limit` | integer | Maximum number of events to return. | +| `recursive` | bool | Attach related node info to each event (max `limit` 1000). | +| `id` | string (Mongo ObjectId) | Filter by a single event document id. | +| `ids` | string (comma‑separated ObjectIds) | Filter by multiple event document ids. 
| + ## Subscriber ID Guidelines The `subscriber_id` should be: From 78a193f65f841b9d203e8c068ec4c00bad69eacc Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Thu, 11 Dec 2025 23:39:32 +0200 Subject: [PATCH 4/6] events: Add search by node_id Signed-off-by: Denys Fedoryshchenko --- api/main.py | 9 +++++++++ doc/events-migration.md | 6 +++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/api/main.py b/api/main.py index 7f012450..e52a2c4f 100644 --- a/api/main.py +++ b/api/main.py @@ -436,6 +436,7 @@ async def get_events(request: Request): - state: Event state to filter events - result: Event result to filter events - id / ids: Event document id(s) to filter events + - node_id: Node id to filter events (alias for data.id) - recursive: Retrieve node together with event This API endpoint is under development and may change in future. """ @@ -447,6 +448,14 @@ async def get_events(request: Request): state = query_params.pop('state', None) result = query_params.pop('result', None) from_ts = query_params.pop('from', None) + node_id = query_params.pop('node_id', None) + if node_id: + if 'data.id' in query_params: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Provide either node_id or data.id, not both" + ) + query_params['data.id'] = node_id # Support filtering by MongoDB _id. # Accept `id=` for a single id or `ids=a,b,c` for multiple ids. # Using `id` as query param is safe here because we remove it from the diff --git a/doc/events-migration.md b/doc/events-migration.md index 8f469372..478c0edb 100644 --- a/doc/events-migration.md +++ b/doc/events-migration.md @@ -112,8 +112,12 @@ Supported query parameters for `/events`: | `result` | string | Filter by `data.result`. | | `limit` | integer | Maximum number of events to return. | | `recursive` | bool | Attach related node info to each event (max `limit` 1000). | -| `id` | string (Mongo ObjectId) | Filter by a single event document id. 
| +| `id` | string (Mongo ObjectId) | Filter by a single event document id (top‑level `id` in `/events` response). | | `ids` | string (comma‑separated ObjectIds) | Filter by multiple event document ids. | +| `node_id` | string | Filter events by affected node id (alias for `data.id`). | + +Note: `id`/`ids` refer to the event history document id, not the node id. To get +events about a node, use `node_id` (or `data.id`). ## Subscriber ID Guidelines From ad95b7bcb1704f9b6685d5b53cce58c9a0554595 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 12 Dec 2025 09:54:48 +0200 Subject: [PATCH 5/6] pycodestyle: Fix linting errors Signed-off-by: Denys Fedoryshchenko --- api/main.py | 27 +++++++++++++-------------- api/pubsub_mongo.py | 5 +++-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/api/main.py b/api/main.py index e52a2c4f..0127d9d7 100644 --- a/api/main.py +++ b/api/main.py @@ -477,7 +477,8 @@ async def get_events(request: Request): ) from exc elif event_ids: try: - ids_list = [ObjectId(x.strip()) for x in event_ids.split(',') if x.strip()] + ids_list = [ObjectId(x.strip()) + for x in event_ids.split(',') if x.strip()] except (errors.InvalidId, TypeError) as exc: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, @@ -1152,19 +1153,17 @@ async def purge_handler(current_user: User = Depends(get_current_superuser), return await purge_old_nodes(age_days=days, batch_size=batch_size) -versioned_app = VersionedFastAPI( - app, - version_format='{major}', - prefix_format='/v{major}', - enable_latest=True, - default_version=(0, 0), - on_startup=[ - pubsub_startup, - create_indexes, - initialize_beanie, - start_background_tasks, - ] - ) +versioned_app = VersionedFastAPI(app, + version_format='{major}', + prefix_format='/v{major}', + enable_latest=True, + default_version=(0, 0), + on_startup=[ + pubsub_startup, + create_indexes, + initialize_beanie, + start_background_tasks, + ]) # traceback_exception_handler is a global exception handler 
that will be diff --git a/api/pubsub_mongo.py b/api/pubsub_mongo.py index 32c0ed4a..72e02e6e 100644 --- a/api/pubsub_mongo.py +++ b/api/pubsub_mongo.py @@ -92,7 +92,8 @@ def __init__(self, mongo_client=None, host=None, db_number=None, # In-memory subscription tracking (for fire-and-forget mode) # {sub_id: {'sub': Subscription, 'redis_sub': PubSub, # 'subscriber_id': str|None, ...}} - self._subscriptions: Dict[int, Dict[str, Any]] = {} + self._subscriptions: Dict[int, Dict[str, Any]] = \ + {} self._channels = set() self._lock = asyncio.Lock() self._keep_alive_timer = None @@ -189,7 +190,7 @@ async def _ensure_indexes(self): # Event history indexes # Note: Standard indexes (TTL on timestamp, channel+sequence_id) are - # managed by the EventHistory model and created via db.create_indexes(). + # managed by the EventHistory model and created via db.create_indexes() # We only need to add the custom index for filtered event queries. # Compound index for filtered event queries (kind + timestamp) From 1eda7de0407e9de38d69ae5bff7d4e6caa811307 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 12 Dec 2025 10:19:43 +0200 Subject: [PATCH 6/6] pylint: More fixes Signed-off-by: Denys Fedoryshchenko --- api/main.py | 102 ++++++++++++++++++++++++++------------------ api/pubsub_mongo.py | 6 +-- 2 files changed, 64 insertions(+), 44 deletions(-) diff --git a/api/main.py b/api/main.py index 0127d9d7..543ed652 100644 --- a/api/main.py +++ b/api/main.py @@ -422,46 +422,13 @@ async def update_password(request: Request, # No need for separate _get_eventhistory function -# TBD: Restrict response by Pydantic model -@app.get('/events') -async def get_events(request: Request): - """Get all the events if no request parameters have passed. - Format: [{event1}, {event2}, ...] or if recursive is set to true, - then we add to each event the node information. - Get all the matching events otherwise. 
- Query parameters can be used to filter the events: - - limit: Number of events to return - - from: Start timestamp (unix epoch) to filter events - - kind: Event kind to filter events - - state: Event state to filter events - - result: Event result to filter events - - id / ids: Event document id(s) to filter events - - node_id: Node id to filter events (alias for data.id) - - recursive: Retrieve node together with event - This API endpoint is under development and may change in future. +def _parse_event_id_filter(query_params: dict, event_id: str, + event_ids: str) -> None: + """Parse and validate event id/ids filter parameters. + + Modifies query_params in place to add _id filter. + Raises HTTPException on validation errors. """ - metrics.add('http_requests_total', 1) - query_params = dict(request.query_params) - recursive = query_params.pop('recursive', None) - limit = query_params.pop('limit', None) - kind = query_params.pop('kind', None) - state = query_params.pop('state', None) - result = query_params.pop('result', None) - from_ts = query_params.pop('from', None) - node_id = query_params.pop('node_id', None) - if node_id: - if 'data.id' in query_params: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Provide either node_id or data.id, not both" - ) - query_params['data.id'] = node_id - # Support filtering by MongoDB _id. - # Accept `id=` for a single id or `ids=a,b,c` for multiple ids. - # Using `id` as query param is safe here because we remove it from the - # filter before passing to Mongo. - event_id = query_params.pop('id', None) - event_ids = query_params.pop('ids', None) if event_id and event_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, @@ -490,6 +457,34 @@ async def get_events(request: Request): detail="ids must contain at least one id" ) query_params['_id'] = {'$in': ids_list} + + +def _build_events_query(query_params: dict) -> tuple: + """Extract and process event query parameters. 
+ + Returns (recursive, processed_query_params). + Raises HTTPException on validation errors. + """ + recursive = query_params.pop('recursive', None) + limit = query_params.pop('limit', None) + kind = query_params.pop('kind', None) + state = query_params.pop('state', None) + result = query_params.pop('result', None) + from_ts = query_params.pop('from', None) + node_id = query_params.pop('node_id', None) + event_id = query_params.pop('id', None) + event_ids = query_params.pop('ids', None) + + if node_id: + if 'data.id' in query_params: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Provide either node_id or data.id, not both" + ) + query_params['data.id'] = node_id + + _parse_event_id_filter(query_params, event_id, event_ids) + if from_ts: if isinstance(from_ts, str): from_ts = datetime.fromisoformat(from_ts) @@ -502,13 +497,38 @@ async def get_events(request: Request): query_params['data.result'] = result if limit: query_params['limit'] = int(limit) - # limit recursive to 1000 + if recursive and (not limit or int(limit) > 1000): - # generate error raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Recursive limit is too large, max is 1000" ) + + return recursive, query_params + + +# TBD: Restrict response by Pydantic model +@app.get('/events') +async def get_events(request: Request): + """Get all the events if no request parameters have passed. + Format: [{event1}, {event2}, ...] or if recursive is set to true, + then we add to each event the node information. + Get all the matching events otherwise. 
+ Query parameters can be used to filter the events: + - limit: Number of events to return + - from: Start timestamp (ISO 8601 string) to filter events + - kind: Event kind to filter events + - state: Event state to filter events + - result: Event result to filter events + - id / ids: Event document id(s) to filter events + - node_id: Node id to filter events (alias for data.id) + - recursive: Retrieve node together with event + This API endpoint is under development and may change in future. + """ + metrics.add('http_requests_total', 1) + query_params = dict(request.query_params) + recursive, query_params = _build_events_query(query_params) + resp = await db.find_by_attributes_nonpaginated(EventHistory, query_params) resp_list = [] for item in resp: diff --git a/api/pubsub_mongo.py b/api/pubsub_mongo.py index 72e02e6e..af6ddcd3 100644 --- a/api/pubsub_mongo.py +++ b/api/pubsub_mongo.py @@ -312,7 +312,7 @@ def _eventhistory_to_cloudevent(self, event: Dict) -> str: return to_json(ce).decode('utf-8') # pylint: disable=too-many-arguments - async def _get_missed_events(self, channel: str, after_seq_id: int, + async def _get_missed_events(self, channel: str, after_seq_id: int, *, owner_filter: Optional[str] = None, promiscuous: bool = False, limit: int = None) -> List[Dict]: @@ -384,7 +384,7 @@ async def subscribe(self, channel: str, user: str, # If subscriber_id provided, set up durable subscription if subscriber_id: await self._setup_durable_subscription( - sub_id, subscriber_id, channel, user, promiscuous + sub_id, subscriber_id, channel, user, promiscuous=promiscuous ) return sub @@ -392,7 +392,7 @@ async def subscribe(self, channel: str, user: str, # pylint: disable=too-many-arguments async def _setup_durable_subscription( self, sub_id: int, subscriber_id: str, - channel: str, user: str, promiscuous: bool): + channel: str, user: str, *, promiscuous: bool): """Set up or restore durable subscription state""" col = self._mongo_db[self.SUBSCRIBER_STATE_COLLECTION] existing 
= await col.find_one({'subscriber_id': subscriber_id})