feat(celery): finalize in-flight task on worker eviction so the chord proceeds#1203
feat(celery): finalize in-flight task on worker eviction so the chord proceeds#1203ocervell wants to merge 7 commits into
Conversation
… proceeds On a K8s pod eviction (SIGTERM -> grace -> SIGKILL) the worker running a task dies. With task_acks_late on the Redis broker, its message is only redelivered after the broker visibility timeout (hours, on the long-task pool) — stalling the surrounding chord/workflow that whole time. Unlike a child OOM (where the master survives and task_reject_on_worker_lost requeues immediately), a whole-pod eviction has no surviving master, so Redis can only fall back to the visibility timeout. Catch the worker_shutting_down signal and raise a flag the running task's monitor thread polls. On shutdown it stops the task early via the existing stop_process path, so the task returns its partial results through the normal completion path and the chord proceeds in seconds instead of waiting for the visibility timeout. - celery_signals.py: SHUTDOWN_FLAG + worker_shutting_down_handler (wired unconditionally); stale flag cleared on worker boot. - command.py: the monitor thread checks the flag (same self-stop used for timeout/memory limits) and caps its poll interval (MONITOR_POLL_SECONDS=5) so an eviction is caught well within the pod's terminationGracePeriodSeconds. - tests: flag lifecycle + a behavioral test (a running command stops early and emits the eviction warning). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01P5vSjfkBuGAAHdKxHS3ySm
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughAdds a file-based worker shutdown flag ( ChangesWorker Eviction Self-Finalize
Sequence Diagram(s)sequenceDiagram
participant Celery as Celery Worker
participant SignalHandler as worker_shutting_down_handler
participant FlagFile as SHUTDOWN_FLAG (filesystem)
participant Monitor as _monitor_process loop
participant Subprocess as Running subprocess
Celery->>SignalHandler: signals.worker_shutting_down fires
SignalHandler->>FlagFile: touch(SHUTDOWN_FLAG)
loop every min(stat_update_frequency, 5s)
Monitor->>FlagFile: is_worker_shutting_down()
FlagFile-->>Monitor: True
Monitor->>Monitor: enqueue Warning("worker shutting down")
Monitor->>Subprocess: SIGTERM via stop_process(exit_ok=True)
Monitor->>Monitor: break — save incomplete results
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@secator/celery_signals.py`:
- Around line 36-41: The worker_shutting_down_handler function silently swallows
all exceptions when writing to SHUTDOWN_FLAG, which can mask real issues like
missing or unwritable state directories and cause tasks to stall. Modify the
function to first ensure the parent directory of SHUTDOWN_FLAG exists by
creating it if necessary, then replace the broad exception handler with one that
specifically catches and logs at least OSError exceptions with an appropriate
error message, while still allowing the function to complete gracefully without
raising.
In `@tests/unit/test_eviction.py`:
- Around line 19-23: The setUp and tearDown methods are operating on the real
global shutdown flag file, which causes test interference when running tests in
parallel or with shared STATE_DIR. Modify the setUp method to patch
secator.celery_signals.SHUTDOWN_FLAG to point to a temporary test-specific path
before calling clear_shutdown_flag(), and modify the tearDown method to clean up
that patch after calling clear_shutdown_flag() to ensure each test uses an
isolated shutdown flag path.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: e0e1675a-0d72-4b1b-a024-c404d0ceaabe
📒 Files selected for processing (3)
secator/celery_signals.pysecator/runners/command.pytests/unit/test_eviction.py
…late test The integration suite caught a real leak: SHUTDOWN_FLAG is a machine-global file, so once any worker fired worker_shutting_down (e.g. between test files) the flag persisted and every later task whose monitor polled once self-aborted — slow tasks failed, fast ones that finished before the first poll passed. Prod never hit it (1 task = 1 pod = fresh /tmp), but any shared/long-lived worker does. - command.py: clear the flag at monitor start, so it only means "shutdown raised *during this run*", not a stale flag from a previous worker/task. - celery_signals.py (CodeRabbit): ensure the flag's parent dir exists and catch + log OSError instead of swallowing all exceptions. - test_eviction.py (CodeRabbit): isolate SHUTDOWN_FLAG to a per-test temp path; add a regression test that a flag set *before* a task starts does not stop it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01P5vSjfkBuGAAHdKxHS3ySm
…ace) The behavioral test set the flag once after a fixed 2s wait, but the monitor now clears any pre-existing flag at startup; on a slow CI runner the monitor started *after* that single set and wiped it, so the task never stopped and the test failed. Re-raise the flag in a loop until the task stops, so the monitor's poll sees it once it is running regardless of start timing. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01P5vSjfkBuGAAHdKxHS3ySm
The real-subprocess behavioral test was environment-dependent: in CI the monitor thread didn't stop the live `sleep` reliably, so the test flaked (and the stale test passed for the wrong reason). Replace both with deterministic tests that exercise _monitor_process directly against a bare Command — one asserts it calls stop_process(exit_ok=True) + emits the eviction Warning when the flag is set, the other asserts it clears a stale flag at start (the integration-leak regression). No subprocess, no timing. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01P5vSjfkBuGAAHdKxHS3ySm
The real-file version of the stale-flag test flaked on CI (patched temp path + bare-__new__ Command), while the real behaviour is already proven green by the integration suite. Assert that _monitor_process calls clear_shutdown_flag() at start via a mock instead of inspecting the filesystem — deterministic and environment-independent. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01P5vSjfkBuGAAHdKxHS3ySm
Problem
On a K8s pod eviction (
SIGTERM→ grace →SIGKILL) the worker running a task dies. Withtask_acks_lateon the Redis broker, its message is only redelivered after the broker visibility timeout — which we derive above each pool's deadline (hours, on the long-task pool). So the surrounding chord/workflow stalls for that entire window.This is the one OOM/eviction case the other layers don't cover:
task_reject_on_worker_lostrequeues immediately (~18s), capped by #1199A whole-pod eviction has no surviving master, so
reject_on_worker_lostcan't help and Redis falls back to the visibility timeout.Fix
Catch the
worker_shutting_downsignal and raise a flag the running task's monitor thread polls. On shutdown it stops the task early via the existingstop_process(exit_ok=True)path — the same self-stop already used for the timeout and memory-limit cases — so the task returns its partial results through the normal completion path and the chord proceeds in seconds instead of waiting for the visibility timeout.celery_signals.py—SHUTDOWN_FLAG+worker_shutting_down_handler(wired unconditionally); stale flag cleared on worker boot. File-based so it crosses the prefork master→child boundary.command.py— the monitor checks the flag, and its poll interval is capped (MONITOR_POLL_SECONDS=5) so an eviction is caught well within the pod'sterminationGracePeriodSeconds(60s).Goes through Celery's normal completion path, so the chord is satisfied the ordinary way — no pool change (stays prefork), no Celery upgrade, no
revoke(which yieldsREVOKED/TaskRevokedError, the wrong tool here — that's for cancel, e.g.stop_runner).Tests
test_shutdown_flag_lifecycle— flag set/clear.test_monitor_stops_running_command_on_shutdown— a realsleep 30command stops early (well under 30s) and emits the evictionWarning.Validated end-to-end locally: evicting a worker mid-chord, the chord callback fired in ~5s (vs. the ~hours visibility timeout) with partial results from the evicted member.
Part of the OOM/eviction robustness set alongside #1199 (worker-loss retry cap) and #1202 (on_build runner identity).
🤖 Generated with Claude Code
https://claude.ai/code/session_01P5vSjfkBuGAAHdKxHS3ySm
Summary by CodeRabbit
New Features
Tests