diff --git a/src/zeroband/utils/monitor.py b/src/zeroband/utils/monitor.py index d67cf496..1d3d96b2 100644 --- a/src/zeroband/utils/monitor.py +++ b/src/zeroband/utils/monitor.py @@ -1,4 +1,3 @@ -from collections import deque from typing import Any from zeroband.utils.logging import get_logger import aiohttp @@ -10,7 +9,7 @@ async def _get_external_ip(max_retries=3, retry_delay=5): async with aiohttp.ClientSession() as session: for attempt in range(max_retries): try: - async with session.get("https://api.ipify.org", timeout=10) as response: + async with session.get('https://api.ipify.org', timeout=10) as response: response.raise_for_status() return await response.text() except ClientError: @@ -42,7 +41,8 @@ def __init__(self, config, *args, **kwargs): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) - self._pending_tasks = deque() + def __del__(self): + self.loop.close() def _remove_duplicates(self): seen = set() @@ -68,33 +68,9 @@ def log(self, data: dict[str, Any]): self._handle_send_batch() - def __del__(self): - # Ensure all pending tasks are completed before closing - if hasattr(self, "loop") and self.loop is not None: - try: - pending = asyncio.all_tasks(self.loop) - self.loop.run_until_complete(asyncio.gather(*pending)) - except Exception as e: - self._logger.error(f"Error cleaning up pending tasks: {str(e)}") - finally: - self.loop.close() - - def _cleanup_completed_tasks(self): - """Remove completed tasks from the pending tasks queue""" - while self._pending_tasks and self._pending_tasks[0].done(): - task = self._pending_tasks.popleft() - try: - task.result() # This will raise any exceptions that occurred - except Exception as e: - self._logger.error(f"Error in completed batch send task: {str(e)}") - def _handle_send_batch(self, flush: bool = False): - self._cleanup_completed_tasks() - if len(self.data) >= self.log_flush_interval or flush: - # Create a new task for sending the batch - task = self.loop.create_task(self._send_batch()) - self._pending_tasks.append(task) + self.loop.run_until_complete(self._send_batch()) async def _set_node_ip_address(self): if self.node_ip_address is None and self.node_ip_address_fetch_status != "failed": @@ -113,11 +89,16 @@ async def _send_batch(self): self._remove_duplicates() await self._set_node_ip_address() - batch = self.data[: self.log_flush_interval] + batch = self.data[:self.log_flush_interval] # set node_ip_address of batch batch = [{**log, "node_ip_address": self.node_ip_address} for log in batch] - headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.auth_token}"} - payload = {"logs": batch} + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.auth_token}" + } + payload = { + "logs": batch + } api = f"{self.base_url}/metrics/{self.run_id}/logs" try: