44import logging
55import re
66import threading
7+ import time
78from asyncio import Task
89from contextlib import asynccontextmanager , contextmanager
9- from datetime import datetime , timezone
10+ from datetime import datetime , timedelta , timezone
1011from threading import Thread
1112from typing import TYPE_CHECKING , Any , cast
1213
2324 import httpx
2425 from typing_extensions import Self
2526
27+ from apify_client .clients import RunClient , RunClientAsync
28+
2629
2730class LogClient (ResourceClient ):
2831 """Sub-client for manipulating logs."""
@@ -228,9 +231,9 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
228231 logs for long-running actors in stand-by.
229232
230233 """
231- self ._to_logger = to_logger
232234 if self ._force_propagate :
233235 to_logger .propagate = True
236+ self ._to_logger = to_logger
234237 self ._stream_buffer = list [bytes ]()
235238 self ._split_marker = re .compile (rb'(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)' )
236239 self ._relevancy_time_limit : datetime | None = None if from_start else datetime .now (tz = timezone .utc )
@@ -350,13 +353,16 @@ def start(self) -> Task:
350353 self ._streaming_task = asyncio .create_task (self ._stream_log ())
351354 return self ._streaming_task
352355
async def stop(self) -> None:
    """Cancel the streaming task and wait until it has actually finished.

    The task reference is always cleared afterwards, so the streamer can be started again even when the task had
    already completed on its own or ended with an error.

    Raises:
        RuntimeError: If there is no active streaming task.
    """
    if not self._streaming_task:
        raise RuntimeError('Streaming task is not active')

    self._streaming_task.cancel()
    try:
        await self._streaming_task
    except asyncio.CancelledError:
        # Expected outcome of the cancellation requested above.
        pass
    finally:
        # Reset unconditionally: awaiting a task that already finished raises no `CancelledError`, and a stale
        # reference would make every later `start()` fail as "already active".
        self._streaming_task = None
360366
361367 async def __aenter__ (self ) -> Self :
362368 """Start the streaming task within the context. Exiting the context will cancel the streaming task."""
@@ -367,7 +373,7 @@ async def __aexit__(
367373 self , exc_type : type [BaseException ] | None , exc_val : BaseException | None , exc_tb : TracebackType | None
368374 ) -> None :
369375 """Cancel the streaming task."""
370- self .stop ()
376+ await self .stop ()
371377
372378 async def _stream_log (self ) -> None :
373379 async with self ._log_client .stream (raw = True ) as log_stream :
@@ -378,3 +384,163 @@ async def _stream_log(self) -> None:
378384
379385 # If the stream is finished, then the last part will be also processed.
380386 self ._log_buffer_content (include_last_part = True )
387+
388+
class StatusMessageWatcher:
    """Base helper that redirects the status and status message of another Actor run to a logger.

    The message is polled at a fixed period, so not every message is guaranteed to be logged, especially when the
    status message changes frequently.
    """

    _force_propagate = False
    # Extra wait used before stopping, to improve the chance of catching the last status and status message of
    # a finished Actor run. Both can be set on the run with a delay; the wait does not guarantee that the final
    # message is captured, it only makes it more likely.
    _final_sleep_time_s = 6

    def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
        """Initialize `StatusMessageWatcher`.

        Args:
            to_logger: Logger that receives the status messages.
            check_period: Polling period of the status message; stored internally in seconds.
        """
        self._last_status_message = ''
        self._check_period = check_period.total_seconds()
        if self._force_propagate:
            to_logger.propagate = True
        self._to_logger = to_logger

    def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
        """Log the status line built from `run_data` if it changed and tell whether polling should continue.

        Args:
            run_data: Dictionary with the run data, or `None` when no data is available.

        Returns:
            `False` once the run data marks the status message as terminal, `True` otherwise (including when
            `run_data` is `None`).
        """
        if run_data is None:
            return True

        status = run_data.get('status', 'Unknown status')
        status_message = run_data.get('statusMessage', '')
        rendered = f'Status: {status} , Message: {status_message} '

        # Only emit a log record when the rendered line differs from the previously logged one.
        if rendered != self._last_status_message:
            self._last_status_message = rendered
            self._to_logger.info(rendered)

        return not run_data.get('isStatusMessageTerminal', False)
435+
436+
class StatusMessageWatcherAsync(StatusMessageWatcher):
    """Async variant of `StatusMessageWatcher` that polls and logs the status message in an asyncio task."""

    def __init__(
        self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
    ) -> None:
        """Initialize `StatusMessageWatcherAsync`.

        Args:
            run_client: The client for run that will be used to get a status and message.
            to_logger: The logger to which the status message will be redirected.
            check_period: The period with which the status message will be polled.
        """
        super().__init__(to_logger=to_logger, check_period=check_period)
        self._run_client = run_client
        self._logging_task: Task | None = None

    def start(self) -> Task:
        """Start the logging task. The caller has to handle any cleanup by manually calling the `stop` method.

        Returns:
            The created polling task.

        Raises:
            RuntimeError: If a logging task is already active.
        """
        if self._logging_task:
            raise RuntimeError('Logging task already active')
        self._logging_task = asyncio.create_task(self._log_changed_status_message())
        return self._logging_task

    async def stop(self) -> None:
        """Cancel the logging task and wait until it has actually finished.

        The task reference is always cleared afterwards, so the watcher can be started again.

        Raises:
            RuntimeError: If there is no active logging task.
        """
        if not self._logging_task:
            raise RuntimeError('Logging task is not active')

        self._logging_task.cancel()
        try:
            await self._logging_task
        except asyncio.CancelledError:
            # Expected outcome of the cancellation requested above.
            pass
        finally:
            # The polling loop ends on its own once a terminal status message is seen; awaiting such a finished
            # task raises no `CancelledError`, so the reference has to be cleared unconditionally.
            self._logging_task = None

    async def __aenter__(self) -> Self:
        """Start the logging task within the context. Exiting the context will cancel the logging task."""
        self.start()
        return self

    async def __aexit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Wait the final sleep time to give the last status message a chance to arrive, then stop the task."""
        await asyncio.sleep(self._final_sleep_time_s)
        await self.stop()

    async def _log_changed_status_message(self) -> None:
        """Poll the run until `_log_run_data` reports that no more data is expected."""
        while True:
            run_data = await self._run_client.get()
            if not self._log_run_data(run_data):
                break
            await asyncio.sleep(self._check_period)
490+
491+
class StatusMessageWatcherSync(StatusMessageWatcher):
    """Sync variant of `StatusMessageWatcher` that polls and logs the status message in a separate thread."""

    def __init__(
        self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
    ) -> None:
        """Initialize `StatusMessageWatcherSync`.

        Args:
            run_client: Run client used to fetch the status and status message.
            to_logger: Logger that receives the status messages.
            check_period: Polling period of the status message.
        """
        super().__init__(to_logger=to_logger, check_period=check_period)
        self._run_client = run_client
        self._logging_thread: Thread | None = None
        self._stop_logging = False

    def start(self) -> Thread:
        """Spawn the polling thread and return it. Cleanup is up to the caller, by calling the `stop` method.

        Raises:
            RuntimeError: If a logging thread is already active.
        """
        if self._logging_thread:
            raise RuntimeError('Logging thread already active')
        self._stop_logging = False
        worker = Thread(target=self._log_changed_status_message)
        self._logging_thread = worker
        worker.start()
        return worker

    def stop(self) -> None:
        """Wait the final sleep time, ask the polling thread to stop and block until it has finished.

        Raises:
            RuntimeError: If there is no active logging thread.
        """
        worker = self._logging_thread
        if not worker:
            raise RuntimeError('Logging thread is not active')
        # Keep polling a little longer before raising the stop flag.
        time.sleep(self._final_sleep_time_s)
        self._stop_logging = True
        worker.join()
        self._logging_thread = None
        self._stop_logging = False

    def __enter__(self) -> Self:
        """Start the polling thread when entering the context; leaving the context stops it."""
        self.start()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Stop the polling thread."""
        self.stop()

    def _log_changed_status_message(self) -> None:
        """Poll the run until no more data is expected or a stop was requested."""
        # The run is polled first and the stop flag is checked afterwards, so one more poll happens after `stop`
        # raises the flag.
        while self._log_run_data(self._run_client.get()) and not self._stop_logging:
            time.sleep(self._check_period)
0 commit comments