From cd41c4e0f3ac8f67bb389ee28a5358d18c3b99f5 Mon Sep 17 00:00:00 2001 From: Rust Saiargaliev Date: Thu, 20 Nov 2025 09:49:45 +0100 Subject: [PATCH 1/4] Drop Prometheus middleware from dramatiq settings Prometheus client became optional in dramatiq v2: https://github.com/Bogdanp/dramatiq/releases/tag/v2.0.0 I think we don't need to cover all our dependencies's subdependencies, so it is fine to drop it. --- tests/testapp/settings.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/testapp/settings.py b/tests/testapp/settings.py index 03d654f..37c2d14 100644 --- a/tests/testapp/settings.py +++ b/tests/testapp/settings.py @@ -132,7 +132,6 @@ DRAMATIQ_BROKER = { "BROKER": os.getenv("DRAMATIQ_BROKER", "dramatiq.brokers.redis.RedisBroker"), "MIDDLEWARE": [ - "dramatiq.middleware.Prometheus", "dramatiq.middleware.AgeLimit", "dramatiq.middleware.TimeLimit", "dramatiq.middleware.Callbacks", From 1c89bb29eb59b061c36040fd2c19f1d593ddb47c Mon Sep 17 00:00:00 2001 From: Rust Saiargaliev Date: Thu, 20 Nov 2025 10:05:58 +0100 Subject: [PATCH 2/4] Add fail_fast=True to maintain previous behaviour --- tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5d75c6d..44a6e46 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,7 +48,7 @@ def stub_worker(monkeypatch, settings, _runner): class Meta: @staticmethod def wait(): - broker.join(settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000) + broker.join(settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000, fail_fast=False) worker.join() yield Meta From 90464958d94075fab36736d6d209c637b73e3434 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 20 Nov 2025 09:06:14 +0000 Subject: [PATCH 3/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/conftest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 44a6e46..264ecea 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,7 +48,9 @@ def stub_worker(monkeypatch, settings, _runner): class Meta: @staticmethod def wait(): - broker.join(settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000, fail_fast=False) + broker.join( + settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000, fail_fast=False + ) worker.join() yield Meta From f4dcc6cbfc8fc7ca84a589308b12c81646f7fc38 Mon Sep 17 00:00:00 2001 From: Johannes Maron Date: Fri, 5 Dec 2025 11:24:12 +0100 Subject: [PATCH 4/4] Use row not table locks --- joeflow/runner/dramatiq.py | 11 ++++++++--- pyproject.toml | 4 ++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/joeflow/runner/dramatiq.py b/joeflow/runner/dramatiq.py index 8e4be0d..20ff06d 100644 --- a/joeflow/runner/dramatiq.py +++ b/joeflow/runner/dramatiq.py @@ -32,12 +32,17 @@ class RetryError(dramatiq.errors.Retry): def _dramatiq_task_runner(task_pk, workflow_pk, retries=0): Task = apps.get_model("joeflow", "Task") with transaction.atomic(): - task = Task.objects.select_for_update().get(pk=task_pk, completed=None) + task = ( + Task.objects.filter(pk=task_pk, completed=None) + .select_for_update(nowait=True) + .get() + ) workflow = ( task.content_type.model_class() - .objects.select_for_update(nowait=True) - .get(pk=workflow_pk) + .objects.filter(pk=workflow_pk) + .select_for_update(nowait=True) + .get() ) try: diff --git a/pyproject.toml b/pyproject.toml index 0fea392..51fb3bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ test = [ docs = [ "celery>=4.2.0", "django-reversion", - "dramatiq", + "dramatiq>=2.0.0", "django_dramatiq", "redis", "graphviz>=0.18", @@ -78,7 +78,7 @@ celery = [ "celery>=4.2.0", ] dramatiq = [ - "dramatiq", + "dramatiq>=2.0.0", "django_dramatiq", ]