From 49b6ee382c10cbc12ccfbee13da554f857ac271b Mon Sep 17 00:00:00 2001
From: weetster <98231586+weetster@users.noreply.github.com>
Date: Wed, 27 Apr 2022 14:13:40 -0400
Subject: [PATCH] Added support for requests that contain an ETA in the future. (#59)

Allow Batches tasks to be called with `apply_async(..., eta=...)` or
`apply_async(..., countdown=...)`.
---
 CHANGELOG.rst                 |  3 ++
 celery_batches/__init__.py    | 61 +++++++++++++++++++++++++++++------
 docs/index.rst                | 30 +++++++++++++++++
 t/integration/test_batches.py | 59 ++++++++++++++++++++++++++++++++-
 4 files changed, 142 insertions(+), 11 deletions(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 46acfbe..cca3c70 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -13,6 +13,9 @@ Improvements
   feature for ``Batches`` tasks.
   (`#39 `_)
 * Support |using a custom Request class|_ for ``Batches`` tasks.
   (`#63 `_)
+* Support calling tasks with an ``eta`` or ``countdown`` specified. Contributed by
+  `@weetster `_.
+  (`#59 `_)
 
 Bugfixes
 --------
diff --git a/celery_batches/__init__.py b/celery_batches/__init__.py
index b0f4ab5..dde59ca 100644
--- a/celery_batches/__init__.py
+++ b/celery_batches/__init__.py
@@ -1,5 +1,6 @@
 from itertools import count, filterfalse, tee
 from queue import Empty, Queue
+from time import monotonic
 from typing import (
     Any,
     Callable,
@@ -22,10 +23,11 @@ from celery.utils.imports import symbol_by_name
 from celery.utils.log import get_logger
 from celery.utils.nodenames import gethostname
+from celery.utils.time import timezone
 from celery.worker.consumer import Consumer
 from celery.worker.request import Request, create_request_cls
 from celery.worker.strategy import hybrid_to_proto2, proto1_to_proto2
 
-from kombu.asynchronous.timer import Timer
+from kombu.asynchronous.timer import Timer, to_timestamp
 from kombu.message import Message
 from kombu.utils.uuid import uuid
 from vine import promise
@@ -171,6 +173,7 @@ class Batches(Task):
 
     def __init__(self) -> None:
         self._buffer: Queue[Request] = Queue()
+        self._pending: Queue[Request] = Queue()
         self._count = count(1)
         self._tref: Optional[Timer] = None
         self._pool: BasePool = None
@@ -183,6 +186,8 @@ def Strategy(self, task: "Batches", app: Celery, consumer: Consumer) -> Callable
         #
         # This adds to a buffer at the end, instead of executing the task as
         # the default strategy does.
+        #
+        # See Batches._do_flush for ETA handling.
         self._pool = consumer.pool
 
         hostname = consumer.hostname
@@ -285,15 +290,51 @@ def apply(
         return super().apply(([request],), {}, *_args, **options)
 
     def _do_flush(self) -> None:
-        logger.debug("Batches: Wake-up to flush buffer...")
-        requests = None
-        if self._buffer.qsize():
-            requests = list(consume_queue(self._buffer))
-        if requests:
-            logger.debug("Batches: Buffer complete: %s", len(requests))
-            self.flush(requests)
-        if not requests:
-            logger.debug("Batches: Canceling timer: Nothing in buffer.")
+        logger.debug("Batches: Wake-up to flush buffers...")
+
+        ready_requests = []
+        app = self.app
+        to_system_tz = timezone.to_system
+        now = monotonic()
+
+        all_requests = list(consume_queue(self._buffer)) + list(
+            consume_queue(self._pending)
+        )
+        for req in all_requests:
+            # Similar to logic in celery.worker.strategy.default.
+            if req.eta:
+                try:
+                    if req.utc:
+                        eta = to_timestamp(to_system_tz(req.eta))
+                    else:
+                        eta = to_timestamp(req.eta, app.timezone)
+                except (OverflowError, ValueError) as exc:
+                    logger.error(
+                        "Couldn't convert ETA %r to timestamp: %r. Task: %r",
+                        req.eta,
+                        exc,
+                        req.info(safe=True),
+                        exc_info=True,
+                    )
+                    req.reject(requeue=False)
+                    continue
+
+                if eta <= now:
+                    # ETA has elapsed, request is ready.
+                    ready_requests.append(req)
+                else:
+                    # ETA has not elapsed, add to pending queue.
+                    self._pending.put(req)
+            else:
+                # Request does not have an ETA, ready immediately.
+                ready_requests.append(req)
+
+        if len(ready_requests) > 0:
+            logger.debug("Batches: Ready buffer complete: %s", len(ready_requests))
+            self.flush(ready_requests)
+
+        if not ready_requests and self._pending.qsize() == 0:
+            logger.debug("Batches: Canceling timer: Nothing in buffers.")
             if self._tref:
                 self._tref.cancel()  # cancel timer.
                 self._tref = None
diff --git a/docs/index.rst b/docs/index.rst
index ee322dd..df4705e 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -44,6 +44,36 @@ used to provide values to signals and does not populate into the results backend
     from celery import current_app
     current_app.backend.mark_as_done(request.id, response, request=request)
 
+Retrying tasks
+##############
+
+In order to retry a failed task, it must be re-executed with the original
+``task_id``; see the example below:
+
+.. code-block:: python
+
+    @app.task(base=Batches, flush_every=100, flush_interval=10)
+    def flaky_task(requests):
+        for request in requests:
+            # Do something that might fail.
+            try:
+                response = might_fail(*request.args, **request.kwargs)
+            except TemporaryError:
+                # Retry the task 10 seconds from now with the same arguments and task_id.
+                flaky_task.apply_async(
+                    args=request.args,
+                    kwargs=request.kwargs,
+                    countdown=10,
+                    task_id=request.id,
+                )
+            else:
+                app.backend.mark_as_done(request.id, response, request=request)
+
+Note that the retried task is still bound by the flush rules of the ``Batches``
+task: the countdown acts as a lower bound, so the task will not run *before*
+that delay. In the example above it will run between 10 and 20 seconds from
+now, assuming no other tasks are in the queue.
+
 .. toctree::
    :hidden:
 
diff --git a/t/integration/test_batches.py b/t/integration/test_batches.py
index 35236b4..8169179 100644
--- a/t/integration/test_batches.py
+++ b/t/integration/test_batches.py
@@ -1,9 +1,10 @@
+from datetime import datetime, timedelta
 from time import sleep
 from typing import Any, Callable, List, Optional, Union
 
 from celery_batches import Batches, SimpleRequest
 
-from celery import Celery, signals
+from celery import Celery, signals, states
 from celery.app.task import Task
 from celery.contrib.testing.tasks import ping
 from celery.contrib.testing.worker import TestWorkController
@@ -270,3 +271,59 @@ def acks(requests: List[SimpleRequest]) -> None:
 
     # After the tasks are done, both results are acked.
     assert acked == [result_1.id, result_2.id]
+
+
+def test_countdown(celery_app: Celery, celery_worker: TestWorkController) -> None:
+    """Ensure that countdowns work properly.
+
+    The batch task handles only the first request initially (as the second request
+    is not ready). A subsequent call handles the second request.
+    """
+
+    if not celery_app.conf.broker_url.startswith("memory"):
+        raise pytest.skip("Flaky on live brokers")
+
+    result_1 = add.apply_async(args=(1,))
+    # The countdown is longer than the flush interval + first sleep, but shorter
+    # than the flush interval + first sleep + second sleep.
+    result_2 = add.apply_async(args=(2,), countdown=3)
+
+    # The flush interval is 0.1 seconds and the retry interval is 0.5 seconds;
+    # this sleep is longer than both.
+    sleep(1)
+
+    # Let the worker work.
+    _wait_for_ping()
+
+    assert result_1.get() == 1
+    assert result_2.state == states.PENDING
+
+    sleep(3)
+
+    assert result_2.get() == 2
+
+
+def test_eta(celery_app: Celery, celery_worker: TestWorkController) -> None:
+    """Ensure that ETAs work properly."""
+
+    if not celery_app.conf.broker_url.startswith("memory"):
+        raise pytest.skip("Flaky on live brokers")
+
+    result_1 = add.apply_async(args=(1,))
+    # The ETA is further out than the flush interval + first sleep, but closer
+    # than the flush interval + first sleep + second sleep.
+    result_2 = add.apply_async(args=(2,), eta=datetime.utcnow() + timedelta(seconds=3))
+
+    # The flush interval is 0.1 seconds and the retry interval is 0.5 seconds;
+    # this sleep is longer than both.
+    sleep(1)
+
+    # Let the worker work.
+    _wait_for_ping()
+
+    assert result_1.get() == 1
+    assert result_2.state == states.PENDING
+
+    sleep(3)
+
+    assert result_2.get() == 2
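For reference, a minimal sketch of how the behaviour added by this patch can be exercised from application code. The ``count_clicks`` task, its arguments, and the ``memory://`` broker URL below are illustrative placeholders and are not part of the patch:

.. code-block:: python

    from datetime import datetime, timedelta, timezone

    from celery import Celery
    from celery_batches import Batches

    app = Celery("example", broker="memory://")  # illustrative broker URL

    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def count_clicks(requests):
        # All buffered requests whose ETA/countdown has elapsed are delivered together.
        for request in requests:
            print("clicked:", request.args, request.kwargs)

    # Held in the task's pending buffer and executed in the first flush
    # that wakes up after roughly 30 seconds.
    count_clicks.apply_async(args=("/home",), countdown=30)

    # Equivalent, using an explicit timezone-aware ETA.
    count_clicks.apply_async(
        args=("/about",), eta=datetime.now(timezone.utc) + timedelta(seconds=30)
    )

As with the retry example in ``docs/index.rst`` above, the countdown/ETA acts as a lower bound: the request runs in the first flush after it elapses, not at the exact moment it expires.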