From 2f8d2a2f07291296f4d0a3516de8d4bbcfc694e3 Mon Sep 17 00:00:00 2001
From: tmichela
Date: Fri, 30 Aug 2024 14:17:52 +0200
Subject: [PATCH 1/4] fallback on local execution if slurm fails

---
 damnit/backend/listener.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py
index 798d3ada..3a9b69a0 100644
--- a/damnit/backend/listener.py
+++ b/damnit/backend/listener.py
@@ -5,6 +5,7 @@
 import platform
 from pathlib import Path
 from socket import gethostname
+from threading import Thread
 
 from kafka import KafkaConsumer
 
@@ -110,7 +111,11 @@ def handle_event(self, record, msg: dict, run_data: RunData):
         log.info(f"Added p%d r%d ({run_data.value} data) to database", proposal, run)
 
         req = ExtractionRequest(run, proposal, run_data)
-        self.submitter.submit(req)
+        try:
+            self.submitter.submit(req)
+        except Exception:
+            log.warning("Slurm job submission failed, starting process locally")
+            Thread(target=self.submitter.execute_direct, args=(req, )).start()
 
 
 def listen():

From 2f970ec4e0a3369f03c21293feaa14217608a07d Mon Sep 17 00:00:00 2001
From: tmichela
Date: Mon, 2 Sep 2024 08:48:28 +0200
Subject: [PATCH 2/4] log error information

---
 damnit/backend/listener.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py
index 3a9b69a0..05127810 100644
--- a/damnit/backend/listener.py
+++ b/damnit/backend/listener.py
@@ -113,8 +113,11 @@ def handle_event(self, record, msg: dict, run_data: RunData):
         req = ExtractionRequest(run, proposal, run_data)
         try:
             self.submitter.submit(req)
+        except FileNotFoundError:
+            log.warning('Slurm not available, starting process locally.')
+            Thread(target=self.submitter.execute_direct, args=(req, )).start()
         except Exception:
-            log.warning("Slurm job submission failed, starting process locally")
+            log.error("Slurm job submission failed, starting process locally.", exc_info=True)
             Thread(target=self.submitter.execute_direct, args=(req, )).start()
 
 

From 7382ab214378580b80faa89dc6d1f2e8f67f5eb7 Mon Sep 17 00:00:00 2001
From: tmichela
Date: Wed, 4 Sep 2024 15:07:16 +0200
Subject: [PATCH 3/4] limit the number of concurrent processes running locally

---
 damnit/backend/listener.py | 26 +++++++++++++++++++++++---
 tests/conftest.py          |  2 +-
 tests/test_backend.py      | 36 ++++++++++++++++++++++++++++++++++--
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/damnit/backend/listener.py b/damnit/backend/listener.py
index 05127810..aa3ceedb 100644
--- a/damnit/backend/listener.py
+++ b/damnit/backend/listener.py
@@ -31,6 +31,26 @@
 
 log = logging.getLogger(__name__)
 
+# tracking number of local threads running in parallel
+# only relevant if slurm isn't available
+MAX_CONCURRENT_THREADS = min(os.cpu_count() // 2, 10)
+local_extraction_threads = []
+
+
+def execute_direct(submitter, request):
+    for th in local_extraction_threads.copy():
+        if not th.is_alive():
+            local_extraction_threads.pop(local_extraction_threads.index(th))
+
+    if len(local_extraction_threads) >= MAX_CONCURRENT_THREADS:
+        log.warning(f'Too many events processing ({MAX_CONCURRENT_THREADS}), '
+                    f'skip event (p{request.proposal}, r{request.run}, {request.run_data.value})')
+        return
+
+    extr = Thread(target=submitter.execute_direct, args=(request, ))
+    local_extraction_threads.append(extr)
+    extr.start()
+
 
 class EventProcessor:
 
@@ -57,7 +77,6 @@ def __init__(self, context_dir=Path('.')):
         )
 
         self.events = kafka_conf['events']
 
-
     def __enter__(self):
         return self
@@ -115,10 +134,10 @@ def handle_event(self, record, msg: dict, run_data: RunData):
             self.submitter.submit(req)
         except FileNotFoundError:
             log.warning('Slurm not available, starting process locally.')
-            Thread(target=self.submitter.execute_direct, args=(req, )).start()
+            execute_direct(self.submitter, req)
         except Exception:
             log.error("Slurm job submission failed, starting process locally.", exc_info=True)
-            Thread(target=self.submitter.execute_direct, args=(req, )).start()
+            execute_direct(self.submitter, req)
 
 
 def listen():
@@ -145,5 +164,6 @@ def listen():
     if os.stat("amore.log").st_uid == os.getuid():
         os.chmod("amore.log", 0o666)
 
+
 if __name__ == '__main__':
     listen()
diff --git a/tests/conftest.py b/tests/conftest.py
index 9802bcff..55782150 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,5 +1,5 @@
 import socket
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
 
 import pytest
 import numpy as np
diff --git a/tests/test_backend.py b/tests/test_backend.py
index abb05b1c..6881cc1a 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -8,6 +8,7 @@
 import stat
 import subprocess
 import textwrap
+from time import sleep, time
 from unittest.mock import MagicMock, patch
 
 import extra_data as ed
@@ -24,9 +25,11 @@
 from damnit.backend import backend_is_running, initialize_and_start_backend
 from damnit.backend.db import DamnitDB
 from damnit.backend.extract_data import Extractor, add_to_db
+from damnit.backend.listener import (MAX_CONCURRENT_THREADS, EventProcessor,
+                                     local_extraction_threads)
 from damnit.backend.supervisord import wait_until, write_supervisord_conf
-from damnit.context import (ContextFile, ContextFileErrors, PNGData, Results,
-                            RunData, get_proposal_path)
+from damnit.context import (ContextFile, ContextFileErrors, PNGData, RunData,
+                            get_proposal_path)
 from damnit.ctxsupport.ctxrunner import THUMBNAIL_SIZE
 from damnit.gui.main_window import MainWindow
 
@@ -857,3 +860,32 @@ def subprocess_runner():
 
     assert initialize_and_start_backend(db_dir)
     assert backend_is_running(db_dir)
+
+
+def test_event_processor(mock_db, caplog):
+    db_dir, db = mock_db
+    db.metameta["proposal"] = 1234
+
+    processor = EventProcessor(db_dir)
+    assert len(local_extraction_threads) == 0
+
+    # slurm not available
+    with (
+        patch('subprocess.run', side_effect=FileNotFoundError),
+        patch('damnit.backend.extraction_control.ExtractionSubmitter.execute_direct', lambda *_: sleep(1))
+    ):
+        with caplog.at_level(logging.WARNING):
+            event = MagicMock(timestamp=time())
+            processor.handle_event(event, {'proposal': 1234, 'run': 1}, RunData.RAW)
+
+        assert 'Slurm not available' in caplog.text
+        assert len(local_extraction_threads) == 1
+        local_extraction_threads[0].join()
+
+        with caplog.at_level(logging.WARNING):
+            for idx in range(MAX_CONCURRENT_THREADS + 1):
+                event = MagicMock(timestamp=time())
+                processor.handle_event(event, {'proposal': 1234, 'run': idx + 1}, RunData.RAW)
+
+        assert len(local_extraction_threads) == MAX_CONCURRENT_THREADS
+        assert 'Too many events processing' in caplog.text

From c08cd9b5c93642620852c848037462dbc641b3f3 Mon Sep 17 00:00:00 2001
From: tmichela
Date: Wed, 4 Sep 2024 15:18:27 +0200
Subject: [PATCH 4/4] patch kafka consumer in test

---
 tests/test_backend.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tests/test_backend.py b/tests/test_backend.py
index 6881cc1a..58714b66 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -866,7 +866,10 @@ def test_event_processor(mock_db, caplog):
     db_dir, db = mock_db
     db.metameta["proposal"] = 1234
 
-    processor = EventProcessor(db_dir)
+    with patch('damnit.backend.listener.KafkaConsumer') as kcon:
+        processor = EventProcessor(db_dir)
+
+    kcon.assert_called_once()
     assert len(local_extraction_threads) == 0
 
     # slurm not available
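
Note: taken together, patches 1-3 implement a simple pattern: try Slurm first, fall back to a
bounded pool of local worker threads when submission fails, and drop work once the pool is full.
The sketch below restates that pattern outside the DAMNIT codebase. It is a minimal,
self-contained illustration: submit_to_slurm, run_extraction, MAX_WORKERS and workers are
hypothetical stand-ins for ExtractionSubmitter.submit, ExtractionSubmitter.execute_direct,
MAX_CONCURRENT_THREADS and local_extraction_threads; only the control flow is taken from the
patches above (with an extra guard, since os.cpu_count() may return None).

    import logging
    import os
    from threading import Thread

    log = logging.getLogger(__name__)

    # Cap local fallback work at half the cores, at most 10 threads
    # (mirrors MAX_CONCURRENT_THREADS above, guarding against None).
    MAX_WORKERS = min((os.cpu_count() or 2) // 2, 10)
    workers = []


    def run_extraction(request):
        # Hypothetical stand-in for ExtractionSubmitter.execute_direct().
        log.info("processing %r locally", request)


    def submit_to_slurm(request):
        # Hypothetical stand-in for ExtractionSubmitter.submit(); raises
        # FileNotFoundError when sbatch is not installed on the host.
        raise FileNotFoundError("sbatch not found")


    def run_locally(request):
        # Reap finished threads lazily before checking the limit.
        workers[:] = [t for t in workers if t.is_alive()]
        if len(workers) >= MAX_WORKERS:
            log.warning("Too many local jobs running (%d), skipping %r",
                        MAX_WORKERS, request)
            return
        t = Thread(target=run_extraction, args=(request,))
        workers.append(t)
        t.start()


    def process(request):
        try:
            submit_to_slurm(request)
        except FileNotFoundError:
            log.warning("Slurm not available, starting process locally.")
            run_locally(request)
        except Exception:
            log.error("Slurm job submission failed, starting process locally.",
                      exc_info=True)
            run_locally(request)

As in patch 3, a request arriving while the pool is full is skipped outright rather than queued,
and finished threads are only removed on the next request, so the cap is a soft bound on
concurrent local extractions rather than a work queue.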