fix: DIA-1584: Evaluation not returning results intermittently (#242)
Co-authored-by: hakan458 <[email protected]>
hakan458 authored Nov 1, 2024
1 parent 2bf8169 commit b33a87f
Showing 2 changed files with 11 additions and 5 deletions.
server/handlers/result_handlers.py (5 additions, 0 deletions)
@@ -195,6 +195,7 @@ def prepare_errors_payload(self, error_batch):

     def __call__(self, result_batch: list[Dict]):
         logger.debug(f"\n\nHandler received batch: {result_batch}\n\n")
+        logger.info("LSEHandler received batch")

         # coerce dicts to LSEBatchItems for validation
         norm_result_batch = [
@@ -207,6 +208,7 @@ def __call__(self, result_batch: list[Dict]):
         # coerce back to dicts for sending
         result_batch = [record.dict() for record in result_batch]
         if result_batch:
+            logger.info(f"LSEHandler sending {len(result_batch)} predictions to LSE")
             self.client.make_request(
                 "POST",
                 "/api/model-run/batch-predictions",
@@ -217,6 +219,7 @@ def __call__(self, result_batch: list[Dict]):
                     }
                 ),
             )
+            logger.info(f"LSEHandler sent {len(result_batch)} predictions to LSE")
         else:
             logger.error(
                 f"No valid results to send to LSE for modelrun_id {self.modelrun_id}"
@@ -225,6 +228,7 @@ def __call__(self, result_batch: list[Dict]):
         # Send failed predictions back to LSE
         if error_batch:
             error_batch = self.prepare_errors_payload(error_batch)
+            logger.info(f"LSEHandler sending {len(error_batch)} failed predictions to LSE")
             self.client.make_request(
                 "POST",
                 "/api/model-run/batch-failed-predictions",
@@ -235,6 +239,7 @@ def __call__(self, result_batch: list[Dict]):
                     }
                 ),
             )
+            logger.info(f"LSEHandler sent {len(error_batch)} failed predictions to LSE")
         else:
             logger.debug(f"No errors to send to LSE for modelrun_id {self.modelrun_id}")

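For context, a minimal runnable sketch of the send path these new info logs bracket. The client, the payload keys, and the omitted validation step are stand-ins (the real handler coerces items to LSEBatchItems and also reports failed predictions), so treat this as an illustration of the logging pattern rather than the actual handler: a request that stalls or is dropped now shows up as a "sending" line with no matching "sent" line.

import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("result_handlers")


class FakeLSEClient:
    """Stand-in for the real LSE API client used by the handler."""

    def make_request(self, method: str, path: str, data: str) -> None:
        logger.debug(f"{method} {path}: {data}")


class LSEHandlerSketch:
    def __init__(self, client, modelrun_id: int):
        self.client = client
        self.modelrun_id = modelrun_id

    def __call__(self, result_batch: list[dict]) -> None:
        logger.info("LSEHandler received batch")
        if result_batch:
            logger.info(f"LSEHandler sending {len(result_batch)} predictions to LSE")
            self.client.make_request(
                "POST",
                "/api/model-run/batch-predictions",
                # payload shape assumed for illustration; the real keys are collapsed in the diff above
                data=json.dumps({"modelrun_id": self.modelrun_id, "results": result_batch}),
            )
            # A stall or failure inside make_request leaves the "sending" line without a "sent" line.
            logger.info(f"LSEHandler sent {len(result_batch)} predictions to LSE")
        else:
            logger.error(f"No valid results to send to LSE for modelrun_id {self.modelrun_id}")


if __name__ == "__main__":
    LSEHandlerSketch(FakeLSEClient(), modelrun_id=1)([{"task_id": 1, "output": "label"}])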
server/tasks/stream_inference.py (6 additions, 5 deletions)
@@ -71,7 +71,7 @@ async def run_streaming(
     task_time_limit=settings.task_time_limit_sec,
 )
 def streaming_parent_task(
-    self, agent: Agent, result_handler: ResultHandler, batch_size: int = 50
+    self, agent: Agent, result_handler: ResultHandler, batch_size: int = 10
 ):
     """
     This task is used to launch the two tasks that are doing the real work, so that
@@ -162,16 +162,17 @@ async def async_process_streaming_output(
         for topic_partition, messages in data.items():
             topic = topic_partition.topic
             if messages:
-                logger.debug(f"Handling {messages=} in {topic=}")
+                logger.info(f"Processing messages in output job {topic=} number of messages: {len(messages)}")
                 data = [msg.value for msg in messages]
                 result_handler(data)
-                logger.debug(f"Handled {len(messages)} messages in {topic=}")
+                logger.info(f"Processed messages in output job {topic=} number of messages: {len(messages)}")
             else:
-                logger.debug(f"No messages in topic {topic=}")
+                logger.info(f"Consumer pulled data, but no messages in {topic=}")

         if not data:
-            logger.info("No messages in any topic")
+            logger.info(f"Consumer pulled no data from {output_topic_name=}")

     # cleans up after any exceptions raised here as well as asyncio.CancelledError resulting from failure in async_process_streaming_input
     finally:
+        logger.info("No more data in output job and input job is done, stopping output job")
         await consumer.stop()
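For context, a minimal sketch of the polling loop this hunk instruments. It assumes an aiokafka AIOKafkaConsumer (which the getmany-style {TopicPartition: [messages]} dict and the awaited consumer.stop() suggest), a synchronous result_handler callable, and placeholder broker, topic, and exit-condition settings, so it is an illustration rather than the project's actual task code. If batch_size is used as the poll's max_records, lowering the default from 50 to 10 means each poll hands the handler at most 10 records, so results are flushed to the handler (and onward to LSE) in smaller, more frequent batches.

import asyncio
import logging

from aiokafka import AIOKafkaConsumer  # assumed client library

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream_inference")


async def process_streaming_output(output_topic_name: str, result_handler, batch_size: int = 10):
    consumer = AIOKafkaConsumer(output_topic_name, bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        # Placeholder exit condition: the real task keeps polling until the input job is done.
        for _ in range(10):
            # Pull at most batch_size records per poll, grouped by topic partition.
            data = await consumer.getmany(timeout_ms=3000, max_records=batch_size)
            for topic_partition, messages in data.items():
                topic = topic_partition.topic
                if messages:
                    logger.info(f"Processing messages in output job {topic=} number of messages: {len(messages)}")
                    result_handler([msg.value for msg in messages])
                    logger.info(f"Processed messages in output job {topic=} number of messages: {len(messages)}")
                else:
                    logger.info(f"Consumer pulled data, but no messages in {topic=}")
            if not data:
                logger.info(f"Consumer pulled no data from {output_topic_name=}")
    finally:
        logger.info("No more data in output job and input job is done, stopping output job")
        await consumer.stop()


if __name__ == "__main__":
    # Requires a reachable Kafka broker; result_handler here just prints each batch.
    asyncio.run(process_streaming_output("adala-output", print, batch_size=10))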
