Conversation

@HardMax71 HardMax71 commented Dec 26, 2025

Summary by CodeRabbit

  • New Features

    • Expanded replay controls: custom queries, exclude-event-types, target topics, retry options, and progress tracking.
    • DLQ messages now surface producer info, offsets/partitions, last-error and next-retry timing.
    • List endpoints (events, sagas) now include pagination: skip, limit, has_more.
  • Improvements

    • API responses standardized and more attribute-aware across events, executions, notifications, user settings and saved scripts.
    • Event metadata now uses an Avro-backed representation for improved serialization.
  • Documentation

    • Added guidance on Kafka test stability and serialized producer initialization.



coderabbitai bot commented Dec 26, 2025

📝 Walkthrough

Walkthrough

Removed many infra mapper modules and switched route/service response construction to direct Pydantic validation/from-attributes; introduced AvroEventMetadata in infra and a new domain EventMetadata dataclass; added pagination result wrappers and propagated limit/skip through repos, services, and routes; expanded replay/DLQ/replay schemas and updated tests.

Changes

Cohort / File(s) Summary
API routes — response construction & pagination
backend/app/api/routes/admin/events.py, backend/app/api/routes/events.py, backend/app/api/routes/dlq.py, backend/app/api/routes/execution.py, backend/app/api/routes/notifications.py, backend/app/api/routes/saved_scripts.py, backend/app/api/routes/user_settings.py, backend/app/api/routes/replay.py, backend/app/api/routes/saga.py, backend/app/api/routes/admin/users.py
Replaced mapper intermediaries with direct Pydantic construction (model_validate, model_copy, jsonable_encoder) and adjusted several endpoints to accept/propagate pagination (limit, skip). Minor request-type change for saved scripts update parameter.
Mapper modules & exports removed/trimmed
backend/app/infrastructure/mappers/__init__.py, .../execution_api_mapper.py, .../notification_api_mapper.py, .../saved_script_api_mapper.py, .../user_settings_api_mapper.py, .../replay_api_mapper.py, .../admin_overview_api_mapper.py, .../saga_mapper.py, .../dlq_mapper.py, .../replay_mapper.py, .../rate_limit_mapper.py
Deleted many mapper classes and to_dict/to_response helpers; pruned __all__ exports. Replay mapper now constructs richer domain config (custom_query, exclude_event_types, target_topics, retry/progress flags).
Event metadata & mapper refactor
backend/app/infrastructure/mappers/event_mapper.py, backend/app/infrastructure/kafka/events/metadata.py, backend/app/infrastructure/kafka/events/__init__.py, backend/app/infrastructure/kafka/__init__.py, backend/app/domain/events/event_metadata.py, backend/app/domain/events/__init__.py, backend/app/domain/events/event_models.py
Infra metadata symbol renamed/exported as AvroEventMetadata; added domain EventMetadata dataclass with to_dict/from_dict helpers; reduced EventMapper surface and updated imports/aliases across codebase to Avro/domain metadata.
Pydantic schemas — config and new fields
backend/app/schemas_pydantic/events.py, .../admin_events.py, .../dlq.py, .../replay.py, .../replay_models.py, .../user_settings.py, backend/app/schemas_pydantic/saga.py
Added model_config = ConfigDict(from_attributes=True) widely; introduced EventMetadataResponse, HourlyEventCountSchema; expanded DLQ and replay schemas with new fields; added pagination fields (skip, limit, has_more) to list responses.
Services & Kafka publish changes
backend/app/services/kafka_event_service.py, various backend/app/services/*
Kafka publish APIs now accept AvroEventMetadata and convert to domain metadata; multiple services alias/import AvroEventMetadata as EventMetadata to preserve call sites.
Repositories & service return types (pagination)
backend/app/db/repositories/event_repository.py, backend/app/db/repositories/saga_repository.py, backend/app/services/event_service.py, backend/app/services/saga/saga_service.py
Repository methods extended with skip/limit and now return result wrappers (EventListResult, SagaListResult) containing items + total, skip, limit, has_more; services updated to accept/propagate pagination and operate on result objects.
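A result wrapper of this shape can compute `has_more` from the other three pagination fields. The derivation below is an assumption about how the flag is defined, not a quote from the PR.

```python
# Sketch of a pagination result wrapper like EventListResult/SagaListResult.
from dataclasses import dataclass


@dataclass
class EventListResult:
    items: list
    total: int
    skip: int
    limit: int

    @property
    def has_more(self) -> bool:
        # More pages exist if the rows seen so far fall short of the total.
        return self.skip + len(self.items) < self.total
```

Deriving `has_more` on the wrapper keeps repositories, services, and routes from each re-implementing the comparison.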
Admin export & repo tweaks
backend/app/services/admin/admin_events_service.py, backend/app/db/repositories/admin/admin_events_repository.py
JSON export switched to dataclass asdict(); replay preview now returns typed EventSummary list and caller logic simplified.
User settings mapping defaults
backend/app/infrastructure/mappers/user_settings_mapper.py
Introduced domain defaults/fallbacks when mapping snapshots and ensured created_at/updated_at defaults.
Event producer init / tests & docs
backend/app/events/core/producer.py, docs/testing/kafka-test-stability.md, mkdocs.yml
Added global lock around Producer initialization to avoid librdkafka race conditions; new docs added and mkdocs navigation updated.
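The serialized-initialization idea can be sketched with a module-level lock and a double-checked `None` test; the `get_producer`/`factory` names here are hypothetical, and the real code wraps a librdkafka `Producer` rather than a generic factory.

```python
# Sketch: serialize first-time construction so concurrent callers cannot
# race librdkafka's (not thread-safe) initialization path.
import threading

_producer = None
_producer_lock = threading.Lock()


def get_producer(factory):
    """Return the process-wide producer, constructing it at most once."""
    global _producer
    if _producer is None:          # fast path: already built, no locking
        with _producer_lock:
            if _producer is None:  # re-check under the lock
                _producer = factory()
    return _producer
```

The second check inside the lock is what makes the pattern safe: two threads can both pass the outer test, but only the first one to acquire the lock constructs the producer.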
Tests — removed/simplified & metadata updates
many files under backend/tests/**, backend/tests/unit/infrastructure/mappers/*
Removed/simplified tests for deleted mappers; updated many tests to use AvroEventMetadata instead of removed EventMetadata; reduced mapper-helper coverage and adjusted expectations/access patterns.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant API_Route as "API Route"
  participant Service
  participant Repo
  participant DB
  Note over Client,DB: Paginated list fetch flow (limit/skip added)
  Client->>API_Route: GET /items?limit=50&skip=0
  API_Route->>Service: fetch_items(limit=50, skip=0)
  Service->>Repo: query(limit=50, skip=0)
  Repo->>DB: find + count (apply skip, limit)
  DB-->>Repo: documents + total_count
  Repo-->>Service: ResultWrapper(items, total, skip, limit, has_more)
  Service-->>API_Route: domain ResultWrapper
  API_Route->>API_Route: Pydantic.model_validate(result) / jsonable_encoder
  API_Route-->>Client: JSON (items, total, skip, limit, has_more)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

enhancement

Poem

🐇 I hop through code with whiskers all alight,
Mappers folded neatly, Pydantic takes flight.
Avro hums in metadata, tidy and bright,
Pagination footprints march left to right.
I nibble a carrot — tests pass tonight.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): docstring coverage is 28.19%, below the required 80.00% threshold. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): the title accurately describes the main objective, removing redundant mappers throughout the codebase. The pull request eliminates numerous mapper classes and methods across multiple files, replacing them with direct Pydantic model validation.

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 36bead9 and 157462d.

📒 Files selected for processing (3)
  • backend/app/db/repositories/event_repository.py
  • backend/app/services/event_service.py
  • docs/testing/kafka-test-stability.md
✅ Files skipped from review due to trivial changes (1)
  • docs/testing/kafka-test-stability.md
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/db/repositories/event_repository.py (6)
backend/app/services/event_service.py (2)
  • get_events_by_correlation (119-138)
  • get_execution_events (31-59)
backend/app/api/routes/events.py (2)
  • get_events_by_correlation (133-158)
  • get_execution_events (36-64)
backend/app/domain/events/event_models.py (3)
  • correlation_id (93-94)
  • EventListResult (138-145)
  • EventFields (13-45)
backend/app/infrastructure/mappers/event_mapper.py (2)
  • from_mongo_document (45-78)
  • from_mongo_document (85-91)
backend/app/api/routes/execution.py (1)
  • get_execution_events (236-246)
backend/app/services/execution_service.py (1)
  • get_execution_events (283-316)
backend/app/services/event_service.py (4)
backend/app/domain/events/query_builders.py (1)
  • limit (24-26)
backend/app/domain/events/event_models.py (2)
  • EventListResult (138-145)
  • correlation_id (93-94)
backend/app/db/repositories/event_repository.py (2)
  • get_execution_events (162-183)
  • get_events_by_correlation (128-140)
backend/app/api/routes/events.py (2)
  • get_execution_events (36-64)
  • get_events_by_correlation (133-158)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (1)
backend/app/services/event_service.py (1)

31-59: Excellent fix: system event filtering now at DB level.

The previous review flagged pagination metadata drift from client-side filtering. This implementation correctly resolves that by delegating the exclude_system_events filter to the repository layer (line 45), ensuring that total, skip, limit, and has_more in the returned EventListResult accurately reflect the filtered query results.
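The fix described above amounts to building one query predicate and using it for both the page fetch and the count. The sketch below is a hypothetical illustration: the `aggregate_id` field name and the `system.` event-type prefix are assumptions, not quotes from the repository code.

```python
# Sketch: build the Mongo-style filter once, so that `total` and `has_more`
# are computed over exactly the same predicate as the returned page.
def build_event_query(execution_id: str, exclude_system_events: bool) -> dict:
    query: dict = {"aggregate_id": execution_id}
    if exclude_system_events:
        # Hypothetical convention: system events carry a "system." type prefix.
        query["event_type"] = {"$not": {"$regex": r"^system\."}}
    return query
```

Passing this single dict to both `find(...)` and `count_documents(...)` is what prevents the pagination metadata drift the earlier review flagged.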



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/kafka_event_service.py (1)

1-19: Fix import ordering to resolve pipeline failure.

The Ruff linter detected unsorted imports. The infra_metadata_to_domain import on line 17 needs to be reordered.

🔎 Fix import ordering

Run ruff --fix to auto-sort imports, or manually reorder:

 from app.infrastructure.kafka.events.metadata import EventMetadata
-from app.infrastructure.mappers.event_mapper import infra_metadata_to_domain
 from app.infrastructure.kafka.mappings import get_event_class_for_type
+from app.infrastructure.mappers.event_mapper import infra_metadata_to_domain
 from app.settings import get_settings
🧹 Nitpick comments (6)
backend/app/infrastructure/mappers/replay_api_mapper.py (1)

103-108: Consider adding error handling for invalid EventType values.

If a client provides an invalid string key in target_topics, EventType(k) will raise a ValueError, resulting in an unhandled 500 error instead of a user-friendly 400 validation error.

🔎 Proposed fix with error handling
         # Convert string keys to EventType for target_topics if provided
         target_topics = None
         if req.target_topics:
             from app.domain.enums.events import EventType
 
-            target_topics = {EventType(k): v for k, v in req.target_topics.items()}
+            try:
+                target_topics = {EventType(k): v for k, v in req.target_topics.items()}
+            except ValueError as e:
+                raise ValueError(f"Invalid event type in target_topics: {e}") from e

Alternatively, Pydantic validation on the request schema could enforce valid EventType string values before reaching this mapper.

backend/app/schemas_pydantic/replay_models.py (1)

102-109: Minor: Redundant branch in validator.

Lines 107-109 can be simplified—the isinstance(v, dict) check returns v, but the fallback also returns v.

🔎 Simplified version
     @field_validator("config", mode="before")
     @classmethod
     def _coerce_config(cls, v: Any) -> Any:  # noqa: ANN001
         if isinstance(v, DomainReplayConfig):
             return ReplayConfigSchema.model_validate(v).model_dump()
-        if isinstance(v, dict):
-            return v
         return v
backend/app/api/routes/admin/users.py (1)

12-12: Consider removing AdminOverviewApiMapper for consistency.

While UserMapper was removed in favor of direct Pydantic validation, AdminOverviewApiMapper is still used at line 113. For consistency with the PR's objective of eliminating redundant mappers, consider migrating the overview endpoint to direct model validation as well.

backend/app/api/routes/admin/events.py (1)

13-18: Consider migrating remaining mappers for consistency.

While EventStatisticsMapper and ReplaySessionMapper were removed, other mappers remain in use: AdminReplayApiMapper, EventDetailMapper, EventFilterMapper, and EventMapper. For full consistency with the PR's objective, consider migrating these as well.

backend/app/infrastructure/mappers/user_settings_mapper.py (2)

58-59: Avoid non-deterministic defaults in mapping logic.

Using datetime.now(timezone.utc) as a fallback can lead to inconsistent behavior if created_at and updated_at are expected to come from the document. These timestamps should ideally be set at creation time by the caller, not during mapping.

Consider removing the fallback or raising an error if timestamps are missing:

-            created_at=doc.get("created_at", datetime.now(timezone.utc)),
-            updated_at=doc.get("updated_at", datetime.now(timezone.utc)),
+            created_at=doc["created_at"],
+            updated_at=doc["updated_at"],

Or, if the fallback is intentional for legacy documents, add a comment explaining why.


15-60: Consider eliminating this mapper for consistency.

While this file improves default handling, the PR's objective is to eliminate redundant mappers. If UserSettingsMapper is primarily used for API responses, consider migrating to direct Pydantic validation like other routes in this PR.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0b1ef49 and b6c7a22.

📒 Files selected for processing (25)
  • backend/app/api/routes/admin/events.py
  • backend/app/api/routes/admin/users.py
  • backend/app/api/routes/dlq.py
  • backend/app/api/routes/events.py
  • backend/app/api/routes/execution.py
  • backend/app/api/routes/notifications.py
  • backend/app/api/routes/saved_scripts.py
  • backend/app/api/routes/user_settings.py
  • backend/app/domain/events/event_models.py
  • backend/app/infrastructure/mappers/__init__.py
  • backend/app/infrastructure/mappers/event_mapper.py
  • backend/app/infrastructure/mappers/execution_api_mapper.py
  • backend/app/infrastructure/mappers/notification_api_mapper.py
  • backend/app/infrastructure/mappers/replay_api_mapper.py
  • backend/app/infrastructure/mappers/saved_script_api_mapper.py
  • backend/app/infrastructure/mappers/user_settings_api_mapper.py
  • backend/app/infrastructure/mappers/user_settings_mapper.py
  • backend/app/schemas_pydantic/admin_events.py
  • backend/app/schemas_pydantic/dlq.py
  • backend/app/schemas_pydantic/events.py
  • backend/app/schemas_pydantic/replay.py
  • backend/app/schemas_pydantic/replay_models.py
  • backend/app/schemas_pydantic/user_settings.py
  • backend/app/services/kafka_event_service.py
  • backend/tests/unit/infrastructure/mappers/test_execution_api_mapper.py
💤 Files with no reviewable changes (6)
  • backend/app/infrastructure/mappers/__init__.py
  • backend/app/infrastructure/mappers/user_settings_api_mapper.py
  • backend/app/infrastructure/mappers/execution_api_mapper.py
  • backend/app/infrastructure/mappers/notification_api_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_execution_api_mapper.py
  • backend/app/infrastructure/mappers/saved_script_api_mapper.py
🧰 Additional context used
🧬 Code graph analysis (12)
backend/app/api/routes/events.py (1)
backend/app/schemas_pydantic/events.py (2)
  • EventResponse (31-43)
  • EventStatistics (202-229)
backend/app/api/routes/dlq.py (1)
backend/app/schemas_pydantic/dlq.py (2)
  • DLQMessageResponse (22-39)
  • DLQTopicSummaryResponse (81-92)
backend/app/infrastructure/mappers/event_mapper.py (1)
backend/app/domain/events/event_models.py (3)
  • DomainEventMetadata (77-86)
  • correlation_id (106-107)
  • EventFields (13-45)
backend/app/api/routes/admin/events.py (1)
backend/app/schemas_pydantic/admin_events.py (2)
  • EventStatsResponse (100-110)
  • EventReplayStatusResponse (72-90)
backend/app/infrastructure/mappers/user_settings_mapper.py (1)
backend/app/domain/user/settings_models.py (2)
  • DomainNotificationSettings (13-18)
  • DomainEditorSettings (22-28)
backend/app/domain/events/event_models.py (1)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/app/schemas_pydantic/events.py (3)
backend/app/domain/events/event_models.py (1)
  • correlation_id (106-107)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/app/dlq/models.py (1)
  • event_type (93-95)
backend/app/infrastructure/mappers/replay_api_mapper.py (2)
backend/app/schemas_pydantic/replay.py (1)
  • ReplayRequest (10-36)
backend/app/domain/replay/models.py (1)
  • ReplayConfig (56-81)
backend/app/schemas_pydantic/replay_models.py (1)
backend/app/domain/replay/models.py (2)
  • ReplayConfig (56-81)
  • ReplayFilter (11-53)
backend/app/api/routes/execution.py (2)
backend/app/schemas_pydantic/execution.py (2)
  • ExecutionResponse (89-95)
  • ExecutionResult (98-111)
backend/app/services/idempotency/idempotency_manager.py (1)
  • mark_completed_with_json (256-269)
backend/app/services/kafka_event_service.py (2)
backend/app/infrastructure/mappers/event_mapper.py (1)
  • infra_metadata_to_domain (44-54)
backend/app/domain/events/event_models.py (1)
  • Event (90-107)
backend/app/api/routes/user_settings.py (2)
backend/app/domain/user/settings_models.py (3)
  • DomainEditorSettings (22-28)
  • DomainNotificationSettings (13-18)
  • DomainUserSettingsUpdate (47-54)
backend/app/schemas_pydantic/user_settings.py (6)
  • EditorSettings (23-47)
  • NotificationSettings (11-20)
  • RestoreSettingsRequest (121-125)
  • SettingsHistoryEntry (98-109)
  • UserSettings (50-65)
  • SettingsHistoryResponse (112-118)
🪛 GitHub Actions: Ruff Linting
backend/app/services/kafka_event_service.py

[error] 1-19: ISORT/Import block is un-sorted or un-formatted (I001). Found 2 errors. Organize imports (use 'ruff --fix' to auto-fix).

backend/app/api/routes/user_settings.py

[error] 49-49: E501 Line too long (122 > 120).

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (42)
backend/app/schemas_pydantic/replay.py (3)

2-2: LGTM!

Import addition is correct and minimal for supporting the new Dict[str, Any] type annotations.


23-24: LGTM!

New filter options custom_query and exclude_event_types are correctly typed and align with the domain model ReplayFilter fields.


33-36: LGTM!

New configuration fields are well-defined with appropriate defaults and constraints. The retry_attempts field correctly uses Field(default=3, ge=1, le=10) for validation.

backend/app/infrastructure/mappers/replay_api_mapper.py (2)

97-98: LGTM!

New filter fields are correctly mapped from the request to the domain filter.


120-123: LGTM!

New configuration fields are correctly propagated to the domain ReplayConfig.

backend/app/schemas_pydantic/replay_models.py (4)

13-14: LGTM!

Adding ConfigDict(from_attributes=True) enables direct instantiation from domain objects, aligning with the PR's goal of eliminating redundant mappers.


24-35: LGTM!

The from_domain classmethod cleanly converts domain ReplayFilter to the schema, with proper handling of optional EventType lists using the [...] or None pattern to return None for empty lists.


60-65: LGTM!

The field validator correctly coerces DomainReplayFilter instances to dicts for Pydantic validation.


67-80: LGTM!

The model validator handles DomainReplayConfig conversion well, including proper key stringification for target_topics.

backend/app/api/routes/saved_scripts.py (2)

25-27: LGTM! Direct domain model construction.

The pattern of unpacking model_dump() into domain constructors is clean and eliminates the mapper layer effectively.


58-66: LGTM! Correct use of SavedScriptUpdate for update endpoint.

The parameter type correctly changed from SavedScriptCreateRequest to SavedScriptUpdate, and the domain conversion pattern matches the create endpoint. Both SavedScriptUpdate and DomainSavedScriptUpdate have identical fields: name, script, lang, lang_version, description, and updated_at, so the conversion via DomainSavedScriptUpdate(**script_update.model_dump()) works seamlessly.

backend/app/services/kafka_event_service.py (2)

79-79: Correct metadata conversion to domain model.

Using infra_metadata_to_domain(event_metadata) properly converts infrastructure metadata to the domain representation for storage.


72-96: The dual metadata representation is intentional and correct. The Event domain model uses DomainEventMetadata for storage (line 79), while the Kafka-bound BaseEvent uses infrastructure EventMetadata which extends AvroBase for Avro serialization. This is proper architectural separation: domain layer abstraction vs. infrastructure serialization layer. Both metadata types have identical fields but serve different purposes—no refactoring needed.

backend/app/api/routes/admin/events.py (2)

209-209: LGTM! Direct Pydantic validation for replay status.

Consistent with the statistics endpoint change.


70-70: LGTM! Direct Pydantic validation for statistics.

The conversion from mapper-based to model_validate is clean and aligns with the PR objective. EventStatsResponse has the required model_config = ConfigDict(from_attributes=True).

backend/app/api/routes/dlq.py (2)

147-147: LGTM! Consistent pattern for topic summaries.

Matches the message conversion pattern on line 63.


62-63: LGTM! Clean list comprehension with model validation.

The conversion is straightforward and correct. DLQMessageResponse has the proper Pydantic configuration (ConfigDict(from_attributes=True)) needed for model_validate() to work correctly.

backend/app/api/routes/notifications.py (3)

43-47: LGTM! Consistent migration to model validation.

Clean list comprehension pattern for notifications.


79-81: LGTM! Subscriptions correctly converted.

Matches the pattern used in the notifications endpoint.


103-103: LGTM! Single subscription validation completes the consistent migration of all notification endpoints.

All notification response models have from_attributes=True configured, and the model_validate pattern is applied consistently across endpoints.

backend/app/infrastructure/mappers/user_settings_mapper.py (3)

24-33: LGTM! Improved default handling using domain models.

Using domain dataclass instances for fallback values is cleaner than hard-coded literals and ensures consistency with domain defaults.


42-46: LGTM! Consistent notification defaults.

Fallback values correctly sourced from default_notifications.


49-54: LGTM! Consistent editor defaults.

Fallback values correctly sourced from default_editor.

backend/app/api/routes/admin/users.py (2)

54-64: LGTM! Clean migration to Pydantic model validation.

The refactor correctly uses model_validate to construct UserResponse from domain objects and model_copy to enrich with rate-limit fields. The UserResponse model properly includes model_config = ConfigDict(from_attributes=True) to support attribute-based population from domain objects. The optional rate-limit fields (bypass_rate_limit, global_multiplier, has_custom_limits) are well-defined and match the conditional enrichment pattern.


86-86: The field alignment between domain_user (User domain model) and UserResponse is correctly configured. The domain User dataclass contains all required fields (user_id, username, email, role, is_active, is_superuser, created_at, updated_at) that UserResponse expects, and the schema's model_config = ConfigDict(from_attributes=True) explicitly enables attribute-based population. The optional rate limit fields in UserResponse have defaults, so their absence in the domain object poses no issue. The pattern is used consistently throughout the file without issues.

backend/app/domain/events/event_models.py (1)

97-97: Backward compatibility is maintained through conversion layer with defaults.

The metadata type change from EventMetadata to DomainEventMetadata is handled by metadata_from_dict() in the event mapper (lines 31-41), which gracefully deserializes existing events with fallback defaults for missing fields (service_name="unknown", service_version="1.0", environment="production"). Existing events in the database will deserialize correctly without migration.

backend/app/schemas_pydantic/admin_events.py (1)

72-110: LGTM!

The addition of ConfigDict(from_attributes=True) to EventReplayStatusResponse and EventStatsResponse aligns with the PR's goal of enabling direct Pydantic model validation from domain objects, eliminating the need for explicit mappers.

backend/app/api/routes/execution.py (3)

118-128: LGTM!

The refactor correctly replaces the mapper with direct Pydantic validation. The idempotency flow properly uses model_dump_json() for serialization and model_validate_json() (line 104) for deserialization, maintaining consistency with the cached JSON pattern.


232-232: LGTM!

Consistent application of model_validate for the retry response.


276-276: LGTM!

The list comprehension using ExecutionResult.model_validate(e) is consistent with the pattern applied elsewhere in this PR.

backend/app/api/routes/events.py (2)

52-54: LGTM!

Direct Pydantic validation replaces the mapper correctly. The pattern is consistent across all event endpoints.


187-198: Verify that the environment field from DomainEventMetadata should be excluded from the API response.

The EventMetadataResponse model is missing the environment field present in DomainEventMetadata. When using model_validate(from_attributes=True), Pydantic will ignore the extra environment field and populate only the matching fields (service_name, service_version, correlation_id, user_id, ip_address, user_agent). This works correctly but silently drops the environment metadata. Confirm whether excluding this field from the API response is intentional.
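The silent-drop behavior described here is easy to demonstrate: with `from_attributes=True`, Pydantic v2 reads only the attributes that match declared model fields and ignores the rest. A minimal sketch, with hypothetical class names:

```python
# Sketch: an attribute present on the domain object but absent from the
# response model is silently omitted by model_validate(from_attributes=True).
from dataclasses import dataclass

from pydantic import BaseModel, ConfigDict


@dataclass
class DomainMeta:
    service_name: str
    environment: str  # exists on the domain object only


class MetadataResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    service_name: str  # no `environment` field declared


resp = MetadataResponse.model_validate(DomainMeta("api", "production"))
```

No error is raised, which is why an unintentionally missing field can go unnoticed until someone looks for it in the response payload.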

backend/app/api/routes/user_settings.py (1)

100-101: LGTM!

The history handling correctly validates each entry via model_validate and constructs the response with a computed total.

backend/app/infrastructure/mappers/event_mapper.py (2)

26-55: LGTM! Good centralization of metadata conversion logic.

The new helper functions (metadata_to_dict, metadata_from_dict, infra_metadata_to_domain) provide a clean abstraction for metadata conversion between domain and infrastructure layers. The defaults in metadata_from_dict are sensible for handling incomplete data from storage.


68-68: LGTM!

The to_mongo_document method now uses the centralized metadata_to_dict helper consistently.

backend/app/schemas_pydantic/events.py (3)

17-28: LGTM!

EventMetadataResponse provides a structured schema for event metadata in API responses, replacing the generic Dict[str, Any]. The field names align with DomainEventMetadata, enabling seamless model_validate with from_attributes=True.


31-43: LGTM!

The EventResponse model now uses the typed EventMetadataResponse for the metadata field, improving API schema clarity and enabling proper validation.


212-229: LGTM!

Adding from_attributes=True to EventStatistics enables direct population from domain objects via model_validate.

backend/app/schemas_pydantic/user_settings.py (2)

11-20: LGTM!

The from_attributes=True configuration enables direct population from domain objects while preserving existing field defaults.


50-65: LGTM!

UserSettings correctly gets from_attributes=True, enabling model_validate from domain settings objects. The nested NotificationSettings and EditorSettings also have this config, ensuring proper recursive validation.

backend/app/schemas_pydantic/dlq.py (2)

95-117: LGTM!

DLQMessageDetail provides comprehensive DLQ message information with proper typing. The from_attributes=True configuration is correctly applied.


22-39: LGTM! Improved schema with explicit fields.

Replacing the generic details: dict[str, Any] with explicit typed fields (producer_id, dlq_offset, dlq_partition, last_error, next_retry_at) improves API clarity and type safety. The from_attributes=True config enables direct validation from the domain DLQMessage dataclass, with all schema fields present and type-compatible in the domain model.


codecov-commenter commented Dec 26, 2025

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 82.90598% with 40 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
backend/app/api/routes/events.py 62.50% 5 Missing and 1 partial ⚠️
backend/app/domain/events/event_metadata.py 80.00% 3 Missing and 3 partials ⚠️
backend/app/db/repositories/saga_repository.py 16.66% 5 Missing ⚠️
backend/app/services/admin/admin_events_service.py 33.33% 4 Missing ⚠️
backend/app/api/routes/saga.py 57.14% 3 Missing ⚠️
...nd/app/infrastructure/mappers/replay_api_mapper.py 25.00% 2 Missing and 1 partial ⚠️
...p/db/repositories/admin/admin_events_repository.py 33.33% 2 Missing ⚠️
...app/infrastructure/mappers/user_settings_mapper.py 66.66% 1 Missing and 1 partial ⚠️
backend/app/services/saga/saga_orchestrator.py 33.33% 2 Missing ⚠️
backend/app/api/routes/admin/events.py 50.00% 1 Missing ⚠️
... and 6 more
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Flag Coverage Δ
backend 67.68% <82.90%> (+0.21%) ⬆️

Flags with carried-forward coverage won't be shown.

Files with missing lines Coverage Δ
backend/app/api/routes/admin/users.py 82.14% <100.00%> (-1.38%) ⬇️
backend/app/api/routes/dlq.py 80.00% <100.00%> (ø)
backend/app/api/routes/notifications.py 95.55% <100.00%> (-0.10%) ⬇️
backend/app/api/routes/saved_scripts.py 100.00% <100.00%> (ø)
backend/app/db/repositories/event_repository.py 47.55% <100.00%> (+1.19%) ⬆️
...nd/app/db/repositories/user_settings_repository.py 81.13% <100.00%> (ø)
backend/app/domain/events/__init__.py 100.00% <100.00%> (ø)
backend/app/domain/events/event_models.py 96.19% <100.00%> (ø)
backend/app/events/core/producer.py 77.08% <100.00%> (+0.48%) ⬆️
backend/app/infrastructure/kafka/__init__.py 100.00% <100.00%> (ø)
... and 38 more

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (1)

56-74: Ineffective assertion due to or True.

Line 61's assertion assert ... or True is a tautology that always passes, making it ineffective. If the field names vary, consider asserting on actual expected keys or removing this line.

🔎 Proposed fix
-    assert "_deleted_at" in arch_doc or "_deletion_reason" in arch_doc or True  # enum names vary
+    # Verify archive-specific fields are present
+    assert EventFields.DELETED_AT in arch_doc or "_deleted_at" in arch_doc
+    assert EventFields.DELETED_BY in arch_doc or "_deleted_by" in arch_doc
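For context, the short-circuit behaviour that makes the original assertion a no-op can be shown in a few lines of plain Python:

```python
# `or` short-circuits, and the trailing `True` guarantees a truthy result,
# so `assert <anything> or True` can never fail.
arch_doc: dict = {}  # an empty document with neither expected key

# This "check" passes even though both membership tests are False.
assert "_deleted_at" in arch_doc or "_deletion_reason" in arch_doc or True

# Dropping the trailing `or True` exposes what the test actually evaluates.
condition = "_deleted_at" in arch_doc or "_deletion_reason" in arch_doc
print(condition)
```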
🧹 Nitpick comments (3)
backend/app/infrastructure/mappers/replay_api_mapper.py (1)

50-55: Handle invalid EventType values gracefully.

The conversion EventType(k) will raise a ValueError if k is not a valid EventType member. Consider adding validation or letting the error propagate with a clearer message to the caller.

🔎 Proposed defensive handling
         target_topics = None
         if req.target_topics:
             from app.domain.enums.events import EventType
 
-            target_topics = {EventType(k): v for k, v in req.target_topics.items()}
+            try:
+                target_topics = {EventType(k): v for k, v in req.target_topics.items()}
+            except ValueError as e:
+                raise ValueError(f"Invalid event type in target_topics: {e}") from e
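As a self-contained illustration, with a toy enum standing in for the project's EventType, the wrapped conversion behaves like this:

```python
from enum import Enum

# Toy stand-in for the project's EventType enum (values are illustrative).
class EventType(str, Enum):
    EXECUTION_REQUESTED = "execution_requested"
    EXECUTION_COMPLETED = "execution_completed"

raw_topics = {"execution_requested": "topic-a", "not-a-real-type": "topic-b"}

# EventType(k) raises ValueError on the unknown key; re-raising with a
# prefix tells the caller which request field held the bad value.
message = ""
try:
    target_topics = {EventType(k): v for k, v in raw_topics.items()}
except ValueError as e:
    message = f"Invalid event type in target_topics: {e}"

print(message)
```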
backend/app/infrastructure/mappers/event_mapper.py (2)

44-78: Consider removing duplicate field entries in base_fields.

EventFields.STORED_AT and the literal "stored_at" likely resolve to the same value, creating redundancy. If EventFields.STORED_AT.value == "stored_at", one entry can be removed.

🔎 Suggested simplification
         base_fields = {
             EventFields.EVENT_ID,
             EventFields.EVENT_TYPE,
             EventFields.EVENT_VERSION,
             EventFields.TIMESTAMP,
             EventFields.METADATA,
             EventFields.AGGREGATE_ID,
             EventFields.STORED_AT,
             EventFields.TTL_EXPIRES_AT,
             EventFields.STATUS,
             EventFields.ERROR,
             "_id",
-            "stored_at",
         }

98-110: Unnecessary instantiation of EventMapper.

EventMapper.to_mongo_document is a @staticmethod, so instantiating EventMapper() on line 100 is unnecessary.

🔎 Proposed fix
     @staticmethod
     def to_mongo_document(event: ArchivedEvent) -> dict[str, Any]:
-        event_mapper = EventMapper()
-        doc = event_mapper.to_mongo_document(event)
+        doc = EventMapper.to_mongo_document(event)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b6c7a22 and 726276e.

📒 Files selected for processing (17)
  • backend/app/api/routes/admin/events.py
  • backend/app/api/routes/admin/users.py
  • backend/app/api/routes/replay.py
  • backend/app/api/routes/saga.py
  • backend/app/infrastructure/mappers/__init__.py
  • backend/app/infrastructure/mappers/admin_mapper.py
  • backend/app/infrastructure/mappers/admin_overview_api_mapper.py
  • backend/app/infrastructure/mappers/dlq_mapper.py
  • backend/app/infrastructure/mappers/event_mapper.py
  • backend/app/infrastructure/mappers/replay_api_mapper.py
  • backend/app/infrastructure/mappers/saga_mapper.py
  • backend/app/services/kafka_event_service.py
  • backend/tests/unit/infrastructure/mappers/test_admin_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_dlq_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_event_mapper_extended.py
  • backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_saga_mapper_extended.py
💤 Files with no reviewable changes (5)
  • backend/app/infrastructure/mappers/admin_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_dlq_mapper.py
  • backend/app/infrastructure/mappers/admin_overview_api_mapper.py
  • backend/app/infrastructure/mappers/dlq_mapper.py
  • backend/app/infrastructure/mappers/__init__.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • backend/app/api/routes/admin/users.py
  • backend/app/services/kafka_event_service.py
🧰 Additional context used
🧬 Code graph analysis (4)
backend/tests/unit/infrastructure/mappers/test_admin_mapper.py (1)
backend/app/infrastructure/mappers/admin_mapper.py (3)
  • AuditLogMapper (230-251)
  • SettingsMapper (126-227)
  • UserMapper (31-123)
backend/app/infrastructure/mappers/replay_api_mapper.py (2)
backend/app/schemas_pydantic/replay.py (1)
  • ReplayRequest (10-36)
backend/app/domain/replay/models.py (1)
  • ReplayConfig (56-81)
backend/app/api/routes/admin/events.py (1)
backend/app/schemas_pydantic/admin_events.py (2)
  • EventStatsResponse (100-110)
  • EventReplayStatusResponse (72-90)
backend/app/infrastructure/mappers/event_mapper.py (1)
backend/app/domain/events/event_models.py (1)
  • EventFields (13-45)
🪛 GitHub Actions: MyPy Type Checking
backend/app/infrastructure/mappers/event_mapper.py

[error] 4-4: mypy: Module 'app.domain.events.event_models' does not explicitly export attribute 'EventMetadata'.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (19)
backend/app/infrastructure/mappers/saga_mapper.py (1)

1-1: LGTM!

The import cleanup correctly removes List and response-related types that are no longer needed after removing SagaResponseMapper. The remaining mappers appropriately focus on persistence-layer concerns (MongoDB and Kafka).

backend/tests/unit/infrastructure/mappers/test_saga_mapper_extended.py (1)

376-376: LGTM!

The test file correctly removes tests for the deleted SagaResponseMapper and to_dict method while maintaining comprehensive coverage for the remaining persistence-layer mappers.

backend/app/api/routes/saga.py (3)

49-49: LGTM!

The switch to SagaStatusResponse.from_domain(saga) is a clean pattern for domain-to-response conversion, eliminating the need for the separate SagaResponseMapper class.


80-81: Consistent refactor pattern.

Using list comprehension with SagaStatusResponse.from_domain(s) is more readable and maintains consistency with the PR's goal of eliminating mapper indirection.


111-112: LGTM!

Same pattern applied consistently for the paginated list endpoint.

backend/tests/unit/infrastructure/mappers/test_admin_mapper.py (2)

4-4: LGTM!

Import updated correctly to reflect the removal of UserListResultMapper from the mappers module exports.


14-14: LGTM!

Import correctly updated - UserListResult removed as it's no longer used after mapper cleanup.

backend/app/api/routes/admin/events.py (4)

49-54: LGTM!

Using jsonable_encoder for event objects is appropriate here since events may contain complex nested structures or non-standard types that need explicit JSON serialization before being included in the response.


66-67: LGTM!

EventStatsResponse.model_validate(stats) works correctly since the schema has model_config = ConfigDict(from_attributes=True) configured (as shown in the relevant code snippets).


144-148: LGTM!

Consistent use of jsonable_encoder for event-related objects that may contain complex nested data.


203-204: LGTM!

EventReplayStatusResponse.model_validate(status) is correctly applied. The schema has model_config = ConfigDict(from_attributes=True) as confirmed in the relevant code snippets.

backend/tests/unit/infrastructure/mappers/test_event_mapper_extended.py (1)

274-274: LGTM!

Test file correctly updated to align with the removal of response-layer mappers. The remaining tests provide good coverage for persistence-layer mappers (EventMapper, ArchivedEventMapper, EventFilterMapper, EventExportRowMapper).

backend/app/infrastructure/mappers/replay_api_mapper.py (2)

44-46: LGTM!

The additional fields custom_query and exclude_event_types are now correctly propagated from the request to ReplayFilter, aligning with the schema in ReplayRequest.


67-71: LGTM!

The additional fields (target_topics, retry_failed, retry_attempts, enable_progress_tracking) are correctly passed to ReplayConfig, matching the domain model definition in the relevant code snippets.

backend/app/api/routes/replay.py (1)

70-73: LGTM!

The switch from mapper-based response construction to ReplaySession.model_validate(state) aligns with the PR's refactoring goal. ReplaySession has model_config = ConfigDict(from_attributes=True) properly configured, supporting attribute-based validation from the domain state object.

backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (2)

34-42: LGTM!

The test correctly validates the to_mongo_document and from_mongo_document round-trip, including the behavior of merging extra fields into the payload.


45-53: LGTM!

The test correctly validates EventSummaryMapper.from_mongo_document behavior. The omission of aggregate_id from the test dict is intentional as it tests the optional field handling via .get().

backend/app/infrastructure/mappers/event_mapper.py (2)

164-214: LGTM!

EventFilterMapper correctly converts filters to MongoDB queries. The dual assignment of search_text and text_search on lines 211-212 provides backward compatibility with the query logic on line 192.


20-42: EventMetadata.to_dict(exclude_none=True) method is correctly implemented.

The EventMetadata class in app.infrastructure.kafka.events.metadata has the to_dict method with the exclude_none parameter (default True). The method delegates to Pydantic's model_dump and the call in line 27 of event_mapper.py is valid.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
backend/app/api/routes/user_settings.py (1)

77-77: Schema compatibility assumption (same as lines 50, 52).

These lines use the same **model_dump() pattern flagged in the earlier comment. The concern about schema field compatibility applies here as well and will be verified by the same script.

Also applies to: 90-90

🧹 Nitpick comments (5)
backend/app/api/routes/execution.py (1)

17-17: Consider avoiding the import alias to improve code clarity.

Aliasing AvroEventMetadata as EventMetadata can be confusing since a separate EventMetadata class exists in the domain layer (app.domain.events.event_metadata). This makes it unclear which metadata model is being used without checking imports.

🔎 Suggested refactor to use the explicit name
-from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
+from app.infrastructure.kafka.events.metadata import AvroEventMetadata

Then update usages at lines 85 and 188:

-            metadata=EventMetadata(
+            metadata=AvroEventMetadata(
backend/app/api/routes/user_settings.py (1)

44-54: The line-length violation has been resolved, and nested model schemas are compatible.

The manual field mapping at lines 45-53 remains brittle—adding fields to UserSettingsUpdate will require corresponding updates here. The **model_dump() pattern on lines 50 and 52 is safe because API and domain schemas have identical field names and types. Consider extracting this construction logic to a mapper function to reduce maintenance burden if the schemas evolve.

backend/app/infrastructure/kafka/events/metadata.py (1)

26-36: Consider aligning environment parsing with domain EventMetadata.from_dict.

The domain EventMetadata.from_dict (in backend/app/domain/events/event_metadata.py) explicitly handles string-to-enum conversion for environment:

env = data.get("environment", Environment.PRODUCTION)
if isinstance(env, str):
    env = Environment(env)

This Avro version passes the value directly, relying on Pydantic's coercion. While use_enum_values=True in the model config helps, explicit handling would be more robust for edge cases where the input is a raw string.

🔎 Proposed fix
     @classmethod
     def from_dict(cls, data: Dict[str, Any]) -> "AvroEventMetadata":
+        env = data.get("environment", Environment.PRODUCTION)
+        if isinstance(env, str):
+            env = Environment(env)
         return cls(
             service_name=data.get("service_name", "unknown"),
             service_version=data.get("service_version", "1.0"),
             correlation_id=data.get("correlation_id", str(uuid4())),
             user_id=data.get("user_id"),
             ip_address=data.get("ip_address"),
             user_agent=data.get("user_agent"),
-            environment=data.get("environment", Environment.PRODUCTION),
+            environment=env,
         )
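The explicit handling amounts to a small normalization helper; sketched here with a trimmed-down Environment enum (member names are assumptions for illustration):

```python
from enum import Enum
from typing import Any

# Trimmed-down stand-in for app.domain.enums.common.Environment
# (member names here are assumptions for illustration).
class Environment(str, Enum):
    PRODUCTION = "production"
    STAGING = "staging"

def normalize_environment(value: Any) -> Environment:
    # Accept a member, its raw string form, or None (default to PRODUCTION),
    # instead of relying on implicit coercion at model-construction time.
    if value is None:
        return Environment.PRODUCTION
    if isinstance(value, Environment):
        return value
    return Environment(value)

print(normalize_environment("staging"))
```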
backend/app/domain/events/event_metadata.py (1)

43-47: Consider adding ensure_correlation_id for feature parity with AvroEventMetadata.

The AvroEventMetadata class has an ensure_correlation_id() method that generates a new UUID if the correlation_id is empty. If this functionality is needed at the domain level, consider adding it here for consistency.

🔎 Optional addition
     def with_user(self, user_id: str) -> "EventMetadata":
         return replace(self, user_id=user_id)
+
+    def ensure_correlation_id(self) -> "EventMetadata":
+        if self.correlation_id:
+            return self
+        return replace(self, correlation_id=str(uuid4()))
backend/app/db/repositories/admin/admin_events_repository.py (1)

387-388: Eliminate circular conversion for better performance and clarity.

The current flow converts MongoDB documents to EventSummary objects (line 365), then to dicts (line 366), which are returned, and then immediately converted back to EventSummary objects here (line 388). This is inefficient and obscures intent.

Consider either:

  1. Changing get_events_preview_for_replay to return List[EventSummary] directly when it's only used internally for this purpose, or
  2. Creating a separate internal method that returns EventSummary objects to avoid the round-trip serialization.
🔎 Proposed refactor to return EventSummary objects directly

Option 1: Modify the method signature and remove the dict conversion:

-    async def get_events_preview_for_replay(self, query: Dict[str, Any], limit: int = 100) -> List[Dict[str, Any]]:
+    async def get_events_preview_for_replay(self, query: Dict[str, Any], limit: int = 100) -> List[EventSummary]:
         """Get preview of events for replay."""
         cursor = self.events_collection.find(query).limit(limit)
         event_docs = await cursor.to_list(length=limit)
 
         # Convert to event summaries
-        summaries: List[Dict[str, Any]] = []
+        summaries: List[EventSummary] = []
         for doc in event_docs:
             summary = self.summary_mapper.from_mongo_document(doc)
-            summaries.append(asdict(summary))
+            summaries.append(summary)
 
         return summaries

Then update the caller to remove the redundant conversion:

         if dry_run:
             preview_docs = await self.get_events_preview_for_replay(query, limit=100)
-            events_preview = [self.summary_mapper.from_mongo_document(e) for e in preview_docs]
+            events_preview = preview_docs
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 726276e and 205799f.

📒 Files selected for processing (21)
  • backend/app/api/routes/events.py
  • backend/app/api/routes/execution.py
  • backend/app/api/routes/user_settings.py
  • backend/app/db/repositories/admin/admin_events_repository.py
  • backend/app/domain/events/__init__.py
  • backend/app/domain/events/event_metadata.py
  • backend/app/domain/events/event_models.py
  • backend/app/infrastructure/kafka/__init__.py
  • backend/app/infrastructure/kafka/events/__init__.py
  • backend/app/infrastructure/kafka/events/base.py
  • backend/app/infrastructure/kafka/events/metadata.py
  • backend/app/infrastructure/mappers/event_mapper.py
  • backend/app/infrastructure/mappers/saga_mapper.py
  • backend/app/services/admin/admin_events_service.py
  • backend/app/services/coordinator/coordinator.py
  • backend/app/services/execution_service.py
  • backend/app/services/kafka_event_service.py
  • backend/app/services/pod_monitor/event_mapper.py
  • backend/app/services/result_processor/processor.py
  • backend/app/services/saga/execution_saga.py
  • backend/app/services/saga/saga_orchestrator.py
🧰 Additional context used
🧬 Code graph analysis (19)
backend/app/services/pod_monitor/event_mapper.py (2)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/services/result_processor/processor.py (2)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/domain/events/event_models.py (1)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/services/saga/execution_saga.py (2)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/domain/events/__init__.py (1)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/services/kafka_event_service.py (3)
backend/app/domain/events/event_metadata.py (4)
  • EventMetadata (9-47)
  • with_correlation (43-44)
  • from_dict (29-41)
  • to_dict (20-26)
backend/app/infrastructure/kafka/events/metadata.py (4)
  • AvroEventMetadata (10-47)
  • with_correlation (38-39)
  • from_dict (27-36)
  • to_dict (23-24)
backend/app/domain/events/event_models.py (2)
  • correlation_id (93-94)
  • Event (77-94)
backend/app/infrastructure/kafka/events/metadata.py (3)
backend/app/domain/events/event_metadata.py (3)
  • from_dict (29-41)
  • with_correlation (43-44)
  • with_user (46-47)
backend/app/domain/events/event_models.py (1)
  • correlation_id (93-94)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/app/services/coordinator/coordinator.py (2)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/infrastructure/kafka/events/base.py (1)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/api/routes/user_settings.py (3)
backend/app/domain/user/settings_models.py (3)
  • DomainEditorSettings (22-28)
  • DomainNotificationSettings (13-18)
  • DomainUserSettingsUpdate (47-54)
backend/app/schemas_pydantic/user_settings.py (6)
  • EditorSettings (23-47)
  • NotificationSettings (11-20)
  • RestoreSettingsRequest (121-125)
  • SettingsHistoryEntry (98-109)
  • UserSettings (50-65)
  • SettingsHistoryResponse (112-118)
backend/app/api/dependencies.py (1)
  • current_user (10-12)
backend/app/infrastructure/mappers/event_mapper.py (2)
backend/app/domain/events/event_metadata.py (2)
  • EventMetadata (9-47)
  • to_dict (20-26)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • to_dict (23-24)
backend/app/services/execution_service.py (2)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/infrastructure/kafka/__init__.py (1)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/api/routes/execution.py (3)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/schemas_pydantic/execution.py (2)
  • ExecutionResponse (89-95)
  • ExecutionResult (98-111)
backend/app/infrastructure/kafka/events/__init__.py (1)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/services/saga/saga_orchestrator.py (2)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/api/routes/events.py (3)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/schemas_pydantic/events.py (2)
  • EventResponse (31-43)
  • EventStatistics (202-229)
backend/app/domain/events/event_metadata.py (2)
backend/app/domain/enums/common.py (1)
  • Environment (27-33)
backend/app/domain/events/event_models.py (1)
  • correlation_id (93-94)
backend/app/infrastructure/mappers/saga_mapper.py (3)
backend/app/domain/saga/models.py (3)
  • Saga (10-25)
  • SagaFilter (29-38)
  • SagaInstance (106-121)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (30)
backend/app/services/result_processor/processor.py (1)

26-26: Import change is correct and necessary.

The metadata field in both ResultStoredEvent and ResultFailedEvent is inherited from BaseEvent, which explicitly defines metadata: AvroEventMetadata (line 21 in backend/app/infrastructure/kafka/events/base.py). The import change at line 26 aligns the result processor with the required type, eliminating the previous type mismatch between domain EventMetadata and the Pydantic-based AvroEventMetadata used throughout the Kafka event system.

backend/app/api/routes/execution.py (2)

120-128: Verify domain object compatibility with Pydantic model validation.

The change from mapper-based conversion to ExecutionResponse.model_validate() assumes the domain object returned by execute_script() has attributes matching the Pydantic model (execution_id, status). Runtime failures will occur if attribute names or types don't match.


276-276: All required fields (execution_id, status, stdout, stderr, lang, lang_version, resource_usage, exit_code, error_type) are present in the DomainExecution class returned by get_user_executions() with compatible types. The ExecutionResult model has ConfigDict(from_attributes=True), which enables successful validation of dataclass instances. The validation will work correctly without runtime failures.

backend/app/services/admin/admin_events_service.py (3)

3-3: LGTM: asdict import added.

The import is correctly added to support the new dataclass-based serialization approach.


24-24: LGTM: Unused mapper removed.

The EventMapper import is correctly removed since it's no longer used, while EventExportRowMapper is retained for CSV export functionality.


225-228: The primary concern in this review is incorrect—asdict() is the correct approach here.

The Event class is a @dataclass (line 76 of event_models.py), so asdict() works correctly and will not raise a TypeError. The concern about Pydantic models does not apply.

However, the loop checking for ["timestamp", "created_at", "updated_at", "stored_at", "ttl_expires_at"] contains dead code: the Event model does not have created_at or updated_at fields, so those conditions will never match. Consider removing these from the list of fields to check.

Likely an incorrect or invalid review comment.

backend/app/api/routes/user_settings.py (3)

8-12: LGTM!

The new domain model imports align with the PR's objective to eliminate mappers and use direct domain model construction.

Also applies to: 18-18


35-35: LGTM!

Using model_validate with from_attributes=True is the idiomatic Pydantic v2 pattern for converting domain objects to API models, eliminating the need for explicit mappers.


56-56: LGTM!

All endpoints consistently use model_validate to convert domain objects to API responses, maintaining a uniform pattern throughout the file.

Also applies to: 66-66, 79-79, 92-92, 113-113, 124-124

backend/app/services/coordinator/coordinator.py (1)

36-36: LGTM - Import aliasing correctly maps to infrastructure type.

The coordinator produces Kafka events (CreatePodCommandEvent, ExecutionAcceptedEvent, etc.) that require AvroEventMetadata for their metadata field. The alias maintains code compatibility while using the correct Pydantic-based type for Kafka serialization.

backend/app/services/saga/execution_saga.py (1)

8-8: LGTM - Consistent with Kafka event infrastructure.

The saga steps produce CreatePodCommandEvent and DeletePodCommandEvent which require AvroEventMetadata. The aliasing pattern is consistent with other service files.

backend/app/domain/events/event_models.py (1)

7-7: LGTM - Correctly uses domain-layer EventMetadata.

The domain Event model appropriately uses the domain EventMetadata (dataclass) rather than the infrastructure AvroEventMetadata. This maintains proper layer separation: domain models use dataclasses, infrastructure uses Pydantic for serialization.

backend/app/services/execution_service.py (1)

26-26: LGTM - Consistent import aliasing for Kafka event production.

The ExecutionService produces ExecutionRequestedEvent and ExecutionCancelledEvent which require AvroEventMetadata. The aliasing pattern is consistent across all service files.

backend/app/services/saga/saga_orchestrator.py (1)

18-18: LGTM - Correct import for saga event publishing.

The orchestrator's _publish_saga_cancelled_event creates SagaCancelledEvent which requires AvroEventMetadata. Consistent with the project-wide aliasing pattern.

backend/app/infrastructure/kafka/events/base.py (1)

10-21: LGTM - Direct import appropriate for infrastructure base class.

BaseEvent is the canonical Kafka event base in the infrastructure layer. Using AvroEventMetadata directly (without aliasing) is correct here since this defines the authoritative metadata type for all Kafka events.

backend/app/domain/events/__init__.py (1)

1-1: LGTM - Proper domain package re-export.

The domain events package correctly re-exports EventMetadata from its new dedicated module, providing a clean public API for domain layer consumers.

backend/app/services/pod_monitor/event_mapper.py (1)

18-18: LGTM - Consistent aliasing for pod event production.

The PodEventMapper creates various Kafka events (PodScheduledEvent, PodRunningEvent, ExecutionCompletedEvent, etc.) that require AvroEventMetadata. The aliasing pattern is consistent with all other service files.

backend/app/infrastructure/kafka/__init__.py (1)

4-14: LGTM - Clean export surface update.

The import rename and __all__ updates correctly reflect the migration to AvroEventMetadata and properly expose get_topic_for_event which was already imported.

backend/app/infrastructure/kafka/events/__init__.py (1)

13-13: LGTM - Consistent with Kafka metadata rename.

Import and export correctly updated to use AvroEventMetadata, maintaining consistency across the Kafka infrastructure layer.

Also applies to: 73-73

backend/app/infrastructure/kafka/events/metadata.py (1)

10-10: LGTM - Class rename and method signatures correctly updated.

The rename to AvroEventMetadata and all return type annotations are consistent.

Also applies to: 38-47

backend/app/api/routes/events.py (2)

15-15: Good use of aliasing for backward compatibility.

Importing AvroEventMetadata as EventMetadata minimizes code churn while aligning with the new naming convention.


52-52: All verification concerns confirmed: model_validate works correctly with domain dataclasses.

Both EventResponse and EventMetadataResponse have from_attributes=True in their ConfigDict. The domain Event dataclass maps cleanly to the Pydantic schema:

  • Nested EventMetadata dataclass correctly converts to EventMetadataResponse (both configured for from_attributes)
  • correlation_id property on Event is accessible via from_attributes=True
  • causation_id is optional in the schema and safely defaults to None when absent from the domain model
  • environment field in domain metadata uses the schema default of "production"

The migration from mapper-based construction to model_validate is valid and works as expected.

backend/app/infrastructure/mappers/saga_mapper.py (1)

1-5: LGTM - Clean import updates.

Simplified typing import and consistent use of the AvroEventMetadata as EventMetadata alias pattern. The removal of SagaResponseMapper aligns with the PR objective of eliminating redundant mappers.

backend/app/domain/events/event_metadata.py (1)

8-47: Well-designed domain metadata model.

Clean dataclass implementation with proper immutable update patterns using replace(). The to_dict and from_dict methods correctly handle Environment enum serialization/deserialization.
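The immutable-update pattern described here can be sketched with a trimmed-down version of the dataclass (the real EventMetadata carries more fields):

```python
from dataclasses import dataclass, field, replace
from typing import Optional
from uuid import uuid4

# Trimmed-down sketch of the domain EventMetadata pattern; the real class
# carries more fields (service name, IP address, environment, ...).
@dataclass(frozen=True)
class EventMetadata:
    correlation_id: str = field(default_factory=lambda: str(uuid4()))
    user_id: Optional[str] = None

    def with_user(self, user_id: str) -> "EventMetadata":
        # replace() returns a new frozen instance; the original is untouched.
        return replace(self, user_id=user_id)

base = EventMetadata(correlation_id="corr-1")
enriched = base.with_user("user-42")
print(base.user_id, enriched.user_id)
```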

backend/app/services/kafka_event_service.py (3)

15-17: Good separation of Avro and domain metadata concerns.

The dual import pattern clearly distinguishes between the Kafka serialization layer (AvroEventMetadata) and the domain persistence layer (DomainEventMetadata).


62-84: Clean metadata bridging between Kafka and domain layers.

The pattern of creating avro_metadata for Kafka publishing and converting to domain_metadata via from_dict(to_dict()) for storage is sound. This ensures the Kafka layer uses Pydantic/Avro-compatible models while the domain layer uses pure dataclasses.
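A minimal sketch of the bridging pattern, with simplified stand-ins for both metadata classes (the real ones carry more fields and live in separate layers):

```python
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional

# Hypothetical stand-ins for AvroEventMetadata (Kafka layer) and the
# domain EventMetadata; the real classes live in different modules.
@dataclass
class AvroEventMetadata:
    correlation_id: str
    user_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

@dataclass
class DomainEventMetadata:
    correlation_id: str
    user_id: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DomainEventMetadata":
        return cls(correlation_id=data["correlation_id"], user_id=data.get("user_id"))

# Bridge: serialize at the Kafka boundary, rehydrate in the domain layer.
avro_metadata = AvroEventMetadata(correlation_id="corr-1", user_id="user-42")
domain_metadata = DomainEventMetadata.from_dict(avro_metadata.to_dict())
print(domain_metadata)
```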


37-37: Method signatures correctly updated to accept AvroEventMetadata.

The public API now explicitly uses AvroEventMetadata for all publish methods, making the Kafka serialization requirements clear at the interface level.

Also applies to: 147-147, 190-190

backend/app/infrastructure/mappers/event_mapper.py (3)

4-4: Import path correctly updated to domain module.

Importing EventMetadata directly from app.domain.events.event_metadata (where it's defined) should resolve the previous MyPy export error that occurred when importing from event_models.


27-27: Consistent serialization with exclude_none=True.

Using to_dict(exclude_none=True) for MongoDB documents keeps stored metadata compact by omitting null fields.
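The effect of exclude_none can be shown with a stdlib stand-in for what Pydantic's model_dump(exclude_none=True) does under the hood:

```python
from typing import Any, Dict

# Stdlib stand-in for Pydantic's model_dump(exclude_none=True): drop keys
# whose value is None so stored documents stay compact.
def to_dict(data: Dict[str, Any], exclude_none: bool = True) -> Dict[str, Any]:
    if exclude_none:
        return {k: v for k, v in data.items() if v is not None}
    return dict(data)

metadata = {"correlation_id": "corr-1", "user_id": None, "service_name": "api"}
doc = to_dict(metadata)
print(doc)  # {'correlation_id': 'corr-1', 'service_name': 'api'}
```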


16-78: EventMapper simplified to focus on MongoDB serialization.

The removal of redundant mapper methods (per PR objectives) leaves a cleaner, focused mapper for MongoDB document conversion. The to_mongo_document and from_mongo_document methods correctly handle all event fields including metadata conversion.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (1)

55-60: Assertion on line 60 is a no-op due to or True.

The assertion assert "_deleted_at" in arch_doc or "_deletion_reason" in arch_doc or True always passes regardless of the document content, providing no test value.

If the field names vary (per the comment), consider asserting on specific expected keys or verifying the document structure more robustly:

🔎 Suggested fix
-    assert "_deleted_at" in arch_doc or "_deletion_reason" in arch_doc or True  # enum names vary
+    # Verify deleted_at is set (ArchivedEventMapper.from_event sets it)
+    assert arch.deleted_at is not None
+    assert "deleted_by" in arch_doc or "_deleted_by" in arch_doc
🧹 Nitpick comments (2)
backend/tests/unit/infrastructure/mappers/test_saga_mapper.py (1)

29-34: Consider adding test for SagaMapper.from_instance and private key filtering.

The current tests cover to_mongo/from_mongo but don't exercise:

  1. SagaMapper.from_instance() (lines 54-70 in the implementation)
  2. Private key filtering in SagaMapper.to_mongo() when context_data contains keys starting with "_"

These are minor gaps and can be addressed separately.

🔎 Example test additions
def test_saga_mapper_from_instance() -> None:
    inst = SagaInstance(saga_name="demo", execution_id="e1", context_data={"k": "v"})
    m = SagaMapper()
    saga = m.from_instance(inst)
    assert saga.saga_id == inst.saga_id
    assert saga.saga_name == inst.saga_name
    assert saga.execution_id == inst.execution_id


def test_saga_mapper_to_mongo_filters_private_keys() -> None:
    s = _saga()
    s.context_data = {"public": "ok", "_private": "hidden"}
    m = SagaMapper()
    doc = m.to_mongo(s)
    assert "public" in doc["context_data"]
    assert "_private" not in doc["context_data"]
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (1)

62-73: Consider using the actual EventExportRow model instead of a dynamic object.

The dynamic object created via type("Row", (), {})() works for duck-typing but doesn't validate that the mapper integrates correctly with the real model. If EventExportRow fields change, this test would still pass with a stale structure.

🔎 Suggested approach
-    row = type("Row", (), {})()
-    row.event_id = e.event_id
-    row.event_type = e.event_type
-    row.timestamp = e.timestamp.isoformat()
-    row.correlation_id = e.metadata.correlation_id or ""
-    row.aggregate_id = e.aggregate_id or ""
-    row.user_id = e.metadata.user_id or ""
-    row.service = e.metadata.service_name
-    row.status = e.status or ""
-    row.error = e.error or ""
+    row = EventExportRowMapper.from_event(e)
     ed = EventExportRowMapper.to_dict(row)
     assert ed["Event ID"] == e.event_id

This would test the full round-trip: event → export row → dict.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 205799f and 96e456e.

📒 Files selected for processing (15)
  • backend/app/infrastructure/mappers/__init__.py
  • backend/app/infrastructure/mappers/rate_limit_mapper.py
  • backend/app/infrastructure/mappers/replay_mapper.py
  • backend/app/infrastructure/mappers/saga_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_admin_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_event_mapper_extended.py
  • backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_rate_limit_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_rate_limit_mapper_extended.py
  • backend/tests/unit/infrastructure/mappers/test_replay_api_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_replay_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_replay_mapper_extended.py
  • backend/tests/unit/infrastructure/mappers/test_saga_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_saga_mapper_extended.py
  • backend/tests/unit/infrastructure/mappers/test_saved_script_mapper.py
💤 Files with no reviewable changes (5)
  • backend/app/infrastructure/mappers/rate_limit_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_rate_limit_mapper_extended.py
  • backend/app/infrastructure/mappers/__init__.py
  • backend/app/infrastructure/mappers/replay_mapper.py
  • backend/tests/unit/infrastructure/mappers/test_saga_mapper_extended.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/tests/unit/infrastructure/mappers/test_admin_mapper.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/tests/unit/infrastructure/mappers/test_replay_mapper.py (1)
backend/app/infrastructure/mappers/replay_mapper.py (2)
  • ReplayQueryMapper (127-149)
  • ReplaySessionMapper (18-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (10)
backend/tests/unit/infrastructure/mappers/test_rate_limit_mapper.py (1)

1-52: LGTM! Mapper removal aligns with PR objectives.

The removal of RateLimitStatus and RateLimitStatusMapper tests aligns well with the PR's goal of eliminating redundant mappers. The remaining test coverage for RateLimitRule, UserRateLimit, and RateLimitConfig mappers is comprehensive, covering roundtrip serialization, default values, date handling, and JSON serialization.

backend/app/infrastructure/mappers/saga_mapper.py (1)

1-1: Import cleanup looks good.

The simplified import aligns with the removal of SagaResponseMapper, SagaEventMapper, and related dependencies (EventMetadata, SagaCancelledEvent, SagaStatusResponse) as part of the mapper consolidation effort.

backend/tests/unit/infrastructure/mappers/test_saga_mapper.py (1)

29-34: Function rename aligns with mapper simplification.

The test now focuses solely on MongoDB round-trip (to_mongo/from_mongo) after removal of to_dict. The assertion validates the essential fields (saga_id, state) survive the round-trip.

backend/tests/unit/infrastructure/mappers/test_event_mapper_extended.py (1)

260-272: LGTM!

The test_from_event_minimal test correctly validates that a minimal event (with no error set) produces an export row with an empty error string. The test coverage for the EventExportRowMapper edge case is appropriate.

backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (2)

33-41: LGTM!

The test_event_mapper_to_from_mongo function correctly tests the round-trip conversion of events to MongoDB documents and back, including validation that extra fields are properly absorbed into the payload.


44-52: LGTM!

The test_summary_mapper function correctly validates EventSummaryMapper.from_mongo_document by constructing a summary object and verifying the mapper can reconstruct it from a dictionary representation.

backend/tests/unit/infrastructure/mappers/test_saved_script_mapper.py (1)

1-259: Cleanup is appropriate; SavedScriptMapper is actively used.

The removal of the unused DomainSavedScript import and any formatting adjustments are good code hygiene. SavedScriptMapper itself is actively used in production code (instantiated in SavedScriptRepository), so the tests and mapper should remain.

backend/tests/unit/infrastructure/mappers/test_replay_mapper.py (1)

10-10: LGTM! Clean removal of unused mapper imports.

The removal of ReplaySessionData, EventSummary, and ReplaySessionDataMapper from imports aligns well with the PR objective to eliminate redundant mappers. The remaining imports support the current test functions.

backend/tests/unit/infrastructure/mappers/test_replay_mapper_extended.py (1)

418-418: LGTM! Test logic is correct.

The assertion correctly verifies that ReplayStateMapper.from_mongo_document preserves the ReplayStatus.FAILED enum value when the document already contains an enum (rather than a string).

backend/tests/unit/infrastructure/mappers/test_replay_api_mapper.py (1)

1-391: The review comment's core premise is incorrect. ReplayApiMapper is actively used in the codebase and is not redundant:

  • backend/app/api/routes/replay.py imports and uses ReplayApiMapper directly across 8 locations
  • Methods called in routes: request_to_config(), op_to_response(), session_to_summary(), cleanup_to_response()
  • The mapper is also used internally within replay_api_mapper.py

The 0% Codecov coverage for replay_api_mapper.py does not indicate the mapper is unused or redundant—it indicates a gap in integration/route-level test coverage, not that these unit tests should be removed. The test file is appropriate and should remain since the mapper is active production code.

Note: There is a separate ReplayApiMapper class in replay_mapper.py (exported as AdminReplayApiMapper), so no naming collision exists.

Likely an incorrect or invalid review comment.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (1)

60-60: Assertion always passes due to or True.

The or True at the end makes this assertion a no-op - it will never fail regardless of the document content. If the intent is to verify that archived documents contain deletion metadata, the assertion should actually check for it.

🔎 Proposed fix
-    assert "_deleted_at" in arch_doc or "_deletion_reason" in arch_doc or True  # enum names vary
+    # Verify archived document contains expected deletion metadata fields
+    assert "deleted_at" in arch_doc or "_deleted_at" in arch_doc
+    assert "deletion_reason" in arch_doc or "_deletion_reason" in arch_doc

Alternatively, if field names are uncertain, inspect the actual keys:

# Check that at least some deletion metadata exists
deletion_keys = {"deleted_at", "_deleted_at", "deletion_reason", "_deletion_reason"}
assert any(k in arch_doc for k in deletion_keys), f"Expected deletion metadata in {arch_doc.keys()}"
🧹 Nitpick comments (2)
backend/tests/unit/events/test_schema_registry_manager.py (1)

5-5: Consider removing unused import.

The AvroEventMetadata import doesn't appear to be used in any of the test functions. If it's not needed for type checking or planned future use, consider removing it to keep imports clean.

🔎 Verify usage across the test file
#!/bin/bash
# Verify if AvroEventMetadata is used elsewhere in the test file
rg -n "AvroEventMetadata" backend/tests/unit/events/test_schema_registry_manager.py
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (1)

44-52: Consider including aggregate_id in the test document.

The test creates an EventSummary with aggregate_id but omits it from the mongo document passed to from_mongo_document. While this tests the optional field handling, you may want to also verify that aggregate_id is correctly mapped when present.

🔎 Optional enhancement
     s2 = EventSummaryMapper.from_mongo_document(
-        {"event_id": summary.event_id, "event_type": summary.event_type, "timestamp": summary.timestamp}
+        {"event_id": summary.event_id, "event_type": summary.event_type, "timestamp": summary.timestamp, "aggregate_id": summary.aggregate_id}
     )
     assert s2.event_id == summary.event_id
+    assert s2.aggregate_id == summary.aggregate_id
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 96e456e and fb5d9f4.

📒 Files selected for processing (21)
  • backend/app/events/metadata.py
  • backend/tests/helpers/events.py
  • backend/tests/integration/events/test_dlq_handler.py
  • backend/tests/integration/events/test_event_store.py
  • backend/tests/integration/events/test_event_store_consumer.py
  • backend/tests/integration/events/test_schema_registry_real.py
  • backend/tests/integration/k8s/test_k8s_worker_create_pod.py
  • backend/tests/integration/result_processor/test_result_processor.py
  • backend/tests/integration/services/events/test_event_service_integration.py
  • backend/tests/integration/test_sse_routes.py
  • backend/tests/unit/db/repositories/test_admin_events_repository.py
  • backend/tests/unit/db/repositories/test_event_repository.py
  • backend/tests/unit/dlq/test_dlq_models.py
  • backend/tests/unit/events/test_metadata_model.py
  • backend/tests/unit/events/test_metadata_model_min.py
  • backend/tests/unit/events/test_schema_registry_manager.py
  • backend/tests/unit/infrastructure/mappers/test_event_mapper_extended.py
  • backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py
  • backend/tests/unit/schemas_pydantic/test_events_schemas.py
  • backend/tests/unit/services/pod_monitor/test_event_mapper.py
  • backend/tests/unit/services/test_pod_builder.py
🧰 Additional context used
🧬 Code graph analysis (20)
backend/tests/integration/events/test_event_store.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/events/metadata.py (2)
backend/app/domain/events/event_models.py (1)
  • correlation_id (93-94)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/tests/integration/result_processor/test_result_processor.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/integration/events/test_dlq_handler.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/unit/db/repositories/test_admin_events_repository.py (2)
backend/app/events/metadata.py (2)
  • AvroEventMetadata (10-47)
  • to_dict (23-24)
backend/app/domain/events/event_models.py (2)
  • EventFields (13-45)
  • correlation_id (93-94)
backend/tests/unit/events/test_schema_registry_manager.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/integration/events/test_schema_registry_real.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/unit/dlq/test_dlq_models.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/integration/k8s/test_k8s_worker_create_pod.py (2)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/tests/integration/services/events/test_event_service_integration.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/integration/test_sse_routes.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/unit/events/test_metadata_model_min.py (2)
backend/tests/conftest.py (1)
  • app (130-138)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/unit/infrastructure/mappers/test_event_mapper_extended.py (2)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/infrastructure/mappers/event_mapper.py (1)
  • EventMapper (16-78)
backend/tests/unit/schemas_pydantic/test_events_schemas.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/integration/events/test_event_store_consumer.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/unit/services/test_pod_builder.py (2)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/tests/unit/events/test_metadata_model.py (1)
backend/app/events/metadata.py (3)
  • AvroEventMetadata (10-47)
  • to_dict (23-24)
  • from_dict (27-36)
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (2)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/infrastructure/mappers/event_mapper.py (3)
  • EventSummaryMapper (82-92)
  • from_mongo_document (45-78)
  • from_mongo_document (86-92)
backend/tests/helpers/events.py (1)
backend/app/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/tests/unit/db/repositories/test_event_repository.py (2)
backend/app/events/metadata.py (2)
  • AvroEventMetadata (10-47)
  • to_dict (23-24)
backend/app/domain/events/event_models.py (2)
  • correlation_id (93-94)
  • EventFields (13-45)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (24)
backend/tests/unit/events/test_schema_registry_manager.py (1)

9-37: Test logic looks good.

Both test functions are well-structured:

  • test_deserialize_json_execution_requested properly validates deserialization of execution events with all required fields
  • test_deserialize_json_missing_type_raises correctly verifies error handling for invalid input

The tests align with the PR's migration to AvroEventMetadata by ensuring the schema registry correctly handles the new metadata format.

backend/tests/unit/db/repositories/test_admin_events_repository.py (3)

10-10: LGTM: Import updated to use AvroEventMetadata.

The import change is consistent with the mapper elimination refactoring described in the PR objectives.


24-25: LGTM: Test data correctly uses AvroEventMetadata with proper serialization.

The test data construction properly:

  • Instantiates AvroEventMetadata with required fields (service_name, service_version)
  • Calls .to_dict() for serialization before MongoDB insertion
  • Includes optional fields (correlation_id, user_id) where needed for test scenarios

This pattern is appropriate for raw database inserts.

Also applies to: 40-40


45-45: LGTM: Event instantiation correctly passes AvroEventMetadata object.

The Event domain model construction properly receives the AvroEventMetadata object directly (without .to_dict()), which is the correct pattern for domain model instantiation. The omitted correlation_id will be auto-generated via the default factory defined in AvroEventMetadata.

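The default-factory behavior described above can be sketched with a stdlib stand-in. The project's AvroEventMetadata is a Pydantic model, but the mechanism is the same; the class and field names below are illustrative only.

```python
from dataclasses import dataclass, field
from uuid import uuid4


# Illustrative stand-in for the default-factory pattern; not the
# project's actual AvroEventMetadata class.
@dataclass
class MetadataSketch:
    service_name: str
    service_version: str
    # Each instance that omits correlation_id gets a fresh UUID.
    correlation_id: str = field(default_factory=lambda: str(uuid4()))


a = MetadataSketch(service_name="svc", service_version="1")
b = MetadataSketch(service_name="svc", service_version="1")
assert a.correlation_id and b.correlation_id
assert a.correlation_id != b.correlation_id  # independent per-instance values
```

This is why the test can safely omit correlation_id: every constructed instance still carries a usable, unique value.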
backend/tests/unit/dlq/test_dlq_models.py (1)

22-29: LGTM! Test helper correctly migrated to AvroEventMetadata.

The _make_event helper function has been correctly updated to use AvroEventMetadata. The minimal constructor arguments (service_name and service_version) are appropriate for test fixtures, and optional fields like correlation_id will be auto-generated by the Pydantic defaults.

backend/tests/unit/services/pod_monitor/test_event_mapper.py (1)

5-5: LGTM! Import updated to AvroEventMetadata.

The import change from EventMetadata to AvroEventMetadata is correct and aligns with the broader migration to Avro-based event metadata across the codebase.

backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (2)

19-30: LGTM!

The _event() helper correctly constructs an Event with the required AvroEventMetadata fields. The correlation_id will use the default factory as expected.


33-41: LGTM!

The test properly validates the round-trip serialization and correctly verifies that extra fields in the mongo document are extracted into the payload.

backend/tests/integration/result_processor/test_result_processor.py (1)

18-18: LGTM – Metadata migration is correct.

The import and usage of AvroEventMetadata are consistent with the repository-wide migration from EventMetadata to AvroEventMetadata. The test logic remains unchanged, with only the metadata type updated.

Also applies to: 91-91

backend/tests/unit/services/test_pod_builder.py (1)

6-6: LGTM – Consistent metadata migration across all test cases.

All CreatePodCommandEvent instantiations have been updated to use AvroEventMetadata. The migration is systematic and includes both required fields (service_name, service_version) and optional fields (user_id, correlation_id) where appropriate.

Also applies to: 44-49, 158-163, 291-291, 348-353, 405-405

backend/tests/integration/test_sse_routes.py (1)

12-12: LGTM – SSE test metadata updated correctly.

The PodCreatedEvent now uses AvroEventMetadata with the required fields. This aligns with the broader metadata type migration.

Also applies to: 134-134

backend/tests/helpers/events.py (1)

5-5: LGTM – Test helper factory updated correctly.

The make_execution_requested_event helper now constructs events with AvroEventMetadata. This ensures all tests using this factory will use the correct metadata type.

Also applies to: 34-34

backend/tests/integration/events/test_schema_registry_real.py (1)

4-4: LGTM – Schema registry test validates Avro metadata serialization.

The test now verifies that AvroEventMetadata serializes and deserializes correctly through the Schema Registry, which is appropriate given the Avro-based metadata type.

Also applies to: 17-17

backend/tests/unit/schemas_pydantic/test_events_schemas.py (1)

8-8: LGTM – Pydantic schema test updated for Avro metadata.

The test validates that EventBase and EventInDB schemas work correctly with AvroEventMetadata, ensuring schema compatibility with the new metadata type.

Also applies to: 40-40

backend/tests/integration/services/events/test_event_service_integration.py (1)

9-9: LGTM – Event service test data migrated correctly.

The test seeds events with AvroEventMetadata, including appropriate fields for testing access control (user_id) and correlation (correlation_id). The metadata structure supports the test's verification of filtering and authorization logic.

Also applies to: 23-24

backend/tests/integration/events/test_dlq_handler.py (1)

5-5: LGTM – DLQ handler tests updated consistently.

Both DLQ handler test scenarios now use AvroEventMetadata for SagaStartedEvent construction, maintaining consistency with the metadata migration across the test suite.

Also applies to: 22-22, 44-44

backend/tests/integration/k8s/test_k8s_worker_create_pod.py (1)

7-7: LGTM! Consistent migration to AvroEventMetadata.

The import and usage have been correctly updated to use AvroEventMetadata instead of EventMetadata. The test logic remains unchanged and the constructor arguments are identical, ensuring no behavioral changes.

Also applies to: 66-66

backend/app/events/metadata.py (1)

10-10: Class rename improves clarity.

Renaming EventMetadata to AvroEventMetadata makes the Avro-based nature of the metadata more explicit and improves code clarity.

backend/tests/unit/db/repositories/test_event_repository.py (1)

7-7: LGTM! Test updated consistently.

All references to EventMetadata have been correctly updated to AvroEventMetadata in both the import statement and test data setup. The test logic remains unchanged.

Also applies to: 23-23, 55-55

backend/tests/unit/events/test_metadata_model_min.py (1)

1-1: LGTM! Simple and correct update.

The import and usage have been properly updated to AvroEventMetadata with no changes to test behavior.

Also applies to: 5-5

backend/tests/integration/events/test_event_store_consumer.py (1)

14-14: LGTM! Integration test correctly updated.

The test has been properly updated to use AvroEventMetadata from the Kafka infrastructure module, maintaining consistency with the migration.

Also applies to: 35-35

backend/tests/unit/events/test_metadata_model.py (1)

1-1: LGTM! Comprehensive test correctly updated.

All references to EventMetadata have been updated to AvroEventMetadata, including both constructor calls and the from_dict class method. The test continues to properly validate all helper methods.

Also applies to: 5-8

backend/tests/integration/events/test_event_store.py (1)

7-7: LGTM! All event store tests updated consistently.

All instances of EventMetadata have been correctly replaced with AvroEventMetadata across multiple test methods, maintaining consistency throughout the integration test suite.

Also applies to: 30-30, 38-38, 58-58

backend/tests/unit/infrastructure/mappers/test_event_mapper_extended.py (1)

12-12: LGTM! Mapper tests updated correctly.

The import statement and all fixture definitions have been properly updated to use AvroEventMetadata. The test logic remains unchanged and continues to provide comprehensive coverage of the mapper functionality.

Also applies to: 24-30, 59-59

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
backend/tests/unit/events/test_metadata_model.py (2)

1-8: Missing pytest marker for consistency.

Other unit test files in the project (e.g., test_saga_step_and_base.py) include pytestmark = pytest.mark.unit. Consider adding this marker here for consistency and proper test categorization.

🔎 Proposed fix
 from app.infrastructure.kafka.events.metadata import AvroEventMetadata
+import pytest
+
+pytestmark = pytest.mark.unit
 
 
 def test_to_dict() -> None:

31-36: Test only covers one branch of ensure_correlation_id.

Since AvroEventMetadata has a default_factory that always generates a correlation_id, this test only exercises the branch where correlation_id is already present. The branch that generates a new correlation_id when empty is not covered.

Additionally, line 36 is redundant since line 35 already asserts the correlation_id equals the original (which is truthy by definition).

🔎 Proposed enhancement to cover both branches
 def test_ensure_correlation_id() -> None:
     m = AvroEventMetadata(service_name="svc", service_version="1")
     # ensure_correlation_id returns self if correlation_id already present
     same = m.ensure_correlation_id()
     assert same.correlation_id == m.correlation_id
-    assert m.ensure_correlation_id().correlation_id
+    # Test the branch where correlation_id is empty (if applicable)
+    # Note: With current default_factory, this branch may be unreachable
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fb5d9f4 and c3e6aed.

📒 Files selected for processing (4)
  • backend/app/events/metadata.py
  • backend/tests/unit/events/test_metadata_model.py
  • backend/tests/unit/events/test_metadata_model_min.py
  • backend/tests/unit/services/saga/test_saga_step_and_base.py
💤 Files with no reviewable changes (2)
  • backend/app/events/metadata.py
  • backend/tests/unit/events/test_metadata_model_min.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/tests/unit/events/test_metadata_model.py (4)
backend/tests/conftest.py (1)
  • app (130-138)
backend/app/infrastructure/kafka/events/metadata.py (2)
  • AvroEventMetadata (10-47)
  • ensure_correlation_id (44-47)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/app/domain/events/event_models.py (1)
  • correlation_id (93-94)
backend/tests/unit/services/saga/test_saga_step_and_base.py (1)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (5)
backend/tests/unit/services/saga/test_saga_step_and_base.py (1)

40-40: LGTM!

The import and instantiation are correctly updated to use AvroEventMetadata, consistent with the repository-wide migration. The constructor arguments match the new class signature.

Also applies to: 49-49

backend/tests/unit/events/test_metadata_model.py (4)

4-8: LGTM!

The test correctly validates to_dict() serialization behavior.


11-15: LGTM!

The test validates from_dict() deserialization with both required and optional fields.


18-22: LGTM!

Good test coverage for with_correlation() — validates both the updated field and preservation of other fields.


25-28: LGTM!

The test validates with_user() correctly updates the user_id.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c3e6aed and 2b615bb.

📒 Files selected for processing (2)
  • backend/app/schemas_pydantic/admin_events.py
  • backend/app/schemas_pydantic/events.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/schemas_pydantic/events.py (3)
backend/app/domain/events/event_models.py (1)
  • correlation_id (93-94)
backend/app/services/coordinator/queue_manager.py (1)
  • user_id (34-35)
backend/app/dlq/models.py (1)
  • event_type (93-95)
backend/app/schemas_pydantic/admin_events.py (1)
backend/app/schemas_pydantic/events.py (1)
  • HourlyEventCountSchema (17-23)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (9)
backend/app/schemas_pydantic/events.py (5)

17-24: LGTM! Good type safety improvement.

The HourlyEventCountSchema replaces untyped Dict[str, Any] with a proper typed schema, and from_attributes=True enables direct population from domain objects without mappers.

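What from_attributes=True buys can be sketched without Pydantic: the schema is populated from any object exposing the right attribute names, not just a dict. Pydantic's model_validate does this internally; the hand-rolled loader and the HourlyCountDomain class below are illustrative stand-ins so the example is self-contained.

```python
from dataclasses import dataclass


@dataclass
class HourlyCountDomain:  # hypothetical domain object, not a project class
    hour: int
    count: int


def validate_from_attributes(obj: object, fields: list[str]) -> dict:
    # Read each declared field off the source object's attributes,
    # mirroring what model_validate does under from_attributes=True.
    return {name: getattr(obj, name) for name in fields}


row = HourlyCountDomain(hour=14, count=37)
data = validate_from_attributes(row, ["hour", "count"])
assert data == {"hour": 14, "count": 37}
```

Because population goes through attribute access, domain objects, ORM rows, and plain namespaces all work without a dedicated mapper layer.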

41-41: LGTM! Consistent mapper elimination pattern.

Adding from_attributes=True and changing metadata to EventMetadataResponse enables direct conversion from domain Event objects, eliminating the need for mappers.

Also applies to: 50-50


56-57: LGTM! Consistent configuration.


118-118: LGTM! Proper API/domain layer separation.

Using EventMetadataResponse in the API layer model while the domain uses its own EventMetadata is good separation of concerns.

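The layering praised here can be sketched as two independent types with a thin conversion between them; class and field names below are illustrative, not the project's actual models.

```python
from dataclasses import dataclass


@dataclass
class DomainMetadata:
    service_name: str
    internal_trace: str  # domain-only detail, never exposed to clients


@dataclass
class MetadataResponse:
    service_name: str  # only the fields the API contract promises


def to_response(m: DomainMetadata) -> MetadataResponse:
    # The conversion is the single place where the API shape is decided.
    return MetadataResponse(service_name=m.service_name)


resp = to_response(DomainMetadata(service_name="svc", internal_trace="t-1"))
assert resp == MetadataResponse(service_name="svc")
```

Either side can now add fields without silently leaking them across the boundary.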

217-217: LGTM! Type safety improvement.

Replacing List[Dict[str, Any]] with List[HourlyEventCountSchema] improves type safety, and from_attributes=True enables direct population from aggregation results.

Also applies to: 222-222

backend/app/schemas_pydantic/admin_events.py (4)

4-4: LGTM! Necessary imports for schema improvements.

Also applies to: 7-7


76-77: LGTM! Enables direct object conversion.


101-108: LGTM! Consistent type safety pattern.

The UserEventCountSchema follows the same pattern as HourlyEventCountSchema, replacing untyped dictionaries with typed schemas.


113-114: LGTM! Comprehensive type safety improvements.

Both field type changes (events_by_hour and top_users) replace untyped dictionaries with proper schemas, and from_attributes=True enables direct population from service layer objects.

Also applies to: 117-118

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/event_service.py (1)

31-66: Post-filtering breaks total count and has_more semantics.

Lines 56-64 filter events after fetching from the repository, but total=result.total reflects the unfiltered count. When system events are excluded, the returned total no longer matches the filtered event list, misleading API consumers about pagination state. The has_more flag also becomes incorrect since it's based on the unfiltered total.

Either:

  1. Apply filtering at the repository level so total reflects filtered counts, or
  2. Recalculate total and has_more based on filtered results (though this breaks accurate pagination across pages).
🔎 Example fix (option 2, limited accuracy)
         if not include_system_events:
             filtered = [e for e in result.events if not (e.metadata and e.metadata.service_name.startswith("system-"))]
-            # Recalculate has_more based on filtered count
             return EventListResult(
                 events=filtered,
-                total=result.total,
+                total=len(filtered),  # Note: only accurate for current page
                 skip=skip,
                 limit=limit,
-                has_more=result.has_more,
+                has_more=False,  # Cannot determine without full filtered count
             )

Note: For accurate pagination with filtering, the filter should be applied in the repository query.

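The invariant this comment is pointing at can be made concrete with plain pagination arithmetic: total and has_more must describe the same (already filtered) dataset the returned page came from. The sketch below uses illustrative names, not the project's actual models.

```python
def paginate(items: list, skip: int, limit: int) -> dict:
    page = items[skip : skip + limit]
    total = len(items)  # counted over the SAME list the page is sliced from
    return {
        "events": page,
        "total": total,
        "has_more": skip + len(page) < total,
    }


# Stand-in for excluding system events before pagination, not after.
filtered = [e for e in range(10) if e % 2 == 0]
result = paginate(filtered, skip=0, limit=3)
assert result["total"] == 5        # reflects the filtered count
assert result["has_more"] is True  # 3 of 5 returned
```

Filtering after slicing (as the flagged code does) breaks this: the page shrinks while total and has_more still describe the unfiltered set.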
♻️ Duplicate comments (2)
backend/app/api/routes/user_settings.py (1)

95-103: Pagination implementation remains incomplete.

The endpoint now returns limit=limit, but still lacks a total count of all available history records and an offset/skip parameter. This deviates from the pagination pattern established in other endpoints (e.g., saga, events) where responses include total, skip, limit, and has_more.

Either implement full pagination (add skip parameter, return actual total count) or adjust the schema to clarify this is a limited snapshot without pagination metadata.

Based on the past review comment, this concern was previously raised and remains unresolved.

backend/app/services/event_service.py (1)

126-147: Same filtering issue: total count mismatch.

Similar to get_execution_events, lines 138-146 filter events by user after fetching, but return the unfiltered total count. This breaks pagination accuracy and misleads clients about the actual dataset size.

Apply the same fix as in get_execution_events—ideally by filtering at the repository level.

🧹 Nitpick comments (3)
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (2)

44-52: Suggest more comprehensive field assertions.

The test only verifies event_id (line 52) but doesn't check the other fields. Additionally, the mongo_doc passed to from_mongo_document (lines 50-51) is missing the aggregate_id field, which means s2.aggregate_id will be None while summary.aggregate_id is "agg". This incomplete verification could miss regressions in the mapper.

🔎 Suggested enhancement to verify all fields
 def test_summary_mapper() -> None:
     e = _event()
     summary = EventSummary(
         event_id=e.event_id, event_type=e.event_type, timestamp=e.timestamp, aggregate_id=e.aggregate_id
     )
     s2 = EventSummaryMapper.from_mongo_document(
-        {"event_id": summary.event_id, "event_type": summary.event_type, "timestamp": summary.timestamp}
+        {
+            "event_id": summary.event_id,
+            "event_type": summary.event_type,
+            "timestamp": summary.timestamp,
+            "aggregate_id": summary.aggregate_id,
+        }
     )
     assert s2.event_id == summary.event_id
+    assert s2.event_type == summary.event_type
+    assert s2.timestamp == summary.timestamp
+    assert s2.aggregate_id == summary.aggregate_id

64-75: Consider verifying additional export fields.

The test creates a Row object with multiple fields (event_id, event_type, timestamp, correlation_id, aggregate_id, user_id, service, status, error) but only verifies the "Event ID" field in the output (line 75). This provides minimal coverage of the EventExportRowMapper.to_dict functionality.

🔎 Optional enhancement to verify more export fields
     ed = EventExportRowMapper.to_dict(row)
     assert ed["Event ID"] == e.event_id
+    assert ed["Event Type"] == e.event_type
+    assert ed["Service"] == e.metadata.service_name
+    assert ed["Status"] == e.status

This would help catch regressions in the field mappings of EventExportRowMapper.

backend/app/api/routes/user_settings.py (1)

44-56: Consider validating the domain update construction.

Lines 44-54 manually construct DomainUserSettingsUpdate by conditionally converting nested settings. While functional, this approach is verbose and duplicates conversion logic.

Consider adding a class method to DomainUserSettingsUpdate (e.g., from_api_update) that encapsulates this conversion, reducing duplication and improving maintainability.

🔎 Example refactor

Add to DomainUserSettingsUpdate in backend/app/domain/user/settings_models.py:

@classmethod
def from_api_update(cls, updates: UserSettingsUpdate) -> "DomainUserSettingsUpdate":
    return cls(
        theme=updates.theme,
        timezone=updates.timezone,
        date_format=updates.date_format,
        time_format=updates.time_format,
        notifications=(
            DomainNotificationSettings(**updates.notifications.model_dump())
            if updates.notifications
            else None
        ),
        editor=(
            DomainEditorSettings(**updates.editor.model_dump())
            if updates.editor
            else None
        ),
        custom_settings=updates.custom_settings,
    )

Then simplify the route:

-    domain_updates = DomainUserSettingsUpdate(
-        theme=updates.theme,
-        timezone=updates.timezone,
-        date_format=updates.date_format,
-        time_format=updates.time_format,
-        notifications=(
-            DomainNotificationSettings(**updates.notifications.model_dump()) if updates.notifications else None
-        ),
-        editor=DomainEditorSettings(**updates.editor.model_dump()) if updates.editor else None,
-        custom_settings=updates.custom_settings,
-    )
+    domain_updates = DomainUserSettingsUpdate.from_api_update(updates)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2b615bb and a201c96.

📒 Files selected for processing (10)
  • backend/app/api/routes/events.py
  • backend/app/api/routes/saga.py
  • backend/app/api/routes/user_settings.py
  • backend/app/db/repositories/event_repository.py
  • backend/app/db/repositories/saga_repository.py
  • backend/app/schemas_pydantic/saga.py
  • backend/app/schemas_pydantic/user_settings.py
  • backend/app/services/event_service.py
  • backend/app/services/saga/saga_service.py
  • backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py
🧰 Additional context used
🧬 Code graph analysis (8)
backend/app/api/routes/saga.py (1)
backend/app/schemas_pydantic/saga.py (3)
  • SagaStatusResponse (7-39)
  • from_domain (24-39)
  • SagaListResponse (42-49)
backend/app/schemas_pydantic/saga.py (2)
frontend/src/lib/admin/pagination.svelte.ts (1)
  • skip (71-71)
backend/app/domain/events/query_builders.py (1)
  • limit (24-26)
backend/app/db/repositories/event_repository.py (3)
backend/app/api/routes/events.py (2)
  • get_events_by_correlation (133-158)
  • get_execution_events (36-64)
backend/app/services/event_service.py (2)
  • get_events_by_correlation (126-147)
  • get_execution_events (31-66)
backend/app/domain/events/event_models.py (3)
  • correlation_id (93-94)
  • EventListResult (138-145)
  • EventFields (13-45)
backend/app/services/saga/saga_service.py (3)
backend/app/api/routes/saga.py (1)
  • get_execution_sagas (53-91)
backend/app/domain/saga/models.py (1)
  • SagaListResult (53-64)
backend/app/db/repositories/saga_repository.py (1)
  • get_sagas_by_execution (49-61)
backend/app/services/event_service.py (2)
backend/app/domain/events/event_models.py (2)
  • EventListResult (138-145)
  • correlation_id (93-94)
backend/app/db/repositories/event_repository.py (2)
  • get_execution_events (169-188)
  • get_events_by_correlation (128-147)
backend/app/schemas_pydantic/user_settings.py (2)
frontend/src/lib/api/core/params.gen.ts (1)
  • Field (5-36)
backend/app/domain/events/query_builders.py (1)
  • limit (24-26)
backend/app/api/routes/user_settings.py (2)
backend/app/domain/user/settings_models.py (3)
  • DomainEditorSettings (22-28)
  • DomainNotificationSettings (13-18)
  • DomainUserSettingsUpdate (47-54)
backend/app/schemas_pydantic/user_settings.py (6)
  • EditorSettings (23-47)
  • NotificationSettings (11-20)
  • RestoreSettingsRequest (121-125)
  • SettingsHistoryEntry (98-109)
  • UserSettings (50-65)
  • SettingsHistoryResponse (112-118)
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (3)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_models.py (1)
  • EventSummary (98-104)
backend/app/infrastructure/mappers/event_mapper.py (8)
  • EventSummaryMapper (82-92)
  • from_mongo_document (45-78)
  • from_mongo_document (86-92)
  • ArchivedEventMapper (95-129)
  • from_event (113-129)
  • from_event (150-161)
  • to_mongo_document (20-42)
  • to_mongo_document (99-110)
🪛 GitHub Actions: MyPy Type Checking
backend/app/db/repositories/event_repository.py

[error] 132-132: Argument 1 to "count_documents" of "AgnosticCollection" has incompatible type "dict[EventFields, str]"; expected "Mapping[str, Any]".

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (14)
backend/tests/unit/infrastructure/mappers/test_infra_event_mapper.py (2)

33-41: LGTM!

The test properly verifies the round-trip conversion and the important behavior that extra MongoDB fields are correctly moved into the event payload. The test is well-structured and covers the key mapper functionality.


60-62: No issues found. The EventFields constants are correctly defined with the expected values:

  • DELETED_AT = "_deleted_at"
  • DELETED_BY = "_deleted_by"
  • DELETION_REASON = "_deletion_reason"

The test assertions are accurate and will pass as written.

backend/app/db/repositories/saga_repository.py (1)

49-61: LGTM! Pagination implementation is correct.

The pagination logic properly computes the total count, applies skip/limit to the cursor, and returns a well-formed SagaListResult with all required metadata.

backend/app/schemas_pydantic/saga.py (1)

47-49: LGTM! Pagination fields added correctly.

The addition of skip, limit, and has_more aligns with the pagination pattern used across other list responses in this PR.

backend/app/api/routes/saga.py (2)

52-91: LGTM! Pagination and response mapping implemented correctly.

The endpoint properly accepts limit and skip parameters, calls the service with pagination, and constructs a well-formed SagaListResponse using SagaStatusResponse.from_domain for each saga.


94-128: LGTM! Consistent pagination implementation.

The list_sagas endpoint follows the same correct pattern as get_execution_sagas, with proper parameter handling and response construction.

backend/app/services/saga/saga_service.py (1)

66-77: LGTM! Service layer correctly propagates pagination.

The method signature and return type are updated appropriately, and pagination parameters are passed through to the repository as expected.

backend/app/schemas_pydantic/user_settings.py (1)

14-14: LGTM! ConfigDict correctly enables attribute-based instantiation.

Adding model_config = ConfigDict(from_attributes=True) to these models allows seamless construction from domain objects with attributes rather than dictionaries, aligning with the mapper elimination goal of this PR.

Also applies to: 26-26, 53-53, 71-71, 101-101, 115-115
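A condensed sketch of the pattern this enables, using simplified stand-ins for the real domain and schema classes:

```python
from dataclasses import dataclass

from pydantic import BaseModel, ConfigDict


@dataclass
class DomainEditorSettings:  # simplified stand-in for the domain model
    tab_size: int
    theme: str


class EditorSettings(BaseModel):  # simplified stand-in for the API schema
    model_config = ConfigDict(from_attributes=True)
    tab_size: int
    theme: str


# With from_attributes=True, model_validate reads plain attributes off the
# domain object directly, so no hand-written mapper is needed in between.
settings = EditorSettings.model_validate(DomainEditorSettings(tab_size=4, theme="dark"))
```

This is the mechanism that lets the routes drop their mapper modules while keeping validation at the API boundary.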

backend/app/api/routes/events.py (6)

41-42: LGTM: Pagination parameters added consistently.

The pagination parameters follow good practices with appropriate validation constraints (limit: 1-1000, skip: ≥0) and align with existing patterns in other endpoints.


58-64: LGTM: Consistent pagination response structure.

The pagination response construction follows a consistent pattern across all endpoints, properly including total, limit, skip, and has_more fields. The empty result case (line 170) is also handled correctly.

Also applies to: 152-158, 183-189


53-54: LGTM: Proper access control checks.

The None checks correctly handle authorization failures by returning 403 status codes with appropriate messages, preventing subsequent access to None attributes.

Also applies to: 122-124


169-170: LGTM: Graceful handling of missing correlation ID.

Returning an empty result instead of an error when no correlation ID exists is a reasonable UX choice, and the pagination fields are properly initialized.


44-51: LGTM: Service calls updated for pagination.

The service method calls correctly pass through pagination parameters. This assumes the service layer has been updated to accept and handle these parameters appropriately.

Also applies to: 141-148, 172-179


56-56: No issues found. The service layer returns Event dataclass instances that are fully compatible with EventResponse.model_validate() via the from_attributes=True configuration. All required fields align, optional fields have proper defaults, and Pydantic v2 handles the enum coercion correctly. Error handling is not needed here—the migration is safe.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
backend/app/db/repositories/event_repository.py (2)

128-147: Fix type errors: use .value for enum members in query and sort.

The past review comment flagged line 131 (was line 132) for using EventFields.METADATA_CORRELATION_ID directly as a dictionary key, which MyPy rejects. Additionally, line 136 uses EventFields.TIMESTAMP directly in the sort call, which has the same type issue.

Use .value to extract the string value from enum members:

🔎 Proposed fix
-        query: dict[str, Any] = {EventFields.METADATA_CORRELATION_ID: correlation_id}
+        query: dict[str, Any] = {EventFields.METADATA_CORRELATION_ID.value: correlation_id}
         total_count = await self._collection.count_documents(query)
 
         cursor = (
             self._collection.find(query)
-            .sort(EventFields.TIMESTAMP, ASCENDING)
+            .sort(EventFields.TIMESTAMP.value, ASCENDING)
             .skip(skip)
             .limit(limit)
         )

169-188: Fix type errors: use .value for enum members in query and sort.

The past review comment flagged line 172 for using EventFields.PAYLOAD_EXECUTION_ID and EventFields.AGGREGATE_ID directly as dictionary keys, which MyPy rejects. Additionally, line 177 uses EventFields.TIMESTAMP directly in the sort call, which has the same type issue.

Use .value to extract the string value from enum members:

🔎 Proposed fix
-        query = {"$or": [{EventFields.PAYLOAD_EXECUTION_ID: execution_id}, {EventFields.AGGREGATE_ID: execution_id}]}
+        query = {"$or": [{EventFields.PAYLOAD_EXECUTION_ID.value: execution_id}, {EventFields.AGGREGATE_ID.value: execution_id}]}
         total_count = await self._collection.count_documents(query)
 
         cursor = (
             self._collection.find(query)
-            .sort(EventFields.TIMESTAMP, ASCENDING)
+            .sort(EventFields.TIMESTAMP.value, ASCENDING)
             .skip(skip)
             .limit(limit)
         )

🧹 Nitpick comments (1)
backend/app/services/saga/saga_orchestrator.py (1)

391-394: Consider exposing pagination parameters for scalability.

The method now retrieves sagas through a paginated repository call but discards pagination metadata (total, skip, limit, has_more). Currently, it returns at most 100 sagas per execution using default parameters.

If executions can have many sagas, consider exposing pagination parameters in the method signature to allow callers to retrieve all sagas or implement pagination at the API level.

🔎 Proposed enhancement to support pagination
-async def get_execution_sagas(self, execution_id: str) -> list[Saga]:
-    """Get all sagas for an execution, sorted by created_at descending (newest first)"""
+async def get_execution_sagas(
+    self, execution_id: str, limit: int = 100, skip: int = 0
+) -> list[Saga]:
+    """Get sagas for an execution, sorted by created_at descending (newest first)
+    
+    Args:
+        execution_id: The execution ID to filter by
+        limit: Maximum number of sagas to return (default 100)
+        skip: Number of sagas to skip for pagination (default 0)
+    
+    Returns:
+        List of Saga instances
+    """
-    result = await self._repo.get_sagas_by_execution(execution_id)
+    result = await self._repo.get_sagas_by_execution(execution_id, limit=limit, skip=skip)
     return result.sagas

Alternatively, return the full SagaListResult object to expose pagination metadata:

-async def get_execution_sagas(self, execution_id: str) -> list[Saga]:
+async def get_execution_sagas(self, execution_id: str, limit: int = 100, skip: int = 0) -> SagaListResult:
     """Get all sagas for an execution, sorted by created_at descending (newest first)"""
-    result = await self._repo.get_sagas_by_execution(execution_id)
-    return result.sagas
+    return await self._repo.get_sagas_by_execution(execution_id, limit=limit, skip=skip)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a201c96 and 535d37c.

📒 Files selected for processing (6)
  • backend/app/db/repositories/admin/admin_events_repository.py
  • backend/app/db/repositories/event_repository.py
  • backend/app/db/repositories/user_settings_repository.py
  • backend/app/services/saga/saga_orchestrator.py
  • backend/tests/integration/services/events/test_event_service_integration.py
  • backend/tests/unit/db/repositories/test_event_repository.py
✅ Files skipped from review due to trivial changes (1)
  • backend/app/db/repositories/user_settings_repository.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/tests/unit/db/repositories/test_event_repository.py
🧰 Additional context used
🧬 Code graph analysis (4)
backend/app/services/saga/saga_orchestrator.py (3)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/domain/events/event_metadata.py (1)
  • EventMetadata (9-47)
backend/app/db/repositories/saga_repository.py (1)
  • get_sagas_by_execution (49-61)
backend/app/db/repositories/admin/admin_events_repository.py (2)
backend/app/domain/events/event_models.py (1)
  • EventSummary (98-104)
backend/app/infrastructure/mappers/event_mapper.py (2)
  • from_mongo_document (45-78)
  • from_mongo_document (86-92)
backend/tests/integration/services/events/test_event_service_integration.py (2)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • AvroEventMetadata (10-47)
backend/app/db/repositories/event_repository.py (3)
  • get_execution_events (169-188)
  • query_events_advanced (402-439)
  • get_events_by_correlation (128-147)
backend/app/db/repositories/event_repository.py (2)
backend/app/services/event_service.py (2)
  • get_events_by_correlation (126-147)
  • get_execution_events (31-66)
backend/app/domain/events/event_models.py (3)
  • correlation_id (93-94)
  • EventListResult (138-145)
  • EventFields (13-45)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (5)
backend/tests/integration/services/events/test_event_service_integration.py (2)

9-9: LGTM: Import updated to use infrastructure metadata type.

The import change from EventMetadata to AvroEventMetadata aligns with the PR's goal of eliminating redundant mappers and using Pydantic-based infrastructure types directly. The AvroEventMetadata model provides the same fields used in this test.


36-36: LGTM: Result access patterns updated for EventListResult structure.

The test assertions now correctly access the .events property on EventListResult objects returned by service methods. This aligns with the unified pagination metadata structure introduced in the PR:

  • Line 36: Iterates over events_admin.events to check aggregate_id
  • Line 44: Iterates over by_corr_user.events to verify user filtering
  • Line 46: Accesses by_corr_admin.events length to validate result count

The test logic and assertions remain correct with these structural changes.

Also applies to: 44-44, 46-46

backend/app/services/saga/saga_orchestrator.py (1)

18-18: The import change is type-safe. BaseEvent explicitly requires metadata: AvroEventMetadata, which is exactly the type being provided at line 491. No incompatibility exists.

backend/app/db/repositories/admin/admin_events_repository.py (2)

379-379: Direct usage is correct.

The call now correctly uses the EventSummary instances returned by get_events_preview_for_replay without redundant conversions.


356-360: The change correctly eliminates the circular conversion pattern and handles datetime serialization properly.

The refactor from List[Dict[str, Any]] to List[EventSummary] successfully removes the inefficient circular conversion flagged in the previous review. The service layer (prepare_replay_session_for_preview) properly handles the datetime field by explicitly mapping EventSummary instances to dictionaries at the API boundary (lines 106-115), which FastAPI then serializes to JSON with standard datetime handling. No further changes needed.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 535d37c and a4c9fa1.

📒 Files selected for processing (1)
  • backend/app/events/core/producer.py
🧰 Additional context used
🪛 GitHub Actions: Ruff Linting
backend/app/events/core/producer.py

[error] 15-15: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 16-16: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 17-17: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 18-18: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 19-19: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 20-20: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 21-21: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 22-22: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.


[error] 24-24: E402: Module level import not at top of file. Import outside the top-of-file block detected by Ruff.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (1)
backend/app/events/core/producer.py (1)

121-123: LGTM: Correct fix for librdkafka race condition.

The lock correctly serializes Producer initialization to prevent the documented race condition in confluent-kafka-python. The context manager ensures the lock is always released, and the implementation follows best practices for this workaround.
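The pattern can be sketched as a module-level lock wrapping the non-thread-safe constructor. Here a generic `factory` callable stands in for the actual `confluent_kafka.Producer(config)` call (an assumption about the real code's shape):

```python
import threading

# One lock per process, shared by every producer instance.
_producer_init_lock = threading.Lock()


def init_serialized(factory):
    """Run a constructor that must not execute concurrently
    (e.g. librdkafka's Producer init) under a process-wide lock.

    The context manager guarantees the lock is released even if the
    factory raises.
    """
    with _producer_init_lock:
        return factory()
```

Callers that previously constructed the producer directly would instead call `init_serialized(lambda: Producer(config))`, serializing only the initialization while leaving produce/poll paths lock-free.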

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (2)
backend/app/db/repositories/event_repository.py (2)

129-129: Fix the type errors flagged by MyPy (duplicate).

Lines 129 and 132 use EventFields enum members directly as dictionary keys and field names, but MyPy expects strings. Use .value to extract the string value from the enum members.

🔎 Proposed fix
-        query: dict[str, Any] = {EventFields.METADATA_CORRELATION_ID: correlation_id}
+        query: dict[str, Any] = {EventFields.METADATA_CORRELATION_ID.value: correlation_id}
         total_count = await self._collection.count_documents(query)
 
-        cursor = self._collection.find(query).sort(EventFields.TIMESTAMP, ASCENDING).skip(skip).limit(limit)
+        cursor = self._collection.find(query).sort(EventFields.TIMESTAMP.value, ASCENDING).skip(skip).limit(limit)

Also applies to: 132-132


163-163: Fix the type errors flagged by MyPy (duplicate).

Lines 163 and 166 use EventFields enum members directly as dictionary keys and field names, but MyPy expects strings. Use .value to extract the string value from the enum members.

🔎 Proposed fix
-        query = {"$or": [{EventFields.PAYLOAD_EXECUTION_ID: execution_id}, {EventFields.AGGREGATE_ID: execution_id}]}
+        query = {"$or": [{EventFields.PAYLOAD_EXECUTION_ID.value: execution_id}, {EventFields.AGGREGATE_ID.value: execution_id}]}
         total_count = await self._collection.count_documents(query)
 
-        cursor = self._collection.find(query).sort(EventFields.TIMESTAMP, ASCENDING).skip(skip).limit(limit)
+        cursor = self._collection.find(query).sort(EventFields.TIMESTAMP.value, ASCENDING).skip(skip).limit(limit)

Also applies to: 166-166

🧹 Nitpick comments (2)
backend/app/infrastructure/mappers/event_mapper.py (1)

99-100: Consider calling the static method directly.

Since all EventMapper methods are static, you can simplify line 99-100 to:

doc = EventMapper.to_mongo_document(event)

This eliminates the unnecessary instantiation.

🔎 Proposed refactor
-        event_mapper = EventMapper()
-        doc = event_mapper.to_mongo_document(event)
+        doc = EventMapper.to_mongo_document(event)
backend/app/services/event_service.py (1)

31-64: Consider moving filtering logic to the repository layer.

Both get_execution_events (lines 53-54) and get_events_by_correlation (lines 134-135) perform client-side filtering after fetching events from the repository. This approach:

  1. Fetches unnecessary data from the database (system events, other users' events)
  2. Consumes more memory and bandwidth
  3. Makes pagination metadata harder to maintain correctly

Recommendation: Pass filter criteria (e.g., include_system_events, user_id) as parameters to the repository methods. This allows filtering at the database level, improving performance and simplifying pagination logic.

Also applies to: 124-143

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a4c9fa1 and 36bead9.

📒 Files selected for processing (6)
  • backend/app/db/repositories/event_repository.py
  • backend/app/events/core/producer.py
  • backend/app/infrastructure/mappers/event_mapper.py
  • backend/app/services/event_service.py
  • docs/testing/kafka-test-stability.md
  • mkdocs.yml
✅ Files skipped from review due to trivial changes (1)
  • mkdocs.yml
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/events/core/producer.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/infrastructure/mappers/event_mapper.py (3)
backend/app/domain/events/event_metadata.py (2)
  • EventMetadata (9-47)
  • to_dict (20-26)
backend/app/domain/events/event_models.py (1)
  • EventFields (13-45)
backend/app/infrastructure/kafka/events/metadata.py (1)
  • to_dict (23-24)
🪛 markdownlint-cli2 (0.18.1)
docs/testing/kafka-test-stability.md

7-7: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests
🔇 Additional comments (4)
docs/testing/kafka-test-stability.md (1)

1-86: Documentation is clear, comprehensive, and well-aligned with implementation.

The document effectively explains the Kafka test stability issue, its root cause in librdkafka, and provides both the primary solution (global lock serialization) and actionable alternatives. The structure (problem → why it happens → fix → related issues → alternatives) is pedagogically sound, and the code examples correctly illustrate the serialization pattern. The cross-references to GitHub issues and the fixture best-practices section add practical value.

backend/app/db/repositories/event_repository.py (2)

128-140: LGTM: Pagination implementation is correct.

The method correctly computes total_count before fetching documents, applies skip and limit to the cursor, and returns an EventListResult with proper pagination metadata including has_more calculation.


162-174: LGTM: Pagination implementation is correct.

The method correctly computes total_count before fetching documents, applies skip and limit to the cursor, and returns an EventListResult with proper pagination metadata including has_more calculation. The query logic appropriately checks both PAYLOAD_EXECUTION_ID and AGGREGATE_ID using $or.

backend/app/infrastructure/mappers/event_mapper.py (1)

4-4: LGTM! Improved architecture and metadata handling.

The refactoring correctly moves EventMetadata from the infrastructure layer to the domain layer, which is the proper architectural direction. The metadata serialization now uses exclude_none=True to reduce MongoDB storage overhead while the deserialization with from_dict() provides sensible defaults, ensuring compatibility with existing documents.

Also applies to: 27-27, 71-71

@sonarqubecloud

@HardMax71 HardMax71 merged commit a44bd24 into main Dec 27, 2025
15 checks passed
@HardMax71 HardMax71 deleted the less-mappers branch December 27, 2025 12:51
