-
Notifications
You must be signed in to change notification settings - Fork 10
Open
Description
Per https://github.com/taskiq-python/taskiq/blob/2ed2bd64cdb944c978ff132ca5afd988ed73dad3/taskiq/instrumentation.py#L79, Taskiq is supposed to inject the taskiq otel middleware to the broker.
I tried below code and unfortunately, getting errors such as:
"<LogRecord: asyncio, 40, /usr/local/lib/python3.13/asyncio/base_events.py, 1879, "Task exception was never retrieved
future: <Task finished name='schedule_ef8554ca636942c1adcc69c931eab22a' coro=<send() done, defined at /usr/local/lib/python3.13/site-packages/taskiq/cli/scheduler/run.py:156> exception=taskiq.exceptions.SendTaskError()>">"
....
from faststream import FastStream
from taskiq_faststream import AppWrapper
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware
broker = KafkaBroker(
kafka_instance=...,
consumer_group=...,
)
app = AsgiFastStream(
broker,
asgi_routes=[
("/liveness", liveness_ping),
],
)
# Enable OpenTelemetry middleware
taskiq_broker = AppWrapper(app, middlewares=[OpenTelemetryMiddleware()])Metadata
Metadata
Assignees
Labels
No labels