diff --git a/app/services/kafka_service.py b/app/services/kafka_service.py index 661b2d7..31ff063 100644 --- a/app/services/kafka_service.py +++ b/app/services/kafka_service.py @@ -1,8 +1,10 @@ -from confluent_kafka import Producer, Consumer, KafkaException +import logging +from confluent_kafka import Producer, Consumer, KafkaException, KafkaError import json class KafkaService: def __init__(self, brokers): + self.brokers = brokers self.producer = Producer({'bootstrap.servers': brokers}) self.consumer = Consumer({ 'bootstrap.servers': brokers, @@ -16,15 +18,20 @@ def produce(self, topic, message): def consume(self, topic, callback): self.consumer.subscribe([topic]) - while True: - msg = self.consumer.poll(timeout=1.0) - if msg is None: - continue - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: + try: + while True: + msg = self.consumer.poll(timeout=1.0) + if msg is None: continue - else: - raise KafkaException(msg.error()) - callback(json.loads(msg.value())) + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + raise KafkaException(msg.error()) + callback(json.loads(msg.value())) + except Exception as e: + logging.error(f"Error in Kafka consumer: {str(e)}") + finally: + self.consumer.close() -kafka_service = KafkaService(brokers='localhost:9092') +kafka_service = KafkaService(brokers='localhost:9092') \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 86a0ed4..23e5266 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ Flask-Bcrypt psycopg2-binary confluent-kafka apache-airflow -mlflow \ No newline at end of file +mlflow +psycopg2 \ No newline at end of file