diff --git a/queue_job/README.rst b/queue_job/README.rst index 70a6f226a5..5c05ce9440 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -425,6 +425,7 @@ a job, is transferred to the job according to an allow-list. The default allow-list is `("tz", "lang", "allowed_company_ids", "force_company", "active_test")`. It can be customized in ``Base._job_prepare_context_before_enqueue_keys``. + **Bypass jobs on running Odoo** When you are developing (ie: connector modules) you might want diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index d1e56e8f77..9ba18e8d29 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -74,7 +74,13 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route("/queue_job/runjob", type="http", auth="none", save_session=False) + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) def runjob(self, db, job_uuid, **kw): http.request.session.db = db env = http.request.env(user=SUPERUSER_ID) diff --git a/queue_job/job.py b/queue_job/job.py index d8e50e5124..790e07d90e 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -9,7 +9,6 @@ import uuid import weakref from datetime import datetime, timedelta -from functools import total_ordering from random import randint import odoo @@ -104,7 +103,6 @@ def identity_exact_hasher(job_): return hasher -@total_ordering class Job: """A Job is a task to execute. It is the in-memory representation of a job. @@ -367,65 +365,6 @@ def job_record_with_same_identity_key(self): ) return existing - # TODO to deprecate (not called anymore) - @classmethod - def enqueue( - cls, - func, - args=None, - kwargs=None, - priority=None, - eta=None, - max_retries=None, - description=None, - channel=None, - identity_key=None, - ): - """Create a Job and enqueue it in the queue. Return the job uuid. - - This expects the arguments specific to the job to be already extracted - from the ones to pass to the job function. - - If the identity key is the same than the one in a pending job, - no job is created and the existing job is returned - - """ - new_job = cls( - func=func, - args=args, - kwargs=kwargs, - priority=priority, - eta=eta, - max_retries=max_retries, - description=description, - channel=channel, - identity_key=identity_key, - ) - return new_job._enqueue_job() - - # TODO to deprecate (not called anymore) - def _enqueue_job(self): - if self.identity_key: - existing = self.job_record_with_same_identity_key() - if existing: - _logger.debug( - "a job has not been enqueued due to having " - "the same identity key (%s) than job %s", - self.identity_key, - existing.uuid, - ) - return Job._load_from_db_record(existing) - self.store() - _logger.debug( - "enqueued %s:%s(*%r, **%r) with uuid: %s", - self.recordset, - self.method_name, - self.args, - self.kwargs, - self.uuid, - ) - return self - @staticmethod def db_record_from_uuid(env, job_uuid): # TODO remove in 15.0 or 16.0 @@ -749,16 +688,6 @@ def __eq__(self, other): def __hash__(self): return self.uuid.__hash__() - def sorting_key(self): - return self.eta, self.priority, self.date_created, self.seq - - def __lt__(self, other): - if self.eta and not other.eta: - return True - elif not self.eta and other.eta: - return False - return self.sorting_key() < other.sorting_key() - def db_record(self): return self.db_records_from_uuids(self.env, [self.uuid]) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 468fb5760d..7fcabbd07b 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -2,6 +2,7 @@ # Copyright 2015-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) import logging +from collections import namedtuple from functools import total_ordering from heapq import heappop, heappush from weakref import WeakValueDictionary @@ -10,6 +11,7 @@ from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) +JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq") _logger = logging.getLogger(__name__) @@ -108,7 +110,7 @@ class ChannelJob: job that are necessary to prioritise them. Channel jobs are comparable according to the following rules: - * jobs with an eta come before all other jobs + * jobs with an eta cannot be compared with jobs without * then jobs with a smaller eta come first * then jobs with a smaller priority come first * then jobs with a smaller creation time come first @@ -135,14 +137,18 @@ class ChannelJob: >>> j3 < j1 True - j4 and j5 comes even before j3, because they have an eta + j4 and j5 have an eta, they cannot be compared with j3 >>> j4 = ChannelJob(None, None, 4, ... seq=0, date_created=4, priority=9, eta=9) >>> j5 = ChannelJob(None, None, 5, ... seq=0, date_created=5, priority=9, eta=9) - >>> j4 < j5 < j3 + >>> j4 < j5 True + >>> j4 < j3 + Traceback (most recent call last): + ... + TypeError: '<' not supported between instances of 'int' and 'NoneType' j6 has same date_created and priority as j5 but a smaller eta @@ -153,7 +159,7 @@ class ChannelJob: Here is the complete suite: - >>> j6 < j4 < j5 < j3 < j1 < j2 + >>> j6 < j4 < j5 and j3 < j1 < j2 True j0 has the same properties as j1 but they are not considered @@ -173,14 +179,13 @@ class ChannelJob: """ + __slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__") + def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta): self.db_name = db_name self.channel = channel self.uuid = uuid - self.seq = seq - self.date_created = date_created - self.priority = priority - self.eta = eta + self._sorting_key = JobSortingKey(eta, priority, date_created, seq) def __repr__(self): return "" % self.uuid @@ -191,18 +196,36 @@ def __eq__(self, other): def __hash__(self): return id(self) + def set_no_eta(self): + self._sorting_key = JobSortingKey(None, *self._sorting_key[1:]) + + @property + def seq(self): + return self._sorting_key.seq + + @property + def date_created(self): + return self._sorting_key.date_created + + @property + def priority(self): + return self._sorting_key.priority + + @property + def eta(self): + return self._sorting_key.eta + def sorting_key(self): - return self.eta, self.priority, self.date_created, self.seq + # DEPRECATED + return self._sorting_key def sorting_key_ignoring_eta(self): - return self.priority, self.date_created, self.seq + return self._sorting_key[1:] def __lt__(self, other): - if self.eta and not other.eta: - return True - elif not self.eta and other.eta: - return False - return self.sorting_key() < other.sorting_key() + # Do not compare job where ETA is set with job where it is not + # If one job 'eta' is set, and the other is None, it raises TypeError + return self._sorting_key < other._sorting_key class ChannelQueue: @@ -312,7 +335,7 @@ def remove(self, job): def pop(self, now): while self._eta_queue and self._eta_queue[0].eta <= now: eta_job = self._eta_queue.pop() - eta_job.eta = None + eta_job.set_no_eta() self._queue.add(eta_job) if self.sequential and self._eta_queue and self._queue: eta_job = self._eta_queue[0] diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 7f935c63d7..8738f6b10a 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -431,6 +431,17 @@ def __init__( self._stop = False self._stop_pipe = os.pipe() + def __del__(self): + # pylint: disable=except-pass + try: + os.close(self._stop_pipe[0]) + except OSError: + pass + try: + os.close(self._stop_pipe[1]) + except OSError: + pass + @classmethod def from_environ_or_config(cls): scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get( diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 075c8f0501..df33e2c7c5 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models -from odoo.tools import config, html_escape +from odoo.tools import config, html_escape, index_exists from odoo.addons.base_sparse_field.models.fields import Serialized @@ -91,7 +91,7 @@ class QueueJob(models.Model): func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) - priority = fields.Integer() + priority = fields.Integer(group_operator=False) exc_name = fields.Char(string="Exception", readonly=True) exc_message = fields.Char(string="Exception Message", readonly=True, tracking=True) exc_info = fields.Text(string="Exception Info", readonly=True) @@ -130,16 +130,21 @@ class QueueJob(models.Model): worker_pid = fields.Integer(readonly=True) def init(self): - self._cr.execute( - "SELECT indexname FROM pg_indexes WHERE indexname = %s ", - ("queue_job_identity_key_state_partial_index",), - ) - if not self._cr.fetchone(): + index_1 = "queue_job_identity_key_state_partial_index" + index_2 = "queue_job_channel_date_done_date_created_index" + if not index_exists(self._cr, index_1): + # Used by Job.job_record_with_same_identity_key self._cr.execute( "CREATE INDEX queue_job_identity_key_state_partial_index " "ON queue_job (identity_key) WHERE state in ('pending', " "'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL;" ) + if not index_exists(self._cr, index_2): + # Used by .autovacuum + self._cr.execute( + "CREATE INDEX queue_job_channel_date_done_date_created_index " + "ON queue_job (channel, date_done, date_created);" + ) @api.depends("records") def _compute_record_ids(self): @@ -408,6 +413,7 @@ def autovacuum(self): ("date_cancelled", "<=", deadline), ("channel", "=", channel.complete_name), ], + order="date_done, date_created", limit=1000, ) if jobs: diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py index 13f1f5f832..51e4fec832 100644 --- a/queue_job/tests/common.py +++ b/queue_job/tests/common.py @@ -256,6 +256,7 @@ def _add_job(self, *args, **kwargs): if not job.identity_key or all( j.identity_key != job.identity_key for j in self.enqueued_jobs ): + self._prepare_context(job) self.enqueued_jobs.append(job) patcher = mock.patch.object(job, "store") @@ -274,6 +275,13 @@ def _add_job(self, *args, **kwargs): ) return job + def _prepare_context(self, job): + # pylint: disable=context-overridden + job_model = job.job_model.with_context({}) + field_records = job_model._fields["records"] + # Filter the context to simulate store/load of the job + job.recordset = field_records.convert_to_write(job.recordset, job_model) + def __enter__(self): return self diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index c6486e27ef..131ce6322d 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -3,8 +3,57 @@ # pylint: disable=odoo-addons-relative-import # we are testing, we want to test as we were an external consumer of the API +import os + +from odoo.tests import BaseCase, tagged + from odoo.addons.queue_job.jobrunner import runner from .common import load_doctests load_tests = load_doctests(runner) + + +@tagged("-at_install", "post_install") +class TestRunner(BaseCase): + @classmethod + def _is_open_file_descriptor(cls, fd): + try: + os.fstat(fd) + return True + except OSError: + return False + + def test_runner_file_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + self.assertTrue(self._is_open_file_descriptor(read_fd)) + self.assertTrue(self._is_open_file_descriptor(write_fd)) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) + + def test_runner_file_closed_read_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + os.close(read_fd) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) + + def test_runner_file_closed_write_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + os.close(write_fd) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 1dc77c5e0b..7f374d8ba8 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -157,7 +157,7 @@ decoration-muted="state == 'done'" > - + - - - - - - + + + + + + + @@ -212,6 +213,7 @@ + @@ -253,6 +255,12 @@ domain="[('state', '=', 'cancelled')]" /> + + +