Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions secator/celery_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,31 @@
STATE_DIR = Path("/tmp/celery_state")
STATE_DIR.mkdir(exist_ok=True, parents=True)

# Eviction flag. On worker shutdown (e.g. a K8s pod SIGTERM eviction) we raise this flag; the
# running task's monitor thread (in the prefork child, a separate process) polls it via a file
# and stops early, returning partial results so the surrounding chord proceeds — instead of the
# task hanging until the broker visibility timeout redelivers it (hours, on the long pool).
SHUTDOWN_FLAG = STATE_DIR / "worker_shutdown"


def is_worker_shutting_down():
"""True once the worker has begun shutting down (set by worker_shutting_down_handler)."""
return SHUTDOWN_FLAG.exists()


def clear_shutdown_flag():
"""Remove the eviction flag (called on worker boot to drop any stale flag)."""
if SHUTDOWN_FLAG.exists():
SHUTDOWN_FLAG.unlink()


def worker_shutting_down_handler(**kwargs):
"""Raise the eviction flag so the in-flight task's monitor stops it early and returns."""
try:
SHUTDOWN_FLAG.write_text("1")
except Exception:
pass
Comment thread
ocervell marked this conversation as resolved.
Outdated


def get_lock_file_path():
worker_name = os.environ.get("WORKER_NAME", f"unknown_{os.getpid()}")
Expand Down Expand Up @@ -126,6 +151,11 @@ def setup_handlers():
if CONFIG.celery.override_default_logging:
signals.setup_logging.connect(setup_logging)

# Eviction handling (always on): clear any stale flag from a previous worker in this pod,
# and raise it on shutdown so the in-flight task stops early and lets its chord proceed.
clear_shutdown_flag()
signals.worker_shutting_down.connect(worker_shutting_down_handler)

# Register common handlers when either task‐ or idle‐based termination is enabled
if CONFIG.celery.worker_kill_after_task or CONFIG.celery.worker_kill_after_idle_seconds != -1:
signals.celeryd_after_setup.connect(capture_worker_name)
Expand Down
22 changes: 20 additions & 2 deletions secator/runners/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@

logger = logging.getLogger(__name__)

# Upper bound on the monitor thread's poll interval (seconds). Keeps the shutdown/timeout checks
# responsive even when stat_update_frequency is large, so an evicted task stops well within the
# pod's termination grace period.
MONITOR_POLL_SECONDS = 5


class Command(Runner):
"""Base class to execute an external command."""
Expand Down Expand Up @@ -712,6 +717,7 @@ def get_max_timeout(self):

def _monitor_process(self):
"""Monitor thread that checks process health and kills if necessary."""
from secator.celery_signals import is_worker_shutting_down
last_stats_time = 0

while not self.monitor_stop_event.is_set():
Expand All @@ -722,6 +728,16 @@ def _monitor_process(self):
current_time = time()
self.debug('Collecting monitor items', sub='monitor')

# Worker is shutting down (e.g. K8s pod eviction): stop early and save partial
# results so the surrounding chord proceeds, rather than hang until the broker
# visibility timeout redelivers this task.
if is_worker_shutting_down():
warning = Warning(message='Worker shutting down (eviction): stopping task early, saving incomplete results')
if self.monitor_queue is not None:
self.monitor_queue.put(warning)
self.stop_process(exit_ok=True, sig=signal.SIGTERM)
break

# Collect and queue stats at regular intervals
if (current_time - last_stats_time) >= CONFIG.runners.stat_update_frequency:
stats_items = list(self._collect_stats())
Expand Down Expand Up @@ -770,8 +786,10 @@ def _monitor_process(self):
self.monitor_queue.put(warning)
break

# Sleep for a short interval before next check (stat update frequency)
self.monitor_stop_event.wait(CONFIG.runners.stat_update_frequency)
# Wake at least every MONITOR_POLL_SECONDS so the shutdown/timeout checks stay
# responsive (stats themselves are still gated to stat_update_frequency above), so
# an eviction is caught well within the pod's termination grace period.
self.monitor_stop_event.wait(min(CONFIG.runners.stat_update_frequency, MONITOR_POLL_SECONDS))

def _collect_stats(self):
"""Collect stats about the current running process, if any."""
Expand Down
63 changes: 63 additions & 0 deletions tests/unit/test_eviction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import threading
import time
import unittest
import unittest.mock

from secator.celery_signals import (
clear_shutdown_flag,
is_worker_shutting_down,
worker_shutting_down_handler,
)
from secator.runners import Command


class TestEvictionSelfFinalize(unittest.TestCase):
"""Worker-eviction self-finalize: on shutdown (e.g. a K8s pod SIGTERM eviction) the in-flight
task's monitor stops it early and returns partial results, so the surrounding chord proceeds
instead of hanging until the broker visibility timeout redelivers the task."""

def setUp(self):
clear_shutdown_flag()

def tearDown(self):
clear_shutdown_flag()
Comment thread
ocervell marked this conversation as resolved.

def test_shutdown_flag_lifecycle(self):
"""worker_shutting_down_handler raises the flag; clear_shutdown_flag drops it."""
self.assertFalse(is_worker_shutting_down())
worker_shutting_down_handler()
self.assertTrue(is_worker_shutting_down())
clear_shutdown_flag()
self.assertFalse(is_worker_shutting_down())

def test_monitor_stops_running_command_on_shutdown(self):
"""A long-running command stops early once the flag is raised (instead of running to
completion), and emits the eviction Warning — proving the monitor self-stop path."""
holder = {}

def run():
holder['cmd'] = Command.execute('sleep 30', name='evict_sleep', process=True, quiet=True)

t = threading.Thread(target=run, daemon=True)
# Poll fast so the test doesn't wait the full stat-update cadence.
with unittest.mock.patch('secator.runners.command.MONITOR_POLL_SECONDS', 1):
start = time.monotonic()
t.start()
time.sleep(2) # let the subprocess + monitor thread start
worker_shutting_down_handler() # simulate the eviction SIGTERM
t.join(timeout=20)
elapsed = time.monotonic() - start

self.assertFalse(t.is_alive(), 'command did not stop after the shutdown flag was raised')
self.assertLess(elapsed, 25, 'command did not stop early (ran toward the full 30s sleep)')

cmd = holder['cmd']
signals = [
item for item in (cmd.warnings + cmd.results)
if 'shutting down' in str(getattr(item, 'message', '')).lower()
]
self.assertTrue(signals, 'no eviction warning emitted on shutdown')


if __name__ == '__main__':
unittest.main()
Loading