-
Notifications
You must be signed in to change notification settings - Fork 11
[integration] Enable async and distributed processing for the ML backend #910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…void shadowing the FlatBugDetector model
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| ANTENNA_CELERY_WORKER_NAME = "antenna_celeryworker" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider making this an overridable environment variable (i.e. antenna_mlworker) to customize which workers subscribe
| if task.traceback: | ||
| # TODO: Error logs will have many tracebacks | ||
| # could add some processing to provide a concise error summary | ||
| job.logger.error(f"Subtask {task_name} ({task_id}) failed: {task.traceback}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logs the error, but then still tries to parse a successful result. Can you mark the task as failed and then continue to the next one? The status check did it's job correctly! But the subtask failed. Can you update the MLTaskRecord to say it failed?
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughIntroduces ML task tracking via a new Changes
Sequence Diagram(s)sequenceDiagram
participant User as User/<br/>API
participant JobView as JobViewSet
participant Job as Job Model
participant MLJob as MLJob
participant CeleryBeat as Celery Beat
participant Task as Celery Task
participant MLRecord as MLTaskRecord
User->>JobView: Create ML Job
JobView->>Job: Create with type=ml
Job->>MLJob: check_inprogress_subtasks()
MLJob->>MLRecord: Query in-progress subtasks
MLRecord-->>MLJob: Status for each subtask
alt Subtasks In Progress
MLJob-->>JobView: True
JobView->>Task: Schedule check_ml_job_status (on_commit)
Note over Task: Deferred until DB commit
else All Complete
MLJob-->>JobView: False
end
CeleryBeat->>Task: Periodic check_dangling_ml_jobs()
Task->>MLRecord: Find stale in-progress jobs
Task->>MLRecord: Mark as REVOKED if last_checked exceeded
Task-->>CeleryBeat: Cleanup complete
sequenceDiagram
participant StartUp as Startup
participant MLConfig as MLConfig.ready()
participant Signals as ml.signals
participant CeleryWorker as Celery Worker
participant Inspector as Celery Inspector
participant Control as Celery Control
participant Queue as Queue
StartUp->>MLConfig: Load app
MLConfig->>Signals: Import signals module
Signals->>Signals: Register signal handlers
CeleryWorker->>Signals: Emit worker_ready
Signals->>Inspector: Get active workers
Inspector-->>Signals: Worker name antenna_celeryworker@host
Signals->>Control: get_worker_name() validates pattern
Control-->>Signals: Worker found
loop For each Pipeline
Signals->>Control: add_consumer(ml-pipeline-{slug})
Control->>Queue: Subscribe worker to queue
Queue-->>Control: Subscribed
end
Signals-->>CeleryWorker: Subscription complete
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes
Areas requiring extra attention:
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
docker-compose.yml (1)
101-125: Restore database readiness before running worker startup migrations.
celeryworkernow runspython manage.py migratebefore the worker boots, but itsdepends_ononly lists RabbitMQ. Without waiting for Postgres to be up, the migration can fail with anOperationalError, the container exits, anddjango(which now waits on a healthy worker) never starts. Please add an explicit Postgres dependency or a wait-for-db gate before running migrations so the worker starts reliably.Apply this diff to reintroduce the dependency:
- depends_on: - - rabbitmq + depends_on: + postgres: + condition: service_started + rabbitmq: + condition: service_started
🧹 Nitpick comments (2)
.envs/.local/.postgres (1)
6-6: Consider usingpostgresql://scheme instead ofpostgres://.The
postgres://URI scheme is deprecated. Modern PostgreSQL clients and tools (including SQLAlchemy and psycopg2) recommend usingpostgresql://instead.Apply this diff:
-DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami +DATABASE_URL=postgresql://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/amicompose/local/django/celery/worker/start (1)
8-8: Consider running migrations separately from worker startup.Running
migratein the worker startup script can cause issues if multiple workers start concurrently, even though Django has migration locking. It's better practice to run migrations as a separate initialization step before starting workers.Consider moving the migration to a separate initialization container or running it explicitly before starting workers in your orchestration layer.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (29)
.envs/.ci/.django(1 hunks).envs/.ci/.postgres(1 hunks).envs/.local/.django(1 hunks).envs/.local/.postgres(1 hunks)ami/jobs/admin.py(2 hunks)ami/jobs/migrations/0019_mltaskrecord.py(1 hunks)ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py(1 hunks)ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py(1 hunks)ami/jobs/migrations/0022_job_last_checked.py(1 hunks)ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py(1 hunks)ami/jobs/models.py(10 hunks)ami/jobs/tasks.py(2 hunks)ami/jobs/tests.py(2 hunks)ami/jobs/views.py(2 hunks)ami/main/models.py(1 hunks)ami/ml/apps.py(1 hunks)ami/ml/migrations/0026_check_dangling_ml_jobs_celery_beat.py(1 hunks)ami/ml/models/__init__.py(1 hunks)ami/ml/models/pipeline.py(7 hunks)ami/ml/schemas.py(2 hunks)ami/ml/signals.py(1 hunks)ami/ml/tasks.py(4 hunks)ami/tests/fixtures/main.py(1 hunks)compose/local/django/Dockerfile(0 hunks)compose/local/django/celery/worker/start(1 hunks)config/settings/base.py(3 hunks)config/settings/local.py(1 hunks)docker-compose.ci.yml(5 hunks)docker-compose.yml(6 hunks)
💤 Files with no reviewable changes (1)
- compose/local/django/Dockerfile
🧰 Additional context used
🧬 Code graph analysis (16)
ami/jobs/tasks.py (1)
ami/jobs/models.py (4)
Job(1057-1355)MLJob(328-786)update_status(1238-1258)save(1290-1301)
ami/ml/schemas.py (6)
ui/src/data-services/models/job.ts (1)
pipeline(109-111)ui/src/data-services/models/pipeline.ts (1)
algorithms(29-31)ui/src/data-services/models/job-details.ts (1)
errors(18-20)ui/src/data-services/models/occurrence-details.ts (1)
detections(108-110)processing_services/minimal/api/schemas.py (1)
PipelineResultsResponse(230-243)processing_services/example/api/schemas.py (1)
PipelineResultsResponse(265-279)
ami/ml/tasks.py (2)
ami/ml/models/pipeline.py (3)
process_images(187-285)process_images(1240-1252)save(1276-1282)ami/jobs/models.py (7)
Job(1057-1355)MLJob(328-786)logger(1340-1349)check_inprogress_subtasks(314-318)check_inprogress_subtasks(333-512)check_inprogress_subtasks(1190-1197)save(1290-1301)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (2)
ami/jobs/migrations/0019_mltaskrecord.py (1)
Migration(9-72)ami/jobs/models.py (3)
JobLogs(255-257)default_job_progress(210-214)JobProgress(113-207)
ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py (1)
ami/utils/schemas.py (1)
choices(63-65)
ami/jobs/views.py (2)
ami/jobs/models.py (6)
Job(1057-1355)JobState(32-68)MLJob(328-786)check_inprogress_subtasks(314-318)check_inprogress_subtasks(333-512)check_inprogress_subtasks(1190-1197)ami/ml/tasks.py (1)
check_ml_job_status(111-149)
ami/main/models.py (2)
ami/main/admin.py (1)
regroup_events(180-183)ami/tasks.py (1)
regroup_events(90-99)
ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
ami/jobs/migrations/0019_mltaskrecord.py (1)
Migration(9-72)
ami/jobs/migrations/0022_job_last_checked.py (2)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (1)
Migration(8-28)ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
Migration(6-30)
ami/jobs/models.py (5)
ami/ml/schemas.py (3)
PipelineRequest(174-178)PipelineResultsResponse(181-231)combine_with(198-231)ami/ml/signals.py (2)
get_worker_name(15-28)subscribe_celeryworker_to_pipeline_queues(32-67)ami/ml/tasks.py (1)
check_ml_job_status(111-149)ami/jobs/views.py (1)
check_inprogress_subtasks(161-176)ami/ml/models/pipeline.py (6)
save_results(955-1063)save_results(1268-1269)save_results_async(1271-1274)save(1276-1282)process_pipeline_request(164-184)schedule_process_images(1254-1266)
ami/ml/models/__init__.py (1)
ami/ml/models/pipeline.py (2)
Pipeline(1105-1282)PipelineSaveResults(945-951)
ami/ml/models/pipeline.py (3)
ami/main/models.py (15)
name(987-988)SourceImage(1623-1871)Project(225-447)config(1394-1402)save(319-322)save(882-897)save(1119-1122)save(1462-1469)save(1561-1564)save(1829-1832)save(2110-2131)save(2347-2353)save(2526-2531)save(2814-2831)save(3451-3454)ami/ml/schemas.py (5)
PipelineRequest(174-178)PipelineResultsResponse(181-231)SourceImageRequest(116-120)PipelineRequestConfigParameters(155-171)DetectionRequest(138-142)ami/jobs/models.py (4)
logger(1340-1349)Job(1057-1355)MLTaskRecord(1005-1054)save(1290-1301)
ami/jobs/admin.py (2)
ami/jobs/models.py (3)
Job(1057-1355)MLTaskRecord(1005-1054)kill_task(1045-1054)ami/main/admin.py (2)
AdminBase(41-58)queryset(469-472)
ami/ml/signals.py (1)
ami/ml/models/pipeline.py (1)
Pipeline(1105-1282)
ami/jobs/migrations/0019_mltaskrecord.py (2)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (1)
Migration(8-28)ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
Migration(6-30)
ami/jobs/tests.py (2)
ami/jobs/models.py (18)
Job(1057-1355)JobState(32-68)MLJob(328-786)MLSubtaskNames(992-994)MLSubtaskState(997-1002)save(1290-1301)logger(1340-1349)run(321-325)run(657-786)run(794-832)run(840-883)run(895-919)run(927-947)run(955-956)run(1199-1207)update_job_progress(515-654)MLTaskRecord(1005-1054)kill_task(1045-1054)ami/ml/tasks.py (2)
check_ml_job_status(111-149)check_dangling_ml_jobs(153-183)
🪛 Ruff (0.14.3)
ami/ml/schemas.py
206-206: Consider [self, *others] instead of concatenation
Replace with [self, *others]
(RUF005)
210-210: Avoid specifying long messages outside the exception class
(TRY003)
214-214: Avoid specifying long messages outside the exception class
(TRY003)
218-218: Avoid specifying long messages outside the exception class
(TRY003)
ami/ml/tasks.py
137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
137-137: Avoid specifying long messages outside the exception class
(TRY003)
138-138: Do not catch blind exception: Exception
(BLE001)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
149-149: Create your own exception
(TRY002)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py
9-11: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
13-28: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py
8-10: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
12-33: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/views.py
161-161: Unused method argument: request
(ARG002)
161-161: Unused method argument: pk
(ARG002)
ami/ml/apps.py
10-10: Unused noqa directive (non-enabled: F401)
Remove unused noqa directive
(RUF100)
ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py
7-9: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
11-30: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/migrations/0022_job_last_checked.py
7-9: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
11-17: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/models.py
437-437: Create your own exception
(TRY002)
437-437: Avoid specifying long messages outside the exception class
(TRY003)
755-755: Do not catch blind exception: Exception
(BLE001)
756-756: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1043-1043: Avoid specifying long messages outside the exception class
(TRY003)
ami/ml/models/pipeline.py
334-334: Function definition does not bind loop variable prediction_request
(B023)
335-335: Function definition does not bind loop variable task_id
(B023)
377-377: Avoid specifying long messages outside the exception class
(TRY003)
ami/jobs/admin.py
76-76: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/ml/signals.py
27-27: Do not catch blind exception: Exception
(BLE001)
32-32: Unused function argument: kwargs
(ARG001)
38-38: Use is and is not for type comparisons, or isinstance() for isinstance checks
(E721)
57-57: Avoid specifying long messages outside the exception class
(TRY003)
65-65: Redundant exception object included in logging.exception call
(TRY401)
71-71: Unused function argument: sender
(ARG001)
71-71: Unused function argument: kwargs
(ARG001)
88-88: Redundant exception object included in logging.exception call
(TRY401)
92-92: Unused function argument: sender
(ARG001)
92-92: Unused function argument: kwargs
(ARG001)
99-99: Abstract raise to an inner function
(TRY301)
99-99: Avoid specifying long messages outside the exception class
(TRY003)
104-104: Redundant exception object included in logging.exception call
(TRY401)
ami/jobs/migrations/0019_mltaskrecord.py
10-13: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
15-72: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/tests.py
326-326: Do not catch blind exception: Exception
(BLE001)
327-327: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/migrations/0026_check_dangling_ml_jobs_celery_beat.py
5-5: Unused function argument: apps
(ARG001)
5-5: Unused function argument: schema_editor
(ARG001)
21-21: Unused function argument: apps
(ARG001)
21-21: Unused function argument: schema_editor
(ARG001)
27-29: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
31-33: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
🔇 Additional comments (7)
ami/main/models.py (1)
888-890: Excellent improvement for transaction safety with async tasks.Wrapping the Celery task in
transaction.on_commit()ensures theregroup_eventstask is only enqueued after the database transaction successfully commits. This prevents race conditions where the task could execute before the deployment data is persisted, or execute at all if the transaction rolls back.This aligns with the broader pattern mentioned in the PR objectives of coordinating asynchronous pipeline steps with DB transactions to ensure consistency.
.envs/.ci/.django (1)
23-25: LGTM!The Celery broker URL and RabbitMQ credentials are properly configured for the CI environment.
compose/local/django/celery/worker/start (1)
7-9: LGTM!The Celery worker configuration correctly specifies the
antennaqueue and uses a descriptive worker name for discoverability.ami/jobs/tasks.py (1)
51-56: LGTM!The conditional logic correctly delegates status management to MLJob's custom implementation, which uses MLTaskRecord tracking and dedicated status update methods (
check_inprogress_subtasksandupdate_job_progress).ami/ml/apps.py (1)
9-10: LGTM!The
ready()method correctly imports signals for registration during Django app initialization, following standard Django patterns.ami/ml/models/__init__.py (1)
2-2: LGTM!The export of
PipelineSaveResultsis properly structured and aligns with the new dataclass introduced for ML pipeline result handling.Also applies to: 10-10
config/settings/local.py (1)
91-92: LGTM!Setting the default Celery queue to
antennaaligns with the worker configuration and queue-based task routing introduced in this PR.
| migrations.AlterField( | ||
| model_name="job", | ||
| name="last_checked", | ||
| field=models.DateTimeField(blank=True, default=datetime.datetime.now, null=True), | ||
| ), | ||
| migrations.AlterField( | ||
| model_name="mltaskrecord", | ||
| name="status", | ||
| field=models.CharField( | ||
| choices=[ | ||
| ("PENDING", "PENDING"), | ||
| ("STARTED", "STARTED"), | ||
| ("SUCCESS", "SUCCESS"), | ||
| ("FAIL", "FAIL"), | ||
| ("REVOKED", "REVOKED"), | ||
| ], | ||
| default="STARTED", | ||
| max_length=255, | ||
| ), | ||
| ), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a timezone-aware default for last_checked.
DateTimeField(default=datetime.datetime.now) returns naïve datetimes when USE_TZ=True, triggering warnings and risking incorrect conversions. Please switch to django.utils.timezone.now, which returns an aware datetime.
Suggested fix:
-import datetime
-from django.db import migrations, models
+import datetime
+from django.db import migrations, models
+from django.utils import timezone
@@
- field=models.DateTimeField(blank=True, default=datetime.datetime.now, null=True),
+ field=models.DateTimeField(blank=True, default=timezone.now, null=True),Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py
around lines 13 to 32, the migrations.AlterField for Job.last_checked uses
datetime.datetime.now which produces naive datetimes under USE_TZ=True; replace
the default with django.utils.timezone.now to return timezone-aware datetimes,
and update the import at the top of the migration file to import timezone.now
(or import from django.utils import timezone and use timezone.now) so the
migration default is timezone-aware.
| assert job.job_type_key == MLJob.key, f"{job} is not an ML job." | ||
| has_inprogress_tasks = job.check_inprogress_subtasks() | ||
| if has_inprogress_tasks: | ||
| # Schedule task to update the job status | ||
| from django.db import transaction | ||
|
|
||
| from ami.ml.tasks import check_ml_job_status | ||
|
|
||
| transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,))) | ||
| return Response({"inprogress_subtasks": has_inprogress_tasks}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix inverted rescheduling logic for ML subtasks.
Job.check_inprogress_subtasks() returns True when all subtasks have finished and False while work is still running (see the docstring in ami/jobs/models.py lines 1189-1196). Capturing that as has_inprogress_tasks flips the meaning, so we only reschedule the polling task once everything is done and we tell the client there are in-progress subtasks when there are not. Flip the condition and surface the correct boolean so the monitoring loop keeps running.
- has_inprogress_tasks = job.check_inprogress_subtasks()
- if has_inprogress_tasks:
+ subtasks_complete = job.check_inprogress_subtasks()
+ inprogress_subtasks = subtasks_complete is False
+ if inprogress_subtasks:
# Schedule task to update the job status
from django.db import transaction
from ami.ml.tasks import check_ml_job_status
transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,)))
- return Response({"inprogress_subtasks": has_inprogress_tasks})
+ return Response({"inprogress_subtasks": inprogress_subtasks})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| assert job.job_type_key == MLJob.key, f"{job} is not an ML job." | |
| has_inprogress_tasks = job.check_inprogress_subtasks() | |
| if has_inprogress_tasks: | |
| # Schedule task to update the job status | |
| from django.db import transaction | |
| from ami.ml.tasks import check_ml_job_status | |
| transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,))) | |
| return Response({"inprogress_subtasks": has_inprogress_tasks}) | |
| assert job.job_type_key == MLJob.key, f"{job} is not an ML job." | |
| subtasks_complete = job.check_inprogress_subtasks() | |
| inprogress_subtasks = subtasks_complete is False | |
| if inprogress_subtasks: | |
| # Schedule task to update the job status | |
| from django.db import transaction | |
| from ami.ml.tasks import check_ml_job_status | |
| transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,))) | |
| return Response({"inprogress_subtasks": inprogress_subtasks}) |
🤖 Prompt for AI Agents
In ami/jobs/views.py around lines 167-176, the code treats
Job.check_inprogress_subtasks() as "has in-progress tasks" but that method
returns True when all subtasks have finished; this inverts logic and reschedules
only when work is done. Change the variable/logic to reflect the actual meaning
(e.g., all_subtasks_finished = job.check_inprogress_subtasks()), schedule
check_ml_job_status only when NOT all_subtasks_finished (i.e., while work is
still running), and return {"inprogress_subtasks": not all_subtasks_finished} so
the client receives the correct boolean.
| lambda: process_pipeline_request.apply_async( | ||
| args=[prediction_request.dict(), project_id], | ||
| task_id=task_id, | ||
| # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)? | ||
| queue=f"ml-pipeline-{pipeline}", | ||
| # all pipelines have their own queue beginning with "ml-pipeline-" | ||
| # the antenna celeryworker should subscribe to all pipeline queues | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix late-binding bug when scheduling Celery subtasks.
transaction.on_commit runs after the loop finishes, so this lambda captures prediction_request and task_id by reference. Every callback will execute with the final batch’s payload and UUID, so earlier batches never reach their queue. Bind the per-iteration values when defining the callback (e.g., via default arguments or functools.partial).
Apply this diff to lock in the values:
- transaction.on_commit(
- lambda: process_pipeline_request.apply_async(
- args=[prediction_request.dict(), project_id],
- task_id=task_id,
- # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
- queue=f"ml-pipeline-{pipeline}",
- # all pipelines have their own queue beginning with "ml-pipeline-"
- # the antenna celeryworker should subscribe to all pipeline queues
- )
- )
+ transaction.on_commit(
+ lambda prediction_request=prediction_request, task_id=task_id: process_pipeline_request.apply_async(
+ args=[prediction_request.dict(), project_id],
+ task_id=task_id,
+ # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
+ queue=f"ml-pipeline-{pipeline}",
+ # all pipelines have their own queue beginning with "ml-pipeline-"
+ # the antenna celeryworker should subscribe to all pipeline queues
+ )
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| lambda: process_pipeline_request.apply_async( | |
| args=[prediction_request.dict(), project_id], | |
| task_id=task_id, | |
| # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)? | |
| queue=f"ml-pipeline-{pipeline}", | |
| # all pipelines have their own queue beginning with "ml-pipeline-" | |
| # the antenna celeryworker should subscribe to all pipeline queues | |
| ) | |
| ) | |
| lambda prediction_request=prediction_request, task_id=task_id: process_pipeline_request.apply_async( | |
| args=[prediction_request.dict(), project_id], | |
| task_id=task_id, | |
| # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)? | |
| queue=f"ml-pipeline-{pipeline}", | |
| # all pipelines have their own queue beginning with "ml-pipeline-" | |
| # the antenna celeryworker should subscribe to all pipeline queues | |
| ) | |
| ) |
🧰 Tools
🪛 Ruff (0.14.3)
334-334: Function definition does not bind loop variable prediction_request
(B023)
335-335: Function definition does not bind loop variable task_id
(B023)
🤖 Prompt for AI Agents
In ami/ml/models/pipeline.py around lines 333 to 341, the lambda passed to
transaction.on_commit closes over loop variables causing a late-binding bug;
capture per-iteration values before registering the callback (e.g.,
local_prediction = prediction_request.dict(), local_task_id = task_id,
local_pipeline = pipeline) and use them as default args in the lambda or use
functools.partial so each callback calls process_pipeline_request.apply_async
with the bound args/task_id/queue (queue=f"ml-pipeline-{local_pipeline}")
instead of referencing the loop variables.
| inspector = celery_app.control.inspect() | ||
| active_workers = inspector.active() | ||
| if active_workers: # TODO: currently only works if there is one worker | ||
| # NOTE: all antenna celery workers should have ANTENNA_CELERY_WORKER_NAME | ||
| # in their name instead of the the default "celery" | ||
| return next((worker for worker in active_workers.keys() if ANTENNA_CELERY_WORKER_NAME in worker), None) | ||
| except Exception as e: | ||
| logger.warning(f"Could not find antenna celery worker name: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_worker_name fails whenever the worker is idle
inspect().active() returns {} when there are no active tasks, so we return None and downstream assertions fail. As a result, pipeline_created never subscribes the worker to new queues during normal idle periods. Please discover workers via inspect().stats()/ping() (or the sender object when available) so we always obtain the hostname.
- inspector = celery_app.control.inspect()
- active_workers = inspector.active()
- if active_workers: # TODO: currently only works if there is one worker
- # NOTE: all antenna celery workers should have ANTENNA_CELERY_WORKER_NAME
- # in their name instead of the the default "celery"
- return next((worker for worker in active_workers.keys() if ANTENNA_CELERY_WORKER_NAME in worker), None)
+ inspector = celery_app.control.inspect()
+ worker_sets = [
+ inspector.active() or {},
+ inspector.registered() or {},
+ inspector.stats() or {},
+ ]
+ for workers in worker_sets:
+ for worker in workers:
+ if ANTENNA_CELERY_WORKER_NAME in worker:
+ return worker🧰 Tools
🪛 Ruff (0.14.3)
27-27: Do not catch blind exception: Exception
(BLE001)
| if not pipelines: | ||
| # TODO: kinda hacky. is there a way to unify the django and celery logs | ||
| # to more easily see which queues the worker is subscribed to? | ||
| raise ValueError("No pipelines found; cannot subscribe to any queues.") | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t raise when there are no pipelines yet
Fresh environments legitimately start with zero pipelines. Raising ValueError during worker_ready aborts the worker before it can process jobs, leaving the whole ML queue offline. Please log and return early instead of throwing.
- if not pipelines:
- # TODO: kinda hacky. is there a way to unify the django and celery logs
- # to more easily see which queues the worker is subscribed to?
- raise ValueError("No pipelines found; cannot subscribe to any queues.")
+ if not pipelines:
+ logger.info("No pipelines found; worker subscription is skipped until a pipeline is created.")
+ return True🧰 Tools
🪛 Ruff (0.14.3)
57-57: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
In ami/ml/signals.py around lines 54 to 58, the current worker_ready handler
raises ValueError when pipelines is empty which crashes the worker; instead, log
an informative message and return early so the worker stays up. Replace the
raise with a call to the module/Django logger (e.g. logger.info or
logger.warning) that includes context ("no pipelines configured; worker will not
subscribe to ML queues yet") and then return from the function; do not re-raise
or exit the process.
| project = pipeline.projects.first() | ||
| assert project, f"Pipeline {pipeline} must be associated with a project." | ||
|
|
||
| results = process_images( | ||
| pipeline=pipeline, | ||
| endpoint_url=endpoint_url, | ||
| images=images, | ||
| job_id=job_id, | ||
| ) | ||
| results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the job’s project instead of pipeline.projects.first()
pipeline.projects.first() can return the wrong project (or None) when a pipeline is linked to multiple projects or was created without a ManyToMany assignment yet. That means we’ll pass the wrong project_id into process_images, so downstream service selection and config overrides run against the wrong project, breaking request routing for multi-project pipelines. Please derive the project from the ML job when job_id is present, and only fall back to an explicit argument or raise if it’s missing.
- pipeline = Pipeline.objects.get(slug=pipeline_choice)
- project = pipeline.projects.first()
- assert project, f"Pipeline {pipeline} must be associated with a project."
-
- results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk)
+ pipeline = Pipeline.objects.get(slug=pipeline_choice)
+ project_id: int | None = None
+ if job and job.project_id:
+ project_id = job.project_id
+ else:
+ project = pipeline.projects.first()
+ if project:
+ project_id = project.pk
+ assert project_id is not None, (
+ f"Cannot determine project for pipeline {pipeline}; the job must supply a project."
+ )
+
+ results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project_id)Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 29 to 33, avoid using pipeline.projects.first()
because it can return the wrong project or None; instead when job_id is provided
load the ML job (e.g., MLJob.objects.get(pk=job_id)) and derive the project from
job.project (or job.project_id) and pass that project's pk into process_images;
if no job_id use an explicit project argument if provided, otherwise raise an
informative error so we never silently use pipeline.projects.first().
| <<: *django | ||
| image: ami_ci_celeryworker | ||
| command: /start-celeryworker | ||
| depends_on: | ||
| - rabbitmq | ||
| # start the worker with antenna_celeryworker to ensure it's discoverable by ami.ml.signals.get_worker_name | ||
| command: | ||
| - sh | ||
| - -c | ||
| - | | ||
| python manage.py migrate && | ||
| python -m celery -A config.celery_app worker --queues=antenna -n antenna_celeryworker@%h -l INFO | ||
| healthcheck: | ||
| # make sure DATABASE_URL is inside the ./.envs/.ci/.postgres | ||
| test: ["CMD-SHELL", "celery -A config.celery_app inspect ping -d antenna_celeryworker@$(hostname) | grep -q pong"] | ||
| interval: 10s | ||
| timeout: 50s | ||
| retries: 5 | ||
| start_period: 10s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore the inherited service dependencies for the Celery worker
By overriding depends_on with a bare list we drop the Postgres/Redis/MinIO dependencies that *django provided. The worker now runs python manage.py migrate without waiting for Postgres, so CI routinely fails with connection errors. Please keep the original dependency map and add RabbitMQ alongside it.
celeryworker:
<<: *django
- depends_on:
- - rabbitmq
+ depends_on:
+ postgres:
+ condition: service_started
+ redis:
+ condition: service_started
+ minio-init:
+ condition: service_started
+ rabbitmq:
+ condition: service_started
+ ml_backend:
+ condition: service_started
+ celeryworker:
+ condition: service_healthy
# start the worker with antenna_celeryworker to ensure it's discoverable by ami.ml.signals.get_worker_nameCommittable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In docker-compose.ci.yml around lines 50 to 66, the depends_on for the Celery
worker was replaced with a bare list dropping the *django inherited dependencies
(Postgres/Redis/MinIO); restore the original dependency map by using the YAML
merge to include the *django depends_on and then append rabbitmq (e.g. use a
depends_on mapping that merges <<: *django and adds rabbitmq to the list) so the
worker waits for DB/Redis/MinIO plus RabbitMQ before running migrations.
| # TODO: Error logs will have many tracebacks | ||
| # could add some processing to provide a concise error summary | ||
| job.logger.error(f"Subtask {task_name} ({task_id}) failed: {task.traceback}") | ||
| inprogress_subtask.status = MLSubtaskState.FAIL.name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this get saved?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, with continue we move on to checking the next subtask. And only every 10 subtasks do we do a bulk update to the tasks.
Lines 441 to 452 in bd86042
| if len(inprogress_subtasks_to_update) >= 10: | |
| MLTaskRecord.objects.bulk_update( | |
| inprogress_subtasks_to_update, | |
| [ | |
| "status", | |
| "raw_traceback", | |
| "raw_results", | |
| "num_captures", | |
| "num_detections", | |
| "num_classifications", | |
| ], | |
| ) |
| job.save() | ||
|
|
||
| # Remove remaining tasks from the queue | ||
| for ml_task_record in job.ml_task_records.all(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have a Cancel action for the parent Job. Can you move or add this logic there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
ami/ml/tasks.py (1)
29-32: Critical: Resolve thepipeline.projects.first()issue from the previous review.Using
pipeline.projects.first()can return the wrong project (orNone) when a pipeline is linked to multiple projects, breaking request routing and config overrides. The previous review comment at lines 29-33 recommended deriving the project fromjob.projectwhenjob_idis present. This issue remains unresolved.Please apply the suggested fix from the previous review:
images = SourceImage.objects.filter(pk__in=image_ids) pipeline = Pipeline.objects.get(slug=pipeline_choice) - project = pipeline.projects.first() - assert project, f"Pipeline {pipeline} must be associated with a project." + project_id: int | None = None + if job and job.project_id: + project_id = job.project_id + else: + project = pipeline.projects.first() + if project: + project_id = project.pk + assert project_id is not None, ( + f"Cannot determine project for pipeline {pipeline}; the job must supply a project." + ) - results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk) + results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project_id)
🧹 Nitpick comments (3)
ami/jobs/tests.py (2)
326-328: Uselogging.exceptionfor better error context.When logging within an exception handler,
logging.exceptionautomatically includes the stack trace, making debugging easier.Apply this diff:
except Exception as e: - logger.error(f"Error fetching details for task {task_id}: {e}") + logger.exception(f"Error fetching details for task {task_id}: {e}") details[task_id] = {"error": str(e)}
499-499: Simplify the state check.Use a direct equality comparison instead of checking membership in a single-element list.
Apply this diff:
- self.assertIn(async_result.state, ["REVOKED"]) + self.assertEqual(async_result.state, "REVOKED")ami/ml/tasks.py (1)
158-158: Remove redundant import.
datetimeis already imported at the top of the file (line 1), so this local import is unnecessary.Apply this diff:
- import datetime - from ami.jobs.models import Job, JobState, MLJob
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
ami/jobs/tests.py(2 hunks)ami/ml/tasks.py(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tests.py (2)
ami/jobs/models.py (18)
Job(1057-1355)JobState(32-68)MLJob(328-786)MLSubtaskNames(992-994)MLSubtaskState(997-1002)save(1290-1301)logger(1340-1349)run(321-325)run(657-786)run(794-832)run(840-883)run(895-919)run(927-947)run(955-956)run(1199-1207)update_job_progress(515-654)MLTaskRecord(1005-1054)kill_task(1045-1054)ami/ml/tasks.py (2)
check_ml_job_status(111-149)check_dangling_ml_jobs(153-188)
ami/ml/tasks.py (3)
ami/ml/models/pipeline.py (3)
process_images(187-285)process_images(1240-1252)save(1276-1282)ami/jobs/models.py (9)
Job(1057-1355)MLJob(328-786)logger(1340-1349)check_inprogress_subtasks(314-318)check_inprogress_subtasks(333-512)check_inprogress_subtasks(1190-1197)save(1290-1301)update_status(1238-1258)kill_task(1045-1054)ami/jobs/views.py (1)
check_inprogress_subtasks(161-176)
🪛 Ruff (0.14.3)
ami/jobs/tests.py
326-326: Do not catch blind exception: Exception
(BLE001)
327-327: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/tasks.py
137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
137-137: Avoid specifying long messages outside the exception class
(TRY003)
138-138: Do not catch blind exception: Exception
(BLE001)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
149-149: Create your own exception
(TRY002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tests.py (1)
354-356: Verify that directly callingcheck_ml_job_statuswithin the test loop is intentional.Invoking
check_ml_job_status(job.pk)andMLJob.update_job_progress(job)synchronously within the test polling loop bypasses the normal async callback flow. While this is acceptable for integration testing to force status updates, ensure it doesn't mask timing or concurrency issues that could occur in production.
| except Job.DoesNotExist: | ||
| raise ValueError(f"Job with ID {ml_job_id} does not exist.") | ||
| except Exception as e: | ||
| error_msg = f"Error checking status for job with ID {ml_job_id}: {e}" | ||
| job.logger.error(error_msg) | ||
| job.update_status(JobState.FAILURE) | ||
| job.finished_at = datetime.datetime.now() | ||
| job.save() | ||
|
|
||
| # Remove remaining tasks from the queue | ||
| for ml_task_record in job.ml_task_records.all(): | ||
| ml_task_record.kill_task() | ||
|
|
||
| raise Exception(error_msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Improve exception handling and logging.
The error handling has several issues:
- Line 137: Use exception chaining (
raise ... from e) to preserve the original context. - Line 140: Use
logging.exceptioninstead oflogging.errorto include the stack trace automatically. - Line 149: Use exception chaining when re-raising.
Apply this diff:
except Job.DoesNotExist:
- raise ValueError(f"Job with ID {ml_job_id} does not exist.")
+ raise ValueError(f"Job with ID {ml_job_id} does not exist.") from None
except Exception as e:
error_msg = f"Error checking status for job with ID {ml_job_id}: {e}"
- job.logger.error(error_msg)
+ job.logger.exception(error_msg)
job.update_status(JobState.FAILURE)
job.finished_at = datetime.datetime.now()
job.save()
# Remove remaining tasks from the queue
for ml_task_record in job.ml_task_records.all():
ml_task_record.kill_task()
- raise Exception(error_msg)
+ raise Exception(error_msg) from e🧰 Tools
🪛 Ruff (0.14.3)
137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
137-137: Avoid specifying long messages outside the exception class
(TRY003)
138-138: Do not catch blind exception: Exception
(BLE001)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
149-149: Create your own exception
(TRY002)
🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 136 to 149, the exception handling should
preserve original exception context and include stack traces: replace the
job.logger.error call with job.logger.exception(error_msg) so the stack trace is
logged, change the initial generic except Exception as e to re-raise chained
exceptions where appropriate (use "raise ValueError(... ) from e" for the
Job.DoesNotExist handling if re-raising a different exception) and when
re-raising the final Exception at the end use "raise Exception(error_msg) from
e" to maintain exception chaining; keep the job status/update/task cleanup logic
but ensure any re-raises use "from e" and use logging.exception for full stack
traces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
ami/ml/tasks.py (2)
29-30: Critical: Use job.project instead of pipeline.projects.first()This was flagged in a previous review but not addressed. When
job_idis provided (line 21 retrieves the job), you should derive the project fromjob.projectrather thanpipeline.projects.first(). The current approach can return the wrong project (or None) when a pipeline is linked to multiple projects, breaking request routing for multi-project pipelines.Apply this diff to fix:
job = None try: job = Job.objects.get(pk=job_id) job.logger.info(f"Processing {len(image_ids)} images for job {job}") except Job.DoesNotExist as e: logger.error(f"Job {job_id} not found: {e}") pass images = SourceImage.objects.filter(pk__in=image_ids) pipeline = Pipeline.objects.get(slug=pipeline_choice) - project = pipeline.projects.first() - assert project, f"Pipeline {pipeline} must be associated with a project." + + # Derive project from job when available + if job and job.project: + project = job.project + else: + project = pipeline.projects.first() + assert project, f"Pipeline {pipeline} must be associated with a project." results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk)
136-149: Improve exception handling and loggingThe exception handling has several issues that were flagged in a previous review but remain unaddressed:
- Line 137: Use exception chaining (
raise ... from None) to distinguish from errors in exception handling.- Line 140: Use
logging.exceptioninstead oflogging.errorto automatically include the stack trace.- Line 149: Use exception chaining when re-raising (
raise ... from e).Apply this diff:
except Job.DoesNotExist: - raise ValueError(f"Job with ID {ml_job_id} does not exist.") + raise ValueError(f"Job with ID {ml_job_id} does not exist.") from None except Exception as e: error_msg = f"Error checking status for job with ID {ml_job_id}: {e}" - job.logger.error(error_msg) + job.logger.exception(error_msg) job.update_status(JobState.FAILURE) job.finished_at = datetime.datetime.now() job.save() # Remove remaining tasks from the queue for ml_task_record in job.ml_task_records.all(): ml_task_record.kill_task() - raise Exception(error_msg) + raise Exception(error_msg) from e
🧹 Nitpick comments (2)
ami/jobs/models.py (2)
419-432: Use logging.exception for better error diagnosticsWhen catching the exception during
PipelineResultsResponseparsing, usejob.logger.exception(error_msg)instead ofjob.logger.error(error_msg)to automatically include the stack trace. This will aid debugging when subtasks fail due to invalid responses.Apply this diff:
try: results = PipelineResultsResponse(**results_dict) except Exception as e: error_msg = ( f"Subtask {task_name} ({task_id}) failed since it received " f"an invalid PipelineResultsResponse.\n" f"Error: {e}\n" f"Raw result: {results_dict}" ) - job.logger.error(error_msg) + job.logger.exception(error_msg) inprogress_subtask.status = MLSubtaskState.FAIL.name inprogress_subtask.raw_traceback = error_msg continue
761-811: MLJob.run orchestration is correctThe flow properly:
- Subscribes the worker to pipeline-specific queues before scheduling tasks
- Calls
schedule_process_imageswith correctjob_idandproject_id- Handles the edge case where no subtasks are scheduled (marks job SUCCESS)
- Enqueues
check_ml_job_statusviatransaction.on_commitwhen subtasks existThe exception handling at line 780-781 should use
logging.exception, but this is a minor issue since the error is logged and job status is updated to FAILURE.Consider using
job.logger.exceptioninstead ofjob.logger.errorat line 781 for better diagnostics:except Exception as e: - job.logger.error(f"Failed to submit all images: {e}") + job.logger.exception(f"Failed to submit all images: {e}") job.update_status(JobState.FAILURE)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
ami/jobs/models.py(12 hunks)ami/ml/tasks.py(5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/tasks.py (3)
ami/ml/models/pipeline.py (3)
process_images(187-285)process_images(1240-1252)save(1276-1282)ami/jobs/models.py (10)
logger(1365-1374)Job(1082-1380)JobState(32-68)MLJob(328-811)check_inprogress_subtasks(314-318)check_inprogress_subtasks(333-537)check_inprogress_subtasks(1215-1222)save(1315-1326)update_status(1263-1283)kill_task(1070-1079)ami/jobs/views.py (1)
check_inprogress_subtasks(161-176)
ami/jobs/models.py (5)
ami/ml/schemas.py (3)
PipelineRequest(174-178)PipelineResultsResponse(181-231)combine_with(198-231)ami/ml/signals.py (2)
get_worker_name(15-28)subscribe_celeryworker_to_pipeline_queues(32-67)ami/ml/tasks.py (1)
check_ml_job_status(111-149)ami/jobs/views.py (1)
check_inprogress_subtasks(161-176)ami/ml/models/pipeline.py (6)
save_results(955-1063)save_results(1268-1269)save_results_async(1271-1274)save(1276-1282)process_pipeline_request(164-184)schedule_process_images(1254-1266)
🪛 Ruff (0.14.4)
ami/ml/tasks.py
137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
137-137: Avoid specifying long messages outside the exception class
(TRY003)
138-138: Do not catch blind exception: Exception
(BLE001)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
149-149: Create your own exception
(TRY002)
ami/jobs/models.py
141-141: Avoid specifying long messages outside the exception class
(TRY003)
148-148: Avoid specifying long messages outside the exception class
(TRY003)
422-422: Do not catch blind exception: Exception
(BLE001)
429-429: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
462-462: Create your own exception
(TRY002)
462-462: Avoid specifying long messages outside the exception class
(TRY003)
780-780: Do not catch blind exception: Exception
(BLE001)
781-781: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1068-1068: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (7)
ami/ml/tasks.py (1)
152-188: LGTM: Dangling job cleanup logic is soundThe implementation correctly identifies stale ML jobs (not checked in 24 hours) and properly cleans them up by revoking the status and killing remaining tasks. The use of
float("inf")for missinglast_checkedvalues ensures those jobs are caught, and the logging provides good visibility.ami/jobs/models.py (6)
1017-1028: Well-defined enums for ML subtask trackingThe
MLSubtaskNamesandMLSubtaskStateenums provide clear, type-safe tracking of ML subtask lifecycle. The choices align well with the orchestration logic incheck_inprogress_subtasks.
1030-1080: LGTM: MLTaskRecord model is well-structuredThe model effectively tracks ML subtask lifecycle with appropriate fields for task metadata, results, and error handling. The
clean()validation correctly enforces that onlysave_resultstasks can havePENDINGstatus, andkill_task()properly handles task revocation.
464-482: Excellent use of bulk updates for performanceThe pattern of bulk-updating every 10 subtasks and calling
update_job_progressperiodically is a smart optimization. It keeps the UI responsive during long-running jobs while minimizing database round-trips.
484-497: Smart consolidation of save_results tasksThe approach of combining multiple
PipelineResultsResponseobjects into a singlesave_resultstask is efficient. The use ofcombine_withensures pipeline and algorithm consistency while reducing task overhead.
539-679: Job progress tracking is well-implementedThe progress calculation correctly tracks both processing and results stages:
- Process stage: Uses completed vs. in-progress process_pipeline_request tasks
- Results stage: Uses 1:1 ratio with process tasks since each generates a save_results task
- Failure detection checks both process and save task failures
- Only counts results from successfully saved tasks
The logic properly updates job status and sets
finished_atwhen complete.
1090-1090: Job model additions support ML orchestration wellThe additions are minimal and focused:
last_checkedfield tracks status check timing for dangling job detectionml_task_recordstype hint improves developer experiencecheck_inprogress_subtasksmethod properly delegates to the job type implementationAlso applies to: 1151-1152, 1215-1222
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
ami/ml/models/pipeline.py (1)
332-341: CRITICAL: Fix late-binding bug in task scheduling.The lambda passed to
transaction.on_commitcapturesprediction_requestandtask_idby reference. Since the lambda executes after the loop completes, all callbacks will use the final iteration's values. This means only the last batch will be processed (repeatedly), and earlier batches will be ignored.Apply this diff to bind per-iteration values:
transaction.on_commit( - lambda: process_pipeline_request.apply_async( + lambda pr=prediction_request, tid=task_id: process_pipeline_request.apply_async( - args=[prediction_request.dict(), project_id], + args=[pr.dict(), project_id], - task_id=task_id, + task_id=tid, # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)? queue=f"ml-pipeline-{pipeline}", # all pipelines have their own queue beginning with "ml-pipeline-" # the antenna celeryworker should subscribe to all pipeline queues ) )
🧹 Nitpick comments (5)
ami/jobs/tests.py (2)
329-331: Uselogging.exceptionto capture traceback automatically.In the exception handler within
get_ml_job_subtask_details, replacelogger.errorwithlogger.exceptionto automatically include the stack trace, which aids debugging when task details cannot be fetched.Apply this diff:
except Exception as e: - logger.error(f"Error fetching details for task {task_id}: {e}") + logger.exception(f"Error fetching details for task {task_id}: {e}") details[task_id] = {"error": str(e)}
502-502: ConsiderassertEqualif only REVOKED state is expected.Using
assertInwith a single-element list is less direct thanassertEqual. If REVOKED is the only valid state, prefer:self.assertEqual(async_result.state, "REVOKED")If multiple states are acceptable in the future, the current approach is fine.
ami/ml/schemas.py (1)
225-225: Prefer unpacking syntax for combining lists.Using
[self, *others]is more idiomatic than concatenation with[self] + others.Apply this diff:
- all_responses = [self] + others + all_responses = [self, *others]ami/ml/models/pipeline.py (2)
266-266: Clarify log message intent.The message "Found {len(detection_requests)} existing detections." appears after the conditional block and logs the same count regardless of whether reprocessing is enabled. Consider rephrasing to indicate this is the final count being sent:
task_logger.info(f"Sending {len(detection_requests)} detection(s) with the request.")
305-314: Simplify batching with list slicing.The nested loop for batching can be replaced with cleaner list slicing.
Apply this diff:
for i in range(0, len(source_image_requests), batch_size): - request_batch = [] - image_batch = [] - for j in range(batch_size): - if i + j >= len(source_image_requests): - break - request_batch.append(source_image_requests[i + j]) - image_batch.append(source_images[i + j]) + request_batch = source_image_requests[i:i + batch_size] + image_batch = source_images[i:i + batch_size] source_image_request_batches.append(request_batch) source_image_batches.append(image_batch)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
ami/jobs/tests.py(2 hunks)ami/main/models.py(1 hunks)ami/ml/models/pipeline.py(7 hunks)ami/ml/schemas.py(2 hunks)config/settings/base.py(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- ami/main/models.py
- config/settings/base.py
🧰 Additional context used
🧬 Code graph analysis (3)
ami/jobs/tests.py (3)
ami/jobs/models.py (18)
Job(1082-1380)JobState(32-68)MLJob(328-811)MLSubtaskNames(1017-1019)MLSubtaskState(1022-1027)save(1315-1326)logger(1365-1374)run(321-325)run(682-811)run(819-857)run(865-908)run(920-944)run(952-972)run(980-981)run(1224-1232)update_job_progress(540-679)MLTaskRecord(1030-1079)kill_task(1070-1079)ami/ml/models/pipeline.py (5)
Pipeline(1105-1283)save(1277-1283)process_pipeline_request(164-184)save_results(955-1063)save_results(1269-1270)ami/ml/tasks.py (2)
check_ml_job_status(111-149)check_dangling_ml_jobs(153-188)
ami/ml/models/pipeline.py (3)
ami/main/models.py (15)
name(1006-1007)SourceImage(1665-1913)Project(226-466)config(1413-1421)save(320-323)save(901-916)save(1138-1141)save(1481-1488)save(1580-1583)save(1871-1874)save(2152-2173)save(2389-2395)save(2568-2573)save(2856-2873)save(3493-3496)ami/ml/schemas.py (5)
PipelineRequest(174-197)PipelineResultsResponse(200-250)SourceImageRequest(116-120)PipelineRequestConfigParameters(155-171)DetectionRequest(138-142)ami/jobs/models.py (4)
logger(1365-1374)Job(1082-1380)MLTaskRecord(1030-1079)save(1315-1326)
ami/ml/schemas.py (3)
ui/src/data-services/models/pipeline.ts (1)
algorithms(29-31)processing_services/minimal/api/schemas.py (1)
PipelineResultsResponse(230-243)processing_services/example/api/schemas.py (1)
PipelineResultsResponse(265-279)
🪛 Ruff (0.14.4)
ami/jobs/tests.py
329-329: Do not catch blind exception: Exception
(BLE001)
330-330: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/models/pipeline.py
334-334: Function definition does not bind loop variable prediction_request
(B023)
335-335: Function definition does not bind loop variable task_id
(B023)
377-377: Avoid specifying long messages outside the exception class
(TRY003)
ami/ml/schemas.py
225-225: Consider [self, *others] instead of concatenation
Replace with [self, *others]
(RUF005)
229-229: Avoid specifying long messages outside the exception class
(TRY003)
233-233: Avoid specifying long messages outside the exception class
(TRY003)
237-237: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
| not_running = all( | ||
| task_id not in [t["id"] for w in active.values() for t in w] for w in active.values() | ||
| ) and all(task_id not in [t["id"] for w in reserved.values() for t in w] for w in reserved.values()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix redundant loop in task presence check.
The logic for verifying that task_id is not running has a redundant outer loop. The inner comprehension already iterates over all workers, so the outer for w in active.values() and for w in reserved.values() are unnecessary and may cause incorrect evaluation.
Apply this diff to simplify and correct the logic:
- not_running = all(
- task_id not in [t["id"] for w in active.values() for t in w] for w in active.values()
- ) and all(task_id not in [t["id"] for w in reserved.values() for t in w] for w in reserved.values())
+ active_task_ids = [t["id"] for tasks in active.values() for t in tasks]
+ reserved_task_ids = [t["id"] for tasks in reserved.values() for t in tasks]
+ not_running = task_id not in active_task_ids and task_id not in reserved_task_ids📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| not_running = all( | |
| task_id not in [t["id"] for w in active.values() for t in w] for w in active.values() | |
| ) and all(task_id not in [t["id"] for w in reserved.values() for t in w] for w in reserved.values()) | |
| active_task_ids = [t["id"] for tasks in active.values() for t in tasks] | |
| reserved_task_ids = [t["id"] for tasks in reserved.values() for t in tasks] | |
| not_running = task_id not in active_task_ids and task_id not in reserved_task_ids |
🤖 Prompt for AI Agents
In ami/jobs/tests.py around lines 466 to 468, the check uses an unnecessary
outer loop causing redundant iteration and potential incorrect results; replace
the double-generator form with a single membership check per collection so that
you test task_id not in [t["id"] for w in active.values() for t in w] and
task_id not in [t["id"] for w in reserved.values() for t in w] (i.e. remove the
extra "for w in ... " wrappers and evaluate each list comprehension once,
combining them with an and).
| else: | ||
| logger.warning("No project_id. No pipeline config is used. Using default empty config instead.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarify warning message about default config.
The warning states "Using default empty config instead," but the method actually returns self.default_config (set on line 1159). The message should clarify that the default config is used without project-specific overrides.
Apply this diff:
else:
- logger.warning("No project_id. No pipeline config is used. Using default empty config instead.")
+ logger.warning("No project_id provided. Using pipeline default config without project-specific overrides.")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| else: | |
| logger.warning("No project_id. No pipeline config is used. Using default empty config instead.") | |
| else: | |
| logger.warning("No project_id provided. Using pipeline default config without project-specific overrides.") |
🤖 Prompt for AI Agents
In ami/ml/models/pipeline.py around lines 1170-1171, the warning "No project_id.
No pipeline config is used. Using default empty config instead." is misleading
because the method returns self.default_config (set at line 1159); update the
logger.warning to explicitly state that self.default_config will be used and
that there are no project-specific overrides (e.g., "No project_id: using
self.default_config with no project-specific overrides."). Ensure the message
mentions self.default_config and that project-specific config will not be
applied.
| Returns True if all subtasks are completed. | ||
| """ | ||
| assert job.pipeline is not None, "Job pipeline is not set" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is pretty long. Could it be refactored into smaller functions?
|
|
||
| # To avoid long running jobs from taking a long time to update, bulk update every 10 tasks | ||
| # Bulk save the updated inprogress subtasks | ||
| if len(inprogress_subtasks_to_update) >= 10: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may want to make this 10 a configuration parameter?
| # recreate a process_pipeline_request task | ||
| pipeline_request = SchemaField(PipelineRequest, null=True, blank=True) | ||
| # recreate a save_results task | ||
| pipeline_response = SchemaField(PipelineResultsResponse, null=True, blank=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed, the response could be quite bulky, which can bloat the database. One alternative is to save these are json files outside of the DB since it's temporary information.
| self.assertEqual(job.progress.summary.progress, 0) | ||
| self.assertEqual(job.progress.summary.status, JobState.CREATED) | ||
|
|
||
| inspector = celery_app.control.inspect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@carlosgjs here is an example of the python API for celery inspect, but the CLI tool might be more helpful for inspecting memory usage
Summary
Currently, we run MLJobs as a single async celery task which is not scalable to large batch image processing jobs. For example, when processing a large number of image (i.e. 100+ images), the singular long running task is easily interrupted due to a failed worker and also takes a long time to complete (since images still have to be processed in a sequential manner).
This PR uses Celery as the task queue and RabbitMQ as the message broker to send batches of images as individual
process_pipeline_requests. Each queue is dedicated to a specific ML pipeline (named after the pipeline slug). A celery worker can pick up tasks based on the pipelines they subscribe to. For example, the antenna celery workers may subscribe to all queues; an external celeryworker would subscribe to select pipelines.The
ami.ml.tasks.check_ml_job_statustask listens for completedprocess_pipeline_requestsand enqueuessave_resultstasks; thecheck_ml_job_statusreschedules itself when there are still in progress tasks.Related PR
This PR focuses on the "controller" side of the batch image processing system. This PR should be ready to deploy (pending further testing in a Staging environment) and tasks will be processed by antenna workers.
A follow up PR addresses adding additional consumers via the processing service template: #1011
List of Changes
Add a
process_pipeline_requesttask which takes aPipelineRequestand returns the model's results. The logic for this task is defined on the antenna side and can be added to external celeryworkers/processing services.Introduces a new
MLTaskRecordmodel which stores the results and stats of a celery task.Update the
Jobmodel to includeml_task_recordsto track the celery tasks currently in-progress and completed (these can be eitherprocess_pipeline_requestorsave_resultstasks)Add
ami.ml.tasks.check_ml_job_statuswhich checks the subtasks of anMLJob, updates the job status, and schedulessave_resultstasks.Add a celery beat task
ami.ml.tasks.check_dangling_ml_jobswhich revokes MLJobs that areSTARTEDbut has had not been checked bycheck_ml_job_statusrecently.All tests should now be run with the separate
docker-compose.cistack to ensure isolated testing.Related Issues
Addresses in part #515
The planning of this feature was discussed in #515. See the comments beginning at #515 (comment)
Detailed Description
Note, this PR doesn't include external celery workers, it only updates the antenna celery worker.
%%{init: {"theme": "base", "themeVariables": { "background": "#ffffff" }}}%% flowchart TD subgraph Django["CONTAINER:DJANGO"] subgraph MLJob["class:MLJob"] A["run()"] A --> B["subscribe_celeryworker_to_pipeline_queues(antenna_celeryworker)"] B --> D["Produce check_ml_job_status task"] end subgraph Pipeline["class:Pipeline"] B --> C["schedule_process_images(...)"] C --> E["process_images(process_sync=False)"] E --> F["handle_async_process_images(...)"] F --> G1["Produce process_pipeline_request task #1"] F --> G2["Produce process_pipeline_request task #2"] F --> G3["Produce process_pipeline_request task #N"] end end subgraph Celery["CONTAINER:RABBITMQ"] G1 --> H(("ml-pipeline-{pipeline} queue")) G2 --> H G3 --> H D --> I(("antenna queue")) end subgraph Antenna["CONTAINER:ANTENNA_CELERYWORKER"] H --> K{Consume process_pipeline_request task} K --> L["process_images(process_sync=True)"] subgraph Pipeline2["class:Pipeline"] L --> M["handle_sync_process_images()"] M --> M1["POST request to the ML Backend's `/process` endpoint"] end I --> N1{Consume check_ml_job_status task} N1 --> Q["check_inprogress_subtasks()"] S --> N1 subgraph MLJob2["class:MLJob"] I --> |Retrieve save_results results| Q H --> |Retrieve process_pipeline_request results| Q Q -->|process_pipeline_request OR save_results tasks in progress| S S["Produce check_ml_job_status task"] Q -->|process_pipeline_request task finished| T T["Produce save_results"] end T --> J J{"Consume save_results task"} J --> J1 subgraph Pipeline3["class:Pipeline"] J1["Save results to db"] end end subgraph ExternalWorker["CONTAINER:EXTERNAL_CELERYWORKER"] H --> O{Consume process_pipeline_request task} O --> O1[Some external logic for processing images] end %% Shapes classDef queueStyle shape:circle,fill:#f0f4c3,stroke:#333,color:black classDef workerStyle shape:diamond,fill:#ffe0b2,stroke:#333,color:black class H,I queueStyle class K,O,N1,N2,J workerStyle %% Style: black subgraph titles style Django fill:#fff,stroke:#000,stroke-width:3px,color:#000 style Celery fill:#fff,stroke:#000,stroke-width:3px,color:#000 style Antenna fill:#fff,stroke:#000,stroke-width:3px,color:#000 style ExternalWorker fill:#fff,stroke:#000,stroke-width:3px,color:#000 %% Arrows (you had BLACK before but in all caps; lowercase is valid) linkStyle default stroke:black,stroke-width:2px %% Style task produced style D fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style S fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style T fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style G1 fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style G2 fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style G3 fill:#f9f,stroke:#333,stroke-width:2px,color:#000How to Test the Changes
CI tests are moved to a separate docker compose stack.
So to run antenna, still use the main docker compose stack.
Run the new batch processing unit test:
Deployment Notes
Include instructions if this PR requires specific steps for its deployment (database migrations, config changes, etc.)
Checklist
The
test_run_ml_jobunit test correctly passes. It also correctly fails when an Exception is manually added to the/processendpoint and/orsave_resultstask logic; the failed task details are logged in the test and the test fails since ML Task Records show failed subtasks.Summary by CodeRabbit
Release Notes
New Features
Improvements