Skip to content

Commit

Permalink
Kafaka service integrated
Browse files Browse the repository at this point in the history
  • Loading branch information
AbYT101 committed Jun 22, 2024
1 parent a25ffcf commit 1f32971
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 13 deletions.
21 changes: 21 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager
from flask_bcrypt import Bcrypt
import threading

db = SQLAlchemy()
jwt = JWTManager()
bcrypt = Bcrypt()

from app.services.backtest_service import run_backtest_by_id
from app.services.kafka_service import kafka_service

def create_app():
app = Flask(__name__)
app.config.from_object('app.config.Config')
Expand All @@ -23,3 +27,20 @@ def create_app():
db.create_all()

return app

def consume_backtest_scenes(app):
def callback(message):
with app.app_context():
backtest_id = message.get('backtest_id')
run_backtest_by_id(backtest_id)

kafka_service.consume('backtest_scenes', callback)

# Start consuming Kafka messages in a separate thread
def start_consumer_thread(app):
consumer_thread = threading.Thread(target=consume_backtest_scenes, args=(app,))
consumer_thread.daemon = True # Allow the thread to be killed when the main program exits
consumer_thread.start()

app = create_app()
start_consumer_thread(app)
15 changes: 3 additions & 12 deletions app/routes/backtest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from flask import Blueprint, request, jsonify
from app.models.backtest import Backtest, Parameter, Result
import threading
from flask import Blueprint, request, jsonify, current_app
from app.models.backtest import Backtest, Parameter
from app import db
from flask_jwt_extended import jwt_required
from app.services.backtest_service import run_backtest_by_id
from app.services.kafka_service import kafka_service

bp = Blueprint('backtest', __name__)


@bp.route('/backtest', methods=['POST'])
@jwt_required()
def run_backtest():
Expand Down Expand Up @@ -44,12 +44,3 @@ def run_backtest():
return jsonify({"msg": "Backtest created and published to Kafka", "backtest_id": new_backtest.id}), 201


def consume_backtest_scenes():
def callback(message):
backtest_id = message.get('backtest_id')
run_backtest_by_id(backtest_id)

kafka_service.consume('backtest_scenes', callback)


consume_backtest_scenes() # Start consuming Kafka messages in a separate thread
27 changes: 26 additions & 1 deletion app/services/kafka_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
import json

Expand All @@ -11,27 +12,51 @@ def __init__(self, brokers):
'group.id': 'backtest_group',
'auto.offset.reset': 'earliest'
})
self.admin_client = AdminClient({'bootstrap.servers': brokers})

def create_topic(self, topic):
topic_metadata = self.admin_client.list_topics(timeout=10)
if topic not in topic_metadata.topics:
logging.info(f"Creating topic {topic}")
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
fs = self.admin_client.create_topics([new_topic])
for topic, f in fs.items():
try:
f.result() # The result itself is None
logging.info(f"Topic {topic} created successfully")
except Exception as e:
logging.error(f"Failed to create topic {topic}: {str(e)}")
else:
logging.info(f"Topic {topic} already exists")

def produce(self, topic, message):
logging.info(f"Producing message to topic {topic}: {message}")
self.producer.produce(topic, key=None, value=json.dumps(message))
self.producer.flush()
logging.info("Message produced successfully")

def consume(self, topic, callback):
self.create_topic(topic)
self.consumer.subscribe([topic])
logging.info(f"Subscribed to topic {topic}")
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
logging.debug("No message received")
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
logging.info("End of partition reached")
continue
else:
logging.error(f"Consumer error: {msg.error()}")
raise KafkaException(msg.error())
logging.info(f"Received message: {msg.value()}")
callback(json.loads(msg.value()))
except Exception as e:
logging.error(f"Error in Kafka consumer: {str(e)}")
finally:
self.consumer.close()

kafka_service = KafkaService(brokers='localhost:9092')
kafka_service = KafkaService(brokers='localhost:9092')
35 changes: 35 additions & 0 deletions tests/unit/test_backtest_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import json
import pytest
from flask import Flask
from flask_testing import TestCase
from app import create_app, db
from app.models.backtest import Indicator, Backtest, Parameter, Result

class BacktestTestCase(TestCase):
def create_app(self):
app = create_app('testing') # Assume you have a testing config
return app

def setUp(self):
db.create_all()
# Add sample indicators
indicator1 = Indicator(name='Indicator 1', description='Description 1')
indicator2 = Indicator(name='Indicator 2', description='Description 2')
db.session.add(indicator1)
db.session.add(indicator2)
db.session.commit()

def tearDown(self):
db.session.remove()
db.drop_all()

def test_run_backtest(self):
with open('backtest_request.json') as f:
data = json.load(f)

response = self.client.post('/backtest', json=data)
self.assertEqual(response.status_code, 201)
self.assertIn('backtest_id', response.json)

if __name__ == '__main__':
pytest.main()

0 comments on commit 1f32971

Please sign in to comment.