-
Notifications
You must be signed in to change notification settings - Fork 12
APP-9587 : Using Pubsub for message processors #898
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
📜 Docstring Coverage ReportRESULT: PASSED (minimum: 30.0%, actual: 76.6%) Detailed Coverage Report |
📦 Trivy Vulnerability Scan Results
Report Summary
Scan Result Details✅ No vulnerabilities found during the scan for |
📦 Trivy Secret Scan Results
Report Summary
Scan Result Details✅ No secrets found during the scan for |
|
🛠 Docs available at: https://k.atlan.dev/application-sdk/add-message-processor |
☂️ Python Coverage
Overall Coverage
New FilesNo new covered files... Modified FilesNo covered modified files...
|
|
🛠 Full Test Coverage Report: https://k.atlan.dev/coverage/application-sdk/pr/898 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces Pub/Sub messaging support for message processors using Dapr, enabling applications to subscribe to message topics and process incoming messages through configurable handlers.
Key changes:
- Added Dapr pubsub component configuration with support for both in-memory and Kafka backends
- Introduced
PubSubSubscriptionandBulkSubscribemodels for configuring message subscriptions - Implemented automatic registration of message handler endpoints and Dapr subscription generation
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| components/messaging.yaml | New Dapr pubsub component configuration file with in-memory and commented Kafka configurations |
| components/eventstore.yaml | Minor formatting fix to ensure both webhook URLs remain commented out |
| application_sdk/server/fastapi/models.py | Added BulkSubscribe and PubSubSubscription models to support message subscription configuration |
| application_sdk/server/fastapi/init.py | Integrated messaging subscriptions into server initialization, router registration, and Dapr subscription endpoint |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: niteesh-atlan <niteesh.hegde@atlan.com>
firecast
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need docs on how can the handlers be written and a typedefinition defined for it as arguments so that handlers are specific
| subscription.message_handler, | ||
| methods=["POST"], | ||
| ) | ||
| self.app.include_router(messaging_router, prefix="/message-processor") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets have the prefix as subscriptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also need to version this path? Check how events are being handled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated it. makes sense to add v1 if we standardize event-structure / cloud events format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@amit-atlan - you are talking about these docs only right? I could not find these details readme in sdk.
I will add the developer docs and link sample apps here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added developer docs - https://github.com/atlanhq/atlan-docs/pull/512.
Details about dapr component - bulk and single message processors, DLQ and retries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 4 out of 5 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
application_sdk/server/fastapi/init.py:127
- The docstring for the init method is incomplete. It only documents three parameters (lifespan, handler, workflow_client) but the method signature includes additional parameters: frontend_templates_path, ui_enabled, has_configmap, and the newly added messaging_subscriptions. All parameters should be documented in the Args section of the docstring for clarity and maintainability.
"""Initialize the FastAPI application.
Args:
lifespan: Optional lifespan manager for the FastAPI application.
handler: Handler for processing application operations.
workflow_client: Client for Temporal workflow operations.
"""
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| workflow_router: APIRouter | ||
| dapr_router: APIRouter | ||
| events_router: APIRouter | ||
| messaging_router: APIRouter |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The newly added messaging_router attribute is declared but never initialized or used. Messaging routes are registered using a local variable instead (see line 226). Consider removing this unused attribute or updating the implementation to use it consistently.
| messaging_router: APIRouter |
| self.dapr_router = APIRouter() | ||
| self.events_router = APIRouter() | ||
|
|
||
| if len(messaging_subscriptions) > 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for this if condition
| """ | ||
|
|
||
| subscriptions: List[dict[str, Any]] = [] | ||
| if self.messaging_subscriptions: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need here as well
| "route": f"/subscriptions/v1/{subscription.route}", | ||
| } | ||
| if subscription.bulk_subscribe and subscription.bulk_subscribe.enabled: | ||
| subscription_dict["bulkSubscribe"] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just do ** since the keys are the same
| if subscription.bulk_subscribe and subscription.bulk_subscribe.enabled: | ||
| subscription_dict["bulkSubscribe"] = { | ||
| "enabled": subscription.bulk_subscribe.enabled, | ||
| "maxMessagesCount": subscription.bulk_subscribe.maxMessagesCount, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
snake case vs camel case
| maxAwaitDurationMs: int = 40 | ||
|
|
||
|
|
||
| class PubSubSubscription(BaseModel): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets keep this Subscription
|
|
||
| model_config = {"arbitrary_types_allowed": True} | ||
|
|
||
| pubsub_component_name: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
component_name
| model_config = {"arbitrary_types_allowed": True} | ||
|
|
||
| pubsub_component_name: str | ||
| topic: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might not be required if component is always tied to a topic
| Callable[[Any], Any], Callable[[Any], Coroutine[Any, Any, Any]] | ||
| ] # Required callback function (sync or async) | ||
| bulk_subscribe: Optional[BulkSubscribe] = None | ||
| dead_letter_topic: Optional[str] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similarly this as topic
| message_handler: Union[ | ||
| Callable[[Any], Any], Callable[[Any], Coroutine[Any, Any, Any]] | ||
| ] # Required callback function (sync or async) | ||
| bulk_subscribe: Optional[BulkSubscribe] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bulk_config
Changelog
APP-9587 : Using Pubsub for message processors
Scope
Sample app Pr - atlanhq/atlan-sample-apps#119
Additional context (e.g. screenshots, logs, links)
Checklist
Copyleft License Compliance