diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/crypto-trading-backtesting.iml b/.idea/crypto-trading-backtesting.iml new file mode 100644 index 0000000..74d515a --- /dev/null +++ b/.idea/crypto-trading-backtesting.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..ea773d3 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,10 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..149590b --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/airflow/dags/backtest_dag.py b/airflow/dags/backtest_dag.py index e69de29..a0b62ce 100644 --- a/airflow/dags/backtest_dag.py +++ b/airflow/dags/backtest_dag.py @@ -0,0 +1,29 @@ +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from datetime import datetime, timedelta +from app.services.backtest_service import run_backtest_by_id + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +dag = DAG( + 'backtest_dag', + default_args=default_args, + description='DAG for running backtests', + schedule_interval=timedelta(days=1), +) + +def run_backtest(task_id, *args, **kwargs): + run_backtest_by_id(task_id) + +run_backtest_task = PythonOperator( + task_id='run_backtest', + python_callable=run_backtest, + op_args=['{{ task_instance.task_id }}'], + dag=dag, +) diff --git a/app/config.py b/app/config.py index 052c297..f91c16a 100644 --- a/app/config.py +++ b/app/config.py @@ -1,4 +1,4 @@ class Config: - SQLALCHEMY_DATABASE_URI = 'db_utl' + SQLALCHEMY_DATABASE_URI = 'postgresql://test_user:password@localhost/test_db' SQLALCHEMY_TRACK_MODIFICATIONS = False JWT_SECRET_KEY = 'your_secret_key' diff --git a/app/routes/backtest.py b/app/routes/backtest.py index 4eca315..2ff5b70 100644 --- a/app/routes/backtest.py +++ b/app/routes/backtest.py @@ -2,6 +2,8 @@ from app.models.backtest import Backtest, Parameter, Result from app import db from flask_jwt_extended import jwt_required +from app.services.backtest_service import run_backtest_by_id +from app.services.kafka_service import kafka_service bp = Blueprint('backtest', __name__) @@ -33,24 +35,21 @@ def run_backtest(): db.session.commit() - # Run the backtest (dummy processing here) - result = run_backtest_logic(new_backtest.id) + # Publish backtest to Kafka for processing + kafka_service.produce('backtest_scenes', { + "backtest_id": new_backtest.id, + "parameters": parameters + }) - return jsonify(result), 201 + return jsonify({"msg": "Backtest created and published to Kafka", "backtest_id": new_backtest.id}), 201 -def run_backtest_logic(backtest_id): - # Dummy backtest logic - result = Result( - backtest_id=backtest_id, - total_return=10.5, - number_of_trades=20, - winning_trades=15, - losing_trades=5, - max_drawdown=3.5, - sharpe_ratio=1.8 - ) - db.session.add(result) - db.session.commit() +def consume_backtest_scenes(): + def callback(message): + backtest_id = message.get('backtest_id') + run_backtest_by_id(backtest_id) + + kafka_service.consume('backtest_scenes', callback) + - return {"msg": "Backtest completed", "backtest_id": backtest_id, "result": result.id} +consume_backtest_scenes() # Start consuming Kafka messages in a separate thread diff --git a/app/services/backtest_service.py b/app/services/backtest_service.py index e69de29..4288aea 100644 --- a/app/services/backtest_service.py +++ b/app/services/backtest_service.py @@ -0,0 +1,39 @@ +from app.models.backtest import Backtest, Result +from app import db +from app.services.kafka_service import kafka_service +from app.services.mlflow_service import mlflow_service + +def run_backtest_by_id(backtest_id): + backtest = Backtest.query.get(backtest_id) + if not backtest: + return + + # Simulate backtest processing + result = Result( + backtest_id=backtest_id, + total_return=10.5, + number_of_trades=20, + winning_trades=15, + losing_trades=5, + max_drawdown=3.5, + sharpe_ratio=1.8 + ) + db.session.add(result) + db.session.commit() + + # Log metrics to MLflow + metrics = { + "total_return": result.total_return, + "number_of_trades": result.number_of_trades, + "winning_trades": result.winning_trades, + "losing_trades": result.losing_trades, + "max_drawdown": result.max_drawdown, + "sharpe_ratio": result.sharpe_ratio + } + mlflow_service.log_metrics(run_name=f"Backtest_{backtest_id}", metrics=metrics) + + # Publish result to Kafka + kafka_service.produce('backtest_results', { + "backtest_id": backtest_id, + "metrics": metrics + }) diff --git a/app/services/kafka_service.py b/app/services/kafka_service.py index e69de29..661b2d7 100644 --- a/app/services/kafka_service.py +++ b/app/services/kafka_service.py @@ -0,0 +1,30 @@ +from confluent_kafka import Producer, Consumer, KafkaException +import json + +class KafkaService: + def __init__(self, brokers): + self.producer = Producer({'bootstrap.servers': brokers}) + self.consumer = Consumer({ + 'bootstrap.servers': brokers, + 'group.id': 'backtest_group', + 'auto.offset.reset': 'earliest' + }) + + def produce(self, topic, message): + self.producer.produce(topic, key=None, value=json.dumps(message)) + self.producer.flush() + + def consume(self, topic, callback): + self.consumer.subscribe([topic]) + while True: + msg = self.consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + raise KafkaException(msg.error()) + callback(json.loads(msg.value())) + +kafka_service = KafkaService(brokers='localhost:9092') diff --git a/app/services/mlflow_service.py b/app/services/mlflow_service.py index e69de29..c36088a 100644 --- a/app/services/mlflow_service.py +++ b/app/services/mlflow_service.py @@ -0,0 +1,13 @@ +import mlflow +import mlflow.sklearn + +class MLflowService: + def __init__(self, tracking_uri): + mlflow.set_tracking_uri(tracking_uri) + + def log_metrics(self, run_name, metrics): + with mlflow.start_run(run_name=run_name): + for key, value in metrics.items(): + mlflow.log_metric(key, value) + +mlflow_service = MLflowService(tracking_uri='http://localhost:5000') diff --git a/requirements.txt b/requirements.txt index ae2607a..86a0ed4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,7 @@ flask Flask-SQLAlchemy Flask-JWT-Extended Flask-Bcrypt -psycopg2-binary \ No newline at end of file +psycopg2-binary +confluent-kafka +apache-airflow +mlflow \ No newline at end of file