diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py
index 798d3ada..aa3ceedb 100644
--- a/damnit/backend/listener.py
+++ b/damnit/backend/listener.py
@@ -5,6 +5,7 @@
 import platform
 from pathlib import Path
 from socket import gethostname
+from threading import Thread
 
 from kafka import KafkaConsumer
 
@@ -30,6 +31,26 @@
 
 log = logging.getLogger(__name__)
 
+# tracking number of local threads running in parallel
+# only relevant if slurm isn't available
+MAX_CONCURRENT_THREADS = min(os.cpu_count() // 2, 10)
+local_extraction_threads = []
+
+
+def execute_direct(submitter, request):
+    for th in local_extraction_threads.copy():
+        if not th.is_alive():
+            local_extraction_threads.pop(local_extraction_threads.index(th))
+
+    if len(local_extraction_threads) >= MAX_CONCURRENT_THREADS:
+        log.warning(f'Too many events processing ({MAX_CONCURRENT_THREADS}), '
+                    f'skip event (p{request.proposal}, r{request.run}, {request.run_data.value})')
+        return
+
+    extr = Thread(target=submitter.execute_direct, args=(request, ))
+    local_extraction_threads.append(extr)
+    extr.start()
+
 
 class EventProcessor:
 
@@ -56,7 +77,6 @@ def __init__(self, context_dir=Path('.')):
         )
         self.events = kafka_conf['events']
 
-
     def __enter__(self):
         return self
 
@@ -110,7 +130,14 @@ def handle_event(self, record, msg: dict, run_data: RunData):
         log.info(f"Added p%d r%d ({run_data.value} data) to database",
                  proposal, run)
         req = ExtractionRequest(run, proposal, run_data)
-        self.submitter.submit(req)
+        try:
+            self.submitter.submit(req)
+        except FileNotFoundError:
+            log.warning('Slurm not available, starting process locally.')
+            execute_direct(self.submitter, req)
+        except Exception:
+            log.error("Slurm job submission failed, starting process locally.", exc_info=True)
+            execute_direct(self.submitter, req)
 
 
 def listen():
@@ -137,5 +164,6 @@ def listen():
     if os.stat("amore.log").st_uid == os.getuid():
         os.chmod("amore.log", 0o666)
 
+
 if __name__ == '__main__':
     listen()
diff --git a/tests/conftest.py b/tests/conftest.py
index 9802bcff..55782150 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,5 +1,5 @@
 import socket
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
 
 import pytest
 import numpy as np
diff --git a/tests/test_backend.py b/tests/test_backend.py
index abb05b1c..58714b66 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -8,6 +8,7 @@
 import stat
 import subprocess
 import textwrap
+from time import sleep, time
 from unittest.mock import MagicMock, patch
 
 import extra_data as ed
@@ -24,9 +25,11 @@
 from damnit.backend import backend_is_running, initialize_and_start_backend
 from damnit.backend.db import DamnitDB
 from damnit.backend.extract_data import Extractor, add_to_db
+from damnit.backend.listener import (MAX_CONCURRENT_THREADS, EventProcessor,
+                                     local_extraction_threads)
 from damnit.backend.supervisord import wait_until, write_supervisord_conf
-from damnit.context import (ContextFile, ContextFileErrors, PNGData, Results,
-                            RunData, get_proposal_path)
+from damnit.context import (ContextFile, ContextFileErrors, PNGData, RunData,
+                            get_proposal_path)
 from damnit.ctxsupport.ctxrunner import THUMBNAIL_SIZE
 from damnit.gui.main_window import MainWindow
 
@@ -857,3 +860,35 @@ def subprocess_runner():
 
     assert initialize_and_start_backend(db_dir)
     assert backend_is_running(db_dir)
+
+
+def test_event_processor(mock_db, caplog):
+    db_dir, db = mock_db
+    db.metameta["proposal"] = 1234
+
+    with patch('damnit.backend.listener.KafkaConsumer') as kcon:
+        processor = EventProcessor(db_dir)
+
+    kcon.assert_called_once()
+    assert len(local_extraction_threads) == 0
+
+    # slurm not available
+    with (
+        patch('subprocess.run', side_effect=FileNotFoundError),
+        patch('damnit.backend.extraction_control.ExtractionSubmitter.execute_direct', lambda *_: sleep(1))
+    ):
+        with caplog.at_level(logging.WARNING):
+            event = MagicMock(timestamp=time())
+            processor.handle_event(event, {'proposal': 1234, 'run': 1}, RunData.RAW)
+
+        assert 'Slurm not available' in caplog.text
+        assert len(local_extraction_threads) == 1
+        local_extraction_threads[0].join()
+
+        with caplog.at_level(logging.WARNING):
+            for idx in range(MAX_CONCURRENT_THREADS + 1):
+                event = MagicMock(timestamp=time())
+                processor.handle_event(event, {'proposal': 1234, 'run': idx + 1}, RunData.RAW)
+
+        assert len(local_extraction_threads) == MAX_CONCURRENT_THREADS
+        assert 'Too many events processing' in caplog.text