Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 94 additions & 32 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from fastapi_users import FastAPIUsers
from beanie import PydanticObjectId
from pydantic import BaseModel
from kernelci.api.models import (

Check failure on line 44 in api/main.py

View workflow job for this annotation

GitHub Actions / Lint

Unable to import 'kernelci.api.models'
Node,
Hierarchy,
PublishEvent,
Expand Down Expand Up @@ -422,30 +422,69 @@
# No need for separate _get_eventhistory function


# TBD: Restrict response by Pydantic model
@app.get('/events')
async def get_events(request: Request):
"""Get all the events if no request parameters have passed.
Format: [{event1}, {event2}, ...] or if recursive is set to true,
then we add to each event the node information.
Get all the matching events otherwise.
Query parameters can be used to filter the events:
- limit: Number of events to return
- from: Start timestamp (unix epoch) to filter events
- kind: Event kind to filter events
- state: Event state to filter events
- result: Event result to filter events
- recursive: Retrieve node together with event
This API endpoint is under development and may change in future.
def _parse_event_id_filter(query_params: dict, event_id: str,
                           event_ids: str) -> None:
    """Translate the ``id``/``ids`` query parameters into a Mongo ``_id``
    filter.

    Updates *query_params* in place. Raises HTTPException (400) when both
    parameters are supplied, when a value is not a valid ObjectId, or when
    ``ids`` contains no usable id.
    """
    # The two parameters are mutually exclusive.
    if event_id and event_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Provide either id or ids, not both"
        )

    if event_id:
        try:
            parsed_id = ObjectId(event_id)
        except (errors.InvalidId, TypeError) as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid id format"
            ) from exc
        query_params['_id'] = parsed_id
        return

    if not event_ids:
        # Neither parameter given: nothing to filter on.
        return

    # Split the comma-separated list, dropping empty fragments such as
    # those produced by trailing commas.
    fragments = [piece.strip() for piece in event_ids.split(',')]
    try:
        parsed_ids = [ObjectId(raw) for raw in fragments if raw]
    except (errors.InvalidId, TypeError) as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid ids format"
        ) from exc
    if not parsed_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="ids must contain at least one id"
        )
    query_params['_id'] = {'$in': parsed_ids}


def _build_events_query(query_params: dict) -> tuple:
"""Extract and process event query parameters.

Returns (recursive, processed_query_params).
Raises HTTPException on validation errors.
"""
metrics.add('http_requests_total', 1)
query_params = dict(request.query_params)
recursive = query_params.pop('recursive', None)
limit = query_params.pop('limit', None)
kind = query_params.pop('kind', None)
state = query_params.pop('state', None)
result = query_params.pop('result', None)
from_ts = query_params.pop('from', None)
node_id = query_params.pop('node_id', None)
event_id = query_params.pop('id', None)
event_ids = query_params.pop('ids', None)

if node_id:
if 'data.id' in query_params:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Provide either node_id or data.id, not both"
)
query_params['data.id'] = node_id

_parse_event_id_filter(query_params, event_id, event_ids)

if from_ts:
if isinstance(from_ts, str):
from_ts = datetime.fromisoformat(from_ts)
Expand All @@ -458,13 +497,38 @@
query_params['data.result'] = result
if limit:
query_params['limit'] = int(limit)
# limit recursive to 1000

if recursive and (not limit or int(limit) > 1000):
# generate error
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Recursive limit is too large, max is 1000"
)

return recursive, query_params


# TBD: Restrict response by Pydantic model
@app.get('/events')
async def get_events(request: Request):
"""Get all the events if no request parameters have passed.
Format: [{event1}, {event2}, ...] or if recursive is set to true,
then we add to each event the node information.
Get all the matching events otherwise.
Query parameters can be used to filter the events:
- limit: Number of events to return
- from: Start timestamp (unix epoch) to filter events
- kind: Event kind to filter events
- state: Event state to filter events
- result: Event result to filter events
- id / ids: Event document id(s) to filter events
- node_id: Node id to filter events (alias for data.id)
- recursive: Retrieve node together with event
This API endpoint is under development and may change in future.
"""
metrics.add('http_requests_total', 1)
query_params = dict(request.query_params)
recursive, query_params = _build_events_query(query_params)

resp = await db.find_by_attributes_nonpaginated(EventHistory, query_params)
resp_list = []
for item in resp:
Expand Down Expand Up @@ -1109,19 +1173,17 @@
return await purge_old_nodes(age_days=days, batch_size=batch_size)


versioned_app = VersionedFastAPI(
app,
version_format='{major}',
prefix_format='/v{major}',
enable_latest=True,
default_version=(0, 0),
on_startup=[
pubsub_startup,
create_indexes,
initialize_beanie,
start_background_tasks,
]
)
# Wrap the app with API versioning; startup hooks run once per mounted
# versioned sub-application.
versioned_app = VersionedFastAPI(
    app,
    version_format='{major}',
    prefix_format='/v{major}',
    enable_latest=True,
    default_version=(0, 0),
    on_startup=[
        pubsub_startup,
        create_indexes,
        initialize_beanie,
        start_background_tasks,
    ],
)


# traceback_exception_handler is a global exception handler that will be
Expand Down
27 changes: 12 additions & 15 deletions api/pubsub_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def __init__(self, mongo_client=None, host=None, db_number=None,
# In-memory subscription tracking (for fire-and-forget mode)
# {sub_id: {'sub': Subscription, 'redis_sub': PubSub,
# 'subscriber_id': str|None, ...}}
self._subscriptions: Dict[int, Dict[str, Any]] = {}
self._subscriptions: Dict[int, Dict[str, Any]] = \
{}
self._channels = set()
self._lock = asyncio.Lock()
self._keep_alive_timer = None
Expand Down Expand Up @@ -188,18 +189,14 @@ async def _ensure_indexes(self):
sub_col = self._mongo_db[self.SUBSCRIBER_STATE_COLLECTION]

# Event history indexes
# TTL index for auto-cleanup (7 days = 604800 seconds)
# Note: If index already exists with different TTL, this is a no-op.
# Migration handles dropping the old index first.
await event_col.create_index(
'timestamp',
expireAfterSeconds=604800,
name='ttl_timestamp'
)
# Compound index for efficient pub/sub catch-up queries
# Note: Standard indexes (TTL on timestamp, channel+sequence_id) are
# managed by the EventHistory model and created via db.create_indexes()
# We only need to add the custom index for filtered event queries.

# Compound index for filtered event queries (kind + timestamp)
await event_col.create_index(
[('channel', ASCENDING), ('sequence_id', ASCENDING)],
name='channel_sequence_id'
[('data.kind', ASCENDING), ('timestamp', ASCENDING)],
name='kind_timestamp'
)

# Subscriber state indexes
Expand Down Expand Up @@ -315,7 +312,7 @@ def _eventhistory_to_cloudevent(self, event: Dict) -> str:
return to_json(ce).decode('utf-8')

# pylint: disable=too-many-arguments
async def _get_missed_events(self, channel: str, after_seq_id: int,
async def _get_missed_events(self, channel: str, after_seq_id: int, *,
owner_filter: Optional[str] = None,
promiscuous: bool = False,
limit: int = None) -> List[Dict]:
Expand Down Expand Up @@ -387,15 +384,15 @@ async def subscribe(self, channel: str, user: str,
# If subscriber_id provided, set up durable subscription
if subscriber_id:
await self._setup_durable_subscription(
sub_id, subscriber_id, channel, user, promiscuous
sub_id, subscriber_id, channel, user, promiscuous=promiscuous
)

return sub

# pylint: disable=too-many-arguments
async def _setup_durable_subscription(
self, sub_id: int, subscriber_id: str,
channel: str, user: str, promiscuous: bool):
channel: str, user: str, *, promiscuous: bool):
"""Set up or restore durable subscription state"""
col = self._mongo_db[self.SUBSCRIBER_STATE_COLLECTION]
existing = await col.find_one({'subscriber_id': subscriber_id})
Expand Down
17 changes: 17 additions & 0 deletions doc/events-migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,23 @@ each event document but do not affect existing queries:
Existing queries filtering on `timestamp`, `data.kind`, `data.state`, etc.
continue to work unchanged.

Supported query parameters for `/events`:

| Parameter | Type | Description |
|-----------|------|-------------|
| `from` | string (ISO timestamp) | Return events with `timestamp` greater than this value. |
| `kind` | string | Filter by `data.kind` (e.g., `job`, `node`). |
| `state` | string | Filter by `data.state`. |
| `result` | string | Filter by `data.result`. |
| `limit` | integer | Maximum number of events to return. |
| `recursive` | bool | Attach related node info to each event (max `limit` 1000). |
| `id` | string (Mongo ObjectId) | Filter by a single event document id (top-level `id` in `/events` response). |
| `ids` | string (comma-separated ObjectIds) | Filter by multiple event document ids. |
| `node_id` | string | Filter events by affected node id (alias for `data.id`). |

Note: `id`/`ids` refer to the event history document id, not the node id. To get
events about a node, use `node_id` (or `data.id`).

## Subscriber ID Guidelines

The `subscriber_id` should be:
Expand Down