diff --git a/joeflow/runner/dramatiq.py b/joeflow/runner/dramatiq.py index 8e4be0d..20ff06d 100644 --- a/joeflow/runner/dramatiq.py +++ b/joeflow/runner/dramatiq.py @@ -32,12 +32,17 @@ class RetryError(dramatiq.errors.Retry): def _dramatiq_task_runner(task_pk, workflow_pk, retries=0): Task = apps.get_model("joeflow", "Task") with transaction.atomic(): - task = Task.objects.select_for_update().get(pk=task_pk, completed=None) + task = ( + Task.objects.filter(pk=task_pk, completed=None) + .select_for_update(nowait=True) + .get() + ) workflow = ( task.content_type.model_class() - .objects.select_for_update(nowait=True) - .get(pk=workflow_pk) + .objects.filter(pk=workflow_pk) + .select_for_update(nowait=True) + .get() ) try: diff --git a/pyproject.toml b/pyproject.toml index 0fea392..51fb3bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ test = [ docs = [ "celery>=4.2.0", "django-reversion", - "dramatiq", + "dramatiq>=2.0.0", "django_dramatiq", "redis", "graphviz>=0.18", @@ -78,7 +78,7 @@ celery = [ "celery>=4.2.0", ] dramatiq = [ - "dramatiq", + "dramatiq>=2.0.0", "django_dramatiq", ] diff --git a/tests/conftest.py b/tests/conftest.py index 5d75c6d..264ecea 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,7 +48,9 @@ def stub_worker(monkeypatch, settings, _runner): class Meta: @staticmethod def wait(): - broker.join(settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000) + broker.join( + settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000, fail_fast=False + ) worker.join() yield Meta diff --git a/tests/testapp/settings.py b/tests/testapp/settings.py index 03d654f..37c2d14 100644 --- a/tests/testapp/settings.py +++ b/tests/testapp/settings.py @@ -132,7 +132,6 @@ DRAMATIQ_BROKER = { "BROKER": os.getenv("DRAMATIQ_BROKER", "dramatiq.brokers.redis.RedisBroker"), "MIDDLEWARE": [ - "dramatiq.middleware.Prometheus", "dramatiq.middleware.AgeLimit", "dramatiq.middleware.TimeLimit", "dramatiq.middleware.Callbacks",