diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/.idea/crypto-trading-backtesting.iml b/.idea/crypto-trading-backtesting.iml
new file mode 100644
index 0000000..74d515a
--- /dev/null
+++ b/.idea/crypto-trading-backtesting.iml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..ea773d3
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..149590b
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/airflow/dags/backtest_dag.py b/airflow/dags/backtest_dag.py
index e69de29..a0b62ce 100644
--- a/airflow/dags/backtest_dag.py
+++ b/airflow/dags/backtest_dag.py
@@ -0,0 +1,29 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from datetime import datetime, timedelta
+from app.services.backtest_service import run_backtest_by_id
+
+default_args = {
+ 'owner': 'airflow',
+ 'depends_on_past': False,
+ 'start_date': datetime(2023, 1, 1),
+ 'retries': 1,
+ 'retry_delay': timedelta(minutes=5),
+}
+
+dag = DAG(
+ 'backtest_dag',
+ default_args=default_args,
+ description='DAG for running backtests',
+ schedule_interval=timedelta(days=1),
+)
+
+def run_backtest(backtest_id, *args, **kwargs):
+    run_backtest_by_id(backtest_id)
+
+run_backtest_task = PythonOperator(
+    task_id='run_backtest',
+    python_callable=run_backtest,
+    op_args=['{{ dag_run.conf["backtest_id"] }}'],
+ dag=dag,
+)
diff --git a/app/config.py b/app/config.py
index 052c297..f91c16a 100644
--- a/app/config.py
+++ b/app/config.py
@@ -1,4 +1,4 @@
class Config:
- SQLALCHEMY_DATABASE_URI = 'db_utl'
+ SQLALCHEMY_DATABASE_URI = 'postgresql://test_user:password@localhost/test_db'
SQLALCHEMY_TRACK_MODIFICATIONS = False
JWT_SECRET_KEY = 'your_secret_key'
diff --git a/app/routes/backtest.py b/app/routes/backtest.py
index 4eca315..2ff5b70 100644
--- a/app/routes/backtest.py
+++ b/app/routes/backtest.py
@@ -2,6 +2,8 @@
from app.models.backtest import Backtest, Parameter, Result
from app import db
from flask_jwt_extended import jwt_required
+from app.services.backtest_service import run_backtest_by_id
+from app.services.kafka_service import kafka_service
bp = Blueprint('backtest', __name__)
@@ -33,24 +35,21 @@ def run_backtest():
db.session.commit()
- # Run the backtest (dummy processing here)
- result = run_backtest_logic(new_backtest.id)
+ # Publish backtest to Kafka for processing
+ kafka_service.produce('backtest_scenes', {
+ "backtest_id": new_backtest.id,
+ "parameters": parameters
+ })
- return jsonify(result), 201
+ return jsonify({"msg": "Backtest created and published to Kafka", "backtest_id": new_backtest.id}), 201
-def run_backtest_logic(backtest_id):
- # Dummy backtest logic
- result = Result(
- backtest_id=backtest_id,
- total_return=10.5,
- number_of_trades=20,
- winning_trades=15,
- losing_trades=5,
- max_drawdown=3.5,
- sharpe_ratio=1.8
- )
- db.session.add(result)
- db.session.commit()
+import threading
+
+def consume_backtest_scenes():
+    def callback(message):
+        backtest_id = message.get('backtest_id')
+        run_backtest_by_id(backtest_id)
+    kafka_service.consume('backtest_scenes', callback)
-    return {"msg": "Backtest completed", "backtest_id": backtest_id, "result": result.id}
+threading.Thread(target=consume_backtest_scenes, daemon=True).start()  # consume without blocking module import
diff --git a/app/services/backtest_service.py b/app/services/backtest_service.py
index e69de29..4288aea 100644
--- a/app/services/backtest_service.py
+++ b/app/services/backtest_service.py
@@ -0,0 +1,39 @@
+from app.models.backtest import Backtest, Result
+from app import db
+from app.services.kafka_service import kafka_service
+from app.services.mlflow_service import mlflow_service
+
+def run_backtest_by_id(backtest_id):
+ backtest = Backtest.query.get(backtest_id)
+ if not backtest:
+ return
+
+ # Simulate backtest processing
+ result = Result(
+ backtest_id=backtest_id,
+ total_return=10.5,
+ number_of_trades=20,
+ winning_trades=15,
+ losing_trades=5,
+ max_drawdown=3.5,
+ sharpe_ratio=1.8
+ )
+ db.session.add(result)
+ db.session.commit()
+
+ # Log metrics to MLflow
+ metrics = {
+ "total_return": result.total_return,
+ "number_of_trades": result.number_of_trades,
+ "winning_trades": result.winning_trades,
+ "losing_trades": result.losing_trades,
+ "max_drawdown": result.max_drawdown,
+ "sharpe_ratio": result.sharpe_ratio
+ }
+ mlflow_service.log_metrics(run_name=f"Backtest_{backtest_id}", metrics=metrics)
+
+ # Publish result to Kafka
+ kafka_service.produce('backtest_results', {
+ "backtest_id": backtest_id,
+ "metrics": metrics
+ })
diff --git a/app/services/kafka_service.py b/app/services/kafka_service.py
index e69de29..661b2d7 100644
--- a/app/services/kafka_service.py
+++ b/app/services/kafka_service.py
@@ -0,0 +1,30 @@
+from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
+import json
+
+class KafkaService:
+ def __init__(self, brokers):
+ self.producer = Producer({'bootstrap.servers': brokers})
+ self.consumer = Consumer({
+ 'bootstrap.servers': brokers,
+ 'group.id': 'backtest_group',
+ 'auto.offset.reset': 'earliest'
+ })
+
+ def produce(self, topic, message):
+ self.producer.produce(topic, key=None, value=json.dumps(message))
+ self.producer.flush()
+
+ def consume(self, topic, callback):
+ self.consumer.subscribe([topic])
+ while True:
+ msg = self.consumer.poll(timeout=1.0)
+ if msg is None:
+ continue
+ if msg.error():
+ if msg.error().code() == KafkaError._PARTITION_EOF:
+ continue
+ else:
+ raise KafkaException(msg.error())
+ callback(json.loads(msg.value()))
+
+kafka_service = KafkaService(brokers='localhost:9092')
diff --git a/app/services/mlflow_service.py b/app/services/mlflow_service.py
index e69de29..c36088a 100644
--- a/app/services/mlflow_service.py
+++ b/app/services/mlflow_service.py
@@ -0,0 +1,13 @@
+import mlflow
+import mlflow.sklearn
+
+class MLflowService:
+ def __init__(self, tracking_uri):
+ mlflow.set_tracking_uri(tracking_uri)
+
+ def log_metrics(self, run_name, metrics):
+ with mlflow.start_run(run_name=run_name):
+ for key, value in metrics.items():
+ mlflow.log_metric(key, value)
+
+mlflow_service = MLflowService(tracking_uri='http://localhost:5000')
diff --git a/requirements.txt b/requirements.txt
index ae2607a..86a0ed4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,4 +2,7 @@ flask
Flask-SQLAlchemy
Flask-JWT-Extended
Flask-Bcrypt
-psycopg2-binary
\ No newline at end of file
+psycopg2-binary
+confluent-kafka
+apache-airflow
+mlflow
\ No newline at end of file