From af73a03a9cb4b5ab50278b5cfdac8519d0c522f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Zaoral?= Date: Mon, 25 Mar 2024 10:35:08 +0100 Subject: [PATCH 1/2] worker: remove Python 2 code from logger.py --- kobo/worker/logger.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/kobo/worker/logger.py b/kobo/worker/logger.py index a5c3961..75b7a5b 100644 --- a/kobo/worker/logger.py +++ b/kobo/worker/logger.py @@ -1,13 +1,8 @@ -# -*- coding: utf-8 -*- - import threading import time import os - -import six - -from six.moves import queue -from six import BytesIO +import queue +from io import BytesIO import kobo.tback @@ -40,7 +35,7 @@ def read_queue(self): # We do not know whether we're being sent bytes or text. # The hub API always wants bytes. # Ensure we safely convert everything to bytes as we go. - if isinstance(out, six.text_type): + if isinstance(out, str): out = out.encode('utf-8', errors='replace') return out @@ -103,7 +98,7 @@ def stop(self): self.join() -class LoggingIO(object): +class LoggingIO(): """StringIO wrapper that also writes all data to a logging thread.""" def __init__(self, io, logging_thread): From bc3df3ea22a8c2f1393236633db4408014311ab2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Zaoral?= Date: Wed, 3 Apr 2024 14:32:29 +0200 Subject: [PATCH 2/2] worker: fix deadlock when LoggingThread wrote into its own Queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If self._hub.upload_task_log() called self._queue.put(), it would cause deadlock because 1. self._queue uses locks that are not reentrant. 2. it will block if the Queue is already full. Co-authored-by: Kamil Dudka Co-authored-by: Lukáš Zaoral --- kobo/worker/logger.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/kobo/worker/logger.py b/kobo/worker/logger.py index 75b7a5b..d2b09c4 100644 --- a/kobo/worker/logger.py +++ b/kobo/worker/logger.py @@ -24,6 +24,7 @@ def __init__(self, hub, task_id, *args, **kwargs): self._buffer_size = kwargs.pop('buffer_size', 256) self._queue = queue.Queue(maxsize=self._buffer_size) self._event = threading.Event() + self._in_logger_call = False self._running = True self._send_time = 0 self._send_data = b"" @@ -88,8 +89,24 @@ def run(self): def write(self, data): """Add data to the queue and set the event for sending queue content.""" - self._queue.put(data) - self._event.set() + if threading.get_ident() != self.ident: + self._queue.put(data) + self._event.set() + + # If self._hub.upload_task_log() called self._queue.put(), it would + # cause deadlock because self._queue uses locks that are not reentrant + # and queue may already be full. + # + # Log only data with printable characters. + elif self._logger and data.strip(): + # Prevent infinite recursion if this thread is also used for the + # logger output. + if self._in_logger_call: + return + + self._in_logger_call = True + self._logger.log_error("Error in LoggingThread: %r", data) + self._in_logger_call = False def stop(self): """Send remaining data to hub and finish."""