Skip to content

Commit

Permalink
fix: dia-1301 : adding .1 sec delay for kafka message injection. (#170)
Browse files Browse the repository at this point in the history
Co-authored-by: Forum Gala <[email protected]>
Co-authored-by: pakelley <[email protected]>
  • Loading branch information
3 people authored Jul 31, 2024
1 parent d296baf commit 34ce544
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
from pydantic import BaseModel, SerializeAsAny, field_validator
import uvicorn
from redis import Redis
import time

from server.log_middleware import LogMiddleware
from server.tasks.process_file import app as celery_app
from server.tasks.process_file import streaming_parent_task
from server.utils import get_input_topic_name, get_output_topic_name, Settings, delete_topic
from server.utils import (
get_input_topic_name,
get_output_topic_name,
Settings,
delete_topic,
)
from server.handlers.result_handlers import ResultHandler


Expand Down Expand Up @@ -177,6 +183,9 @@ async def submit_batch(batch: BatchData):
try:
for record in batch.data:
await producer.send_and_wait(topic, value=record)
# FIXME Temporary workaround for messages getting dropped.
# Remove once our kafka messaging is more reliable.
time.sleep(0.1)
except UnknownTopicOrPartitionError:
await producer.stop()
raise HTTPException(
Expand Down

0 comments on commit 34ce544

Please sign in to comment.