Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
temesgen5335 committed Jun 21, 2024
2 parents 791fd46 + b1e519c commit 9ae7694
Show file tree
Hide file tree
Showing 23 changed files with 916 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions .idea/crypto-trading-backtesting.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions airflow/dags/backtest_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from app.services.backtest_service import run_backtest_by_id

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG(
'backtest_dag',
default_args=default_args,
description='DAG for running backtests',
schedule_interval=timedelta(days=1),
)

def run_backtest(task_id, *args, **kwargs):
run_backtest_by_id(task_id)

run_backtest_task = PythonOperator(
task_id='run_backtest',
python_callable=run_backtest,
op_args=['{{ task_instance.task_id }}'],
dag=dag,
)
25 changes: 25 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager
from flask_bcrypt import Bcrypt

db = SQLAlchemy()
jwt = JWTManager()
bcrypt = Bcrypt()

def create_app():
app = Flask(__name__)
app.config.from_object('app.config.Config')

db.init_app(app)
jwt.init_app(app)
bcrypt.init_app(app)

with app.app_context():
from app.routes import auth, backtest, index
app.register_blueprint(auth.bp)
app.register_blueprint(backtest.bp)
app.register_blueprint(index.bp)
db.create_all()

return app
4 changes: 4 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class Config:
SQLALCHEMY_DATABASE_URI = 'postgresql://test_user:password@localhost/test_db'
SQLALCHEMY_TRACK_MODIFICATIONS = False
JWT_SECRET_KEY = 'your_secret_key'
2 changes: 2 additions & 0 deletions app/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from app import db
from app.models import backtest, user
6 changes: 6 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from app import create_app

app = create_app()

if __name__ == "__main__":
app.run(debug=True)
41 changes: 41 additions & 0 deletions app/models/backtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from app import db

class Backtest(db.Model):
__tablename__ = 'backtests'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
start_date = db.Column(db.Date)
end_date = db.Column(db.Date)
status = db.Column(db.String(50))
created_at = db.Column(db.DateTime, default=db.func.current_timestamp())

class Indicator(db.Model):
__tablename__ = 'indicators'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
description = db.Column(db.Text)

class Parameter(db.Model):
__tablename__ = 'parameters'
id = db.Column(db.Integer, primary_key=True)
backtest_id = db.Column(db.Integer, db.ForeignKey('backtests.id'), nullable=False)
indicator_id = db.Column(db.Integer, db.ForeignKey('indicators.id'), nullable=False)
value = db.Column(db.String(255))

class Result(db.Model):
__tablename__ = 'results'
id = db.Column(db.Integer, primary_key=True)
backtest_id = db.Column(db.Integer, db.ForeignKey('backtests.id'), nullable=False)
total_return = db.Column(db.Numeric(10, 2))
number_of_trades = db.Column(db.Integer)
winning_trades = db.Column(db.Integer)
losing_trades = db.Column(db.Integer)
max_drawdown = db.Column(db.Numeric(10, 2))
sharpe_ratio = db.Column(db.Numeric(10, 2))

class Metric(db.Model):
__tablename__ = 'metrics'
id = db.Column(db.Integer, primary_key=True)
backtest_id = db.Column(db.Integer, db.ForeignKey('backtests.id'), nullable=False)
metric_name = db.Column(db.String(255))
metric_value = db.Column(db.Numeric(10, 2))
8 changes: 8 additions & 0 deletions app/models/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from app import db

class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(255), unique=True, nullable=False)
password = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
38 changes: 38 additions & 0 deletions app/routes/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from flask import Blueprint, request, jsonify
from app.models.user import User
from app import db, bcrypt, jwt
from flask_jwt_extended import create_access_token

bp = Blueprint('auth', __name__)


@bp.route('/register', methods=['POST'])
def register():
data = request.get_json()
username = data.get('username')
password = data.get('password')

if User.query.filter_by(username=username).first():
return jsonify({"msg": "Username already exists"}), 409

hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')
new_user = User(username=username, password=hashed_password)
db.session.add(new_user)
db.session.commit()

return jsonify({"msg": "User registered successfully"}), 201


@bp.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')

user = User.query.filter_by(username=username).first()

if user and bcrypt.check_password_hash(user.password, password):
access_token = create_access_token(identity={'username': user.username})
return jsonify(access_token=access_token), 200

return jsonify({"msg": "Bad username or password"}), 401
55 changes: 55 additions & 0 deletions app/routes/backtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from flask import Blueprint, request, jsonify
from app.models.backtest import Backtest, Parameter, Result
from app import db
from flask_jwt_extended import jwt_required
from app.services.backtest_service import run_backtest_by_id
from app.services.kafka_service import kafka_service

bp = Blueprint('backtest', __name__)


@bp.route('/backtest', methods=['POST'])
@jwt_required()
def run_backtest():
data = request.get_json()
name = data.get('name')
start_date = data.get('start_date')
end_date = data.get('end_date')
parameters = data.get('parameters')

# Check if backtest with same parameters exists
existing_backtest = Backtest.query.filter_by(name=name, start_date=start_date, end_date=end_date).first()
if existing_backtest:
return jsonify(
{"msg": "Backtest with same parameters already exists", "backtest_id": existing_backtest.id}), 200

# Create new backtest
new_backtest = Backtest(name=name, start_date=start_date, end_date=end_date, status="pending")
db.session.add(new_backtest)
db.session.commit()

# Add parameters
for param in parameters:
new_param = Parameter(backtest_id=new_backtest.id, indicator_id=param['indicator_id'], value=param['value'])
db.session.add(new_param)

db.session.commit()

# Publish backtest to Kafka for processing
kafka_service.produce('backtest_scenes', {
"backtest_id": new_backtest.id,
"parameters": parameters
})

return jsonify({"msg": "Backtest created and published to Kafka", "backtest_id": new_backtest.id}), 201


def consume_backtest_scenes():
def callback(message):
backtest_id = message.get('backtest_id')
run_backtest_by_id(backtest_id)

kafka_service.consume('backtest_scenes', callback)


consume_backtest_scenes() # Start consuming Kafka messages in a separate thread
7 changes: 7 additions & 0 deletions app/routes/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from flask import Blueprint

bp = Blueprint('index', __name__)

@bp.route('/')
def index():
return "Welcome to the Backtest API!"
39 changes: 39 additions & 0 deletions app/services/backtest_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from app.models.backtest import Backtest, Result
from app import db
from app.services.kafka_service import kafka_service
from app.services.mlflow_service import mlflow_service

def run_backtest_by_id(backtest_id):
backtest = Backtest.query.get(backtest_id)
if not backtest:
return

# Simulate backtest processing
result = Result(
backtest_id=backtest_id,
total_return=10.5,
number_of_trades=20,
winning_trades=15,
losing_trades=5,
max_drawdown=3.5,
sharpe_ratio=1.8
)
db.session.add(result)
db.session.commit()

# Log metrics to MLflow
metrics = {
"total_return": result.total_return,
"number_of_trades": result.number_of_trades,
"winning_trades": result.winning_trades,
"losing_trades": result.losing_trades,
"max_drawdown": result.max_drawdown,
"sharpe_ratio": result.sharpe_ratio
}
mlflow_service.log_metrics(run_name=f"Backtest_{backtest_id}", metrics=metrics)

# Publish result to Kafka
kafka_service.produce('backtest_results', {
"backtest_id": backtest_id,
"metrics": metrics
})
30 changes: 30 additions & 0 deletions app/services/kafka_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from confluent_kafka import Producer, Consumer, KafkaException
import json

class KafkaService:
def __init__(self, brokers):
self.producer = Producer({'bootstrap.servers': brokers})
self.consumer = Consumer({
'bootstrap.servers': brokers,
'group.id': 'backtest_group',
'auto.offset.reset': 'earliest'
})

def produce(self, topic, message):
self.producer.produce(topic, key=None, value=json.dumps(message))
self.producer.flush()

def consume(self, topic, callback):
self.consumer.subscribe([topic])
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaException(msg.error())
callback(json.loads(msg.value()))

kafka_service = KafkaService(brokers='localhost:9092')
13 changes: 13 additions & 0 deletions app/services/mlflow_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import mlflow
import mlflow.sklearn

class MLflowService:
def __init__(self, tracking_uri):
mlflow.set_tracking_uri(tracking_uri)

def log_metrics(self, run_name, metrics):
with mlflow.start_run(run_name=run_name):
for key, value in metrics.items():
mlflow.log_metric(key, value)

mlflow_service = MLflowService(tracking_uri='http://localhost:5000')
Loading

0 comments on commit 9ae7694

Please sign in to comment.