diff --git a/server/handlers/result_handlers.py b/server/handlers/result_handlers.py index db5d09b4..4b56cc6e 100644 --- a/server/handlers/result_handlers.py +++ b/server/handlers/result_handlers.py @@ -195,6 +195,7 @@ def prepare_errors_payload(self, error_batch): def __call__(self, result_batch: list[Dict]): logger.debug(f"\n\nHandler received batch: {result_batch}\n\n") + logger.info("LSEHandler received batch") # coerce dicts to LSEBatchItems for validation norm_result_batch = [ @@ -207,6 +208,7 @@ def __call__(self, result_batch: list[Dict]): # coerce back to dicts for sending result_batch = [record.dict() for record in result_batch] if result_batch: + logger.info(f"LSEHandler sending {len(result_batch)} predictions to LSE") self.client.make_request( "POST", "/api/model-run/batch-predictions", @@ -217,6 +219,7 @@ def __call__(self, result_batch: list[Dict]): } ), ) + logger.info(f"LSEHandler sent {len(result_batch)} predictions to LSE") else: logger.error( f"No valid results to send to LSE for modelrun_id {self.modelrun_id}" @@ -225,6 +228,7 @@ def __call__(self, result_batch: list[Dict]): # Send failed predictions back to LSE if error_batch: error_batch = self.prepare_errors_payload(error_batch) + logger.info(f"LSEHandler sending {len(error_batch)} failed predictions to LSE") self.client.make_request( "POST", "/api/model-run/batch-failed-predictions", @@ -235,6 +239,7 @@ def __call__(self, result_batch: list[Dict]): } ), ) + logger.info(f"LSEHandler sent {len(error_batch)} failed predictions to LSE") else: logger.debug(f"No errors to send to LSE for modelrun_id {self.modelrun_id}") diff --git a/server/tasks/stream_inference.py b/server/tasks/stream_inference.py index 9d7af896..c8bb6a62 100644 --- a/server/tasks/stream_inference.py +++ b/server/tasks/stream_inference.py @@ -71,7 +71,7 @@ async def run_streaming( task_time_limit=settings.task_time_limit_sec, ) def streaming_parent_task( - self, agent: Agent, result_handler: ResultHandler, batch_size: int = 50 + self, agent: Agent, result_handler: ResultHandler, batch_size: int = 10 ): """ This task is used to launch the two tasks that are doing the real work, so that @@ -162,16 +162,17 @@ async def async_process_streaming_output( for topic_partition, messages in data.items(): topic = topic_partition.topic if messages: - logger.debug(f"Handling {messages=} in {topic=}") + logger.info(f"Processing messages in output job {topic=} number of messages: {len(messages)}") data = [msg.value for msg in messages] result_handler(data) - logger.debug(f"Handled {len(messages)} messages in {topic=}") + logger.info(f"Processed messages in output job {topic=} number of messages: {len(messages)}") else: - logger.debug(f"No messages in topic {topic=}") + logger.info(f"Consumer pulled data, but no messages in {topic=}") if not data: - logger.info("No messages in any topic") + logger.info(f"Consumer pulled no data from {output_topic_name=}") # cleans up after any exceptions raised here as well as asyncio.CancelledError resulting from failure in async_process_streaming_input finally: + logger.info("No more data in output job and input job is done, stopping output job") await consumer.stop()