diff --git a/src/zeroband/utils/monitor.py b/src/zeroband/utils/monitor.py index 1d3d96b2..d67cf496 100644 --- a/src/zeroband/utils/monitor.py +++ b/src/zeroband/utils/monitor.py @@ -1,3 +1,4 @@ +from collections import deque from typing import Any from zeroband.utils.logging import get_logger import aiohttp @@ -9,7 +10,7 @@ async def _get_external_ip(max_retries=3, retry_delay=5): async with aiohttp.ClientSession() as session: for attempt in range(max_retries): try: - async with session.get('https://api.ipify.org', timeout=10) as response: + async with session.get("https://api.ipify.org", timeout=10) as response: response.raise_for_status() return await response.text() except ClientError: @@ -41,8 +42,7 @@ def __init__(self, config, *args, **kwargs): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) - def __del__(self): - self.loop.close() + self._pending_tasks = deque() def _remove_duplicates(self): seen = set() @@ -68,9 +68,33 @@ def log(self, data: dict[str, Any]): self._handle_send_batch() + def __del__(self): + # Ensure all pending tasks are completed before closing + if hasattr(self, "loop") and self.loop is not None: + try: + pending = asyncio.all_tasks(self.loop) + self.loop.run_until_complete(asyncio.gather(*pending)) + except Exception as e: + self._logger.error(f"Error cleaning up pending tasks: {str(e)}") + finally: + self.loop.close() + + def _cleanup_completed_tasks(self): + """Remove completed tasks from the pending tasks queue""" + while self._pending_tasks and self._pending_tasks[0].done(): + task = self._pending_tasks.popleft() + try: + task.result() # This will raise any exceptions that occurred + except Exception as e: + self._logger.error(f"Error in completed batch send task: {str(e)}") + def _handle_send_batch(self, flush: bool = False): + self._cleanup_completed_tasks() + if len(self.data) >= self.log_flush_interval or flush: - self.loop.run_until_complete(self._send_batch()) + # Create a new task for sending the batch + task = self.loop.create_task(self._send_batch()) + self._pending_tasks.append(task) async def _set_node_ip_address(self): if self.node_ip_address is None and self.node_ip_address_fetch_status != "failed": @@ -89,16 +113,11 @@ async def _send_batch(self): self._remove_duplicates() await self._set_node_ip_address() - batch = self.data[:self.log_flush_interval] + batch = self.data[: self.log_flush_interval] # set node_ip_address of batch batch = [{**log, "node_ip_address": self.node_ip_address} for log in batch] - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.auth_token}" - } - payload = { - "logs": batch - } + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.auth_token}"} + payload = {"logs": batch} api = f"{self.base_url}/metrics/{self.run_id}/logs" try: