diff --git a/api/main.py b/api/main.py index 3b464a86..543ed652 100644 --- a/api/main.py +++ b/api/main.py @@ -422,30 +422,69 @@ async def update_password(request: Request, # No need for separate _get_eventhistory function -# TBD: Restrict response by Pydantic model -@app.get('/events') -async def get_events(request: Request): - """Get all the events if no request parameters have passed. - Format: [{event1}, {event2}, ...] or if recursive is set to true, - then we add to each event the node information. - Get all the matching events otherwise. - Query parameters can be used to filter the events: - - limit: Number of events to return - - from: Start timestamp (unix epoch) to filter events - - kind: Event kind to filter events - - state: Event state to filter events - - result: Event result to filter events - - recursive: Retrieve node together with event - This API endpoint is under development and may change in future. +def _parse_event_id_filter(query_params: dict, event_id: str, + event_ids: str) -> None: + """Parse and validate event id/ids filter parameters. + + Modifies query_params in place to add _id filter. + Raises HTTPException on validation errors. 
+ """ + if event_id and event_ids: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Provide either id or ids, not both" + ) + if event_id: + try: + query_params['_id'] = ObjectId(event_id) + except (errors.InvalidId, TypeError) as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid id format" + ) from exc + elif event_ids: + try: + ids_list = [ObjectId(x.strip()) + for x in event_ids.split(',') if x.strip()] + except (errors.InvalidId, TypeError) as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid ids format" + ) from exc + if not ids_list: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="ids must contain at least one id" + ) + query_params['_id'] = {'$in': ids_list} + + +def _build_events_query(query_params: dict) -> tuple: + """Extract and process event query parameters. + + Returns (recursive, processed_query_params). + Raises HTTPException on validation errors. 
""" - metrics.add('http_requests_total', 1) - query_params = dict(request.query_params) recursive = query_params.pop('recursive', None) limit = query_params.pop('limit', None) kind = query_params.pop('kind', None) state = query_params.pop('state', None) result = query_params.pop('result', None) from_ts = query_params.pop('from', None) + node_id = query_params.pop('node_id', None) + event_id = query_params.pop('id', None) + event_ids = query_params.pop('ids', None) + + if node_id: + if 'data.id' in query_params: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Provide either node_id or data.id, not both" + ) + query_params['data.id'] = node_id + + _parse_event_id_filter(query_params, event_id, event_ids) + if from_ts: if isinstance(from_ts, str): from_ts = datetime.fromisoformat(from_ts) @@ -458,13 +497,38 @@ async def get_events(request: Request): query_params['data.result'] = result if limit: query_params['limit'] = int(limit) - # limit recursive to 1000 + if recursive and (not limit or int(limit) > 1000): - # generate error raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Recursive limit is too large, max is 1000" ) + + return recursive, query_params + + +# TBD: Restrict response by Pydantic model +@app.get('/events') +async def get_events(request: Request): + """Get all the events if no request parameters have passed. + Format: [{event1}, {event2}, ...] or if recursive is set to true, + then we add to each event the node information. + Get all the matching events otherwise. 
+ Query parameters can be used to filter the events: + - limit: Number of events to return + - from: Start timestamp (ISO format) to filter events + - kind: Event kind to filter events + - state: Event state to filter events + - result: Event result to filter events + - id / ids: Event document id(s) to filter events + - node_id: Node id to filter events (alias for data.id) + - recursive: Retrieve node together with event + This API endpoint is under development and may change in future. + """ + metrics.add('http_requests_total', 1) + query_params = dict(request.query_params) + recursive, query_params = _build_events_query(query_params) + resp = await db.find_by_attributes_nonpaginated(EventHistory, query_params) resp_list = [] for item in resp: @@ -1109,19 +1173,17 @@ async def purge_handler(current_user: User = Depends(get_current_superuser), return await purge_old_nodes(age_days=days, batch_size=batch_size) -versioned_app = VersionedFastAPI( - app, - version_format='{major}', - prefix_format='/v{major}', - enable_latest=True, - default_version=(0, 0), - on_startup=[ - pubsub_startup, - create_indexes, - initialize_beanie, - start_background_tasks, - ] - ) +versioned_app = VersionedFastAPI(app, + version_format='{major}', + prefix_format='/v{major}', + enable_latest=True, + default_version=(0, 0), + on_startup=[ + pubsub_startup, + create_indexes, + initialize_beanie, + start_background_tasks, + ]) # traceback_exception_handler is a global exception handler that will be diff --git a/api/pubsub_mongo.py b/api/pubsub_mongo.py index 1b3da313..af6ddcd3 100644 --- a/api/pubsub_mongo.py +++ b/api/pubsub_mongo.py @@ -92,7 +92,8 @@ def __init__(self, mongo_client=None, host=None, db_number=None, # In-memory subscription tracking (for fire-and-forget mode) # {sub_id: {'sub': Subscription, 'redis_sub': PubSub, # 'subscriber_id': str|None, ...}} - self._subscriptions: Dict[int, Dict[str, Any]] = {} + self._subscriptions: Dict[int, Dict[str, Any]] = \ + {} + self._channels = 
set() self._lock = asyncio.Lock() self._keep_alive_timer = None @@ -188,18 +189,14 @@ async def _ensure_indexes(self): sub_col = self._mongo_db[self.SUBSCRIBER_STATE_COLLECTION] # Event history indexes - # TTL index for auto-cleanup (7 days = 604800 seconds) - # Note: If index already exists with different TTL, this is a no-op. - # Migration handles dropping the old index first. - await event_col.create_index( - 'timestamp', - expireAfterSeconds=604800, - name='ttl_timestamp' - ) - # Compound index for efficient pub/sub catch-up queries + # Note: Standard indexes (TTL on timestamp, channel+sequence_id) are + # managed by the EventHistory model and created via db.create_indexes() + # We only need to add the custom index for filtered event queries. + + # Compound index for filtered event queries (kind + timestamp) await event_col.create_index( - [('channel', ASCENDING), ('sequence_id', ASCENDING)], - name='channel_sequence_id' + [('data.kind', ASCENDING), ('timestamp', ASCENDING)], + name='kind_timestamp' ) # Subscriber state indexes @@ -315,7 +312,7 @@ def _eventhistory_to_cloudevent(self, event: Dict) -> str: return to_json(ce).decode('utf-8') # pylint: disable=too-many-arguments - async def _get_missed_events(self, channel: str, after_seq_id: int, + async def _get_missed_events(self, channel: str, after_seq_id: int, *, owner_filter: Optional[str] = None, promiscuous: bool = False, limit: int = None) -> List[Dict]: @@ -387,7 +384,7 @@ async def subscribe(self, channel: str, user: str, # If subscriber_id provided, set up durable subscription if subscriber_id: await self._setup_durable_subscription( - sub_id, subscriber_id, channel, user, promiscuous + sub_id, subscriber_id, channel, user, promiscuous=promiscuous ) return sub @@ -395,7 +392,7 @@ async def subscribe(self, channel: str, user: str, # pylint: disable=too-many-arguments async def _setup_durable_subscription( self, sub_id: int, subscriber_id: str, - channel: str, user: str, promiscuous: bool): + channel: 
str, user: str, *, promiscuous: bool): """Set up or restore durable subscription state""" col = self._mongo_db[self.SUBSCRIBER_STATE_COLLECTION] existing = await col.find_one({'subscriber_id': subscriber_id}) diff --git a/doc/events-migration.md b/doc/events-migration.md index 3218549f..478c0edb 100644 --- a/doc/events-migration.md +++ b/doc/events-migration.md @@ -102,6 +102,23 @@ each event document but do not affect existing queries: Existing queries filtering on `timestamp`, `data.kind`, `data.state`, etc. continue to work unchanged. +Supported query parameters for `/events`: + +| Parameter | Type | Description | +|-----------|------|-------------| +| `from` | string (ISO timestamp) | Return events with `timestamp` greater than this value. | +| `kind` | string | Filter by `data.kind` (e.g., `job`, `node`). | +| `state` | string | Filter by `data.state`. | +| `result` | string | Filter by `data.result`. | +| `limit` | integer | Maximum number of events to return. | +| `recursive` | bool | Attach related node info to each event (max `limit` 1000). | +| `id` | string (Mongo ObjectId) | Filter by a single event document id (top‑level `id` in `/events` response). | +| `ids` | string (comma‑separated ObjectIds) | Filter by multiple event document ids. | +| `node_id` | string | Filter events by affected node id (alias for `data.id`). | + +Note: `id`/`ids` refer to the event history document id, not the node id. To get +events about a node, use `node_id` (or `data.id`). + ## Subscriber ID Guidelines The `subscriber_id` should be: