Skip to content

Commit

Permalink
kafka service update
Browse files Browse the repository at this point in the history
  • Loading branch information
AbYT101 committed Jun 22, 2024
1 parent ef31a5c commit a25ffcf
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
29 changes: 18 additions & 11 deletions app/services/kafka_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from confluent_kafka import Producer, Consumer, KafkaException
import logging
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
import json

class KafkaService:
def __init__(self, brokers):
self.brokers = brokers
self.producer = Producer({'bootstrap.servers': brokers})
self.consumer = Consumer({
'bootstrap.servers': brokers,
Expand All @@ -16,15 +18,20 @@ def produce(self, topic, message):

def consume(self, topic, callback):
self.consumer.subscribe([topic])
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
else:
raise KafkaException(msg.error())
callback(json.loads(msg.value()))
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaException(msg.error())
callback(json.loads(msg.value()))
except Exception as e:
logging.error(f"Error in Kafka consumer: {str(e)}")
finally:
self.consumer.close()

kafka_service = KafkaService(brokers='localhost:9092')
kafka_service = KafkaService(brokers='localhost:9092')
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ Flask-Bcrypt
psycopg2-binary
confluent-kafka
apache-airflow
mlflow
mlflow
psycopg2

0 comments on commit a25ffcf

Please sign in to comment.