-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
- Loading branch information
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from airflow import DAG | ||
from airflow.operators.python_operator import PythonOperator | ||
from datetime import datetime, timedelta | ||
from app.services.backtest_service import run_backtest_by_id | ||
|
||
default_args = { | ||
'owner': 'airflow', | ||
'depends_on_past': False, | ||
'start_date': datetime(2023, 1, 1), | ||
'retries': 1, | ||
'retry_delay': timedelta(minutes=5), | ||
} | ||
|
||
dag = DAG( | ||
'backtest_dag', | ||
default_args=default_args, | ||
description='DAG for running backtests', | ||
schedule_interval=timedelta(days=1), | ||
) | ||
|
||
def run_backtest(task_id, *args, **kwargs): | ||
run_backtest_by_id(task_id) | ||
|
||
run_backtest_task = PythonOperator( | ||
task_id='run_backtest', | ||
python_callable=run_backtest, | ||
op_args=['{{ task_instance.task_id }}'], | ||
dag=dag, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from flask import Flask | ||
from flask_sqlalchemy import SQLAlchemy | ||
from flask_jwt_extended import JWTManager | ||
from flask_bcrypt import Bcrypt | ||
|
||
db = SQLAlchemy() | ||
jwt = JWTManager() | ||
bcrypt = Bcrypt() | ||
|
||
def create_app(): | ||
app = Flask(__name__) | ||
app.config.from_object('app.config.Config') | ||
|
||
db.init_app(app) | ||
jwt.init_app(app) | ||
bcrypt.init_app(app) | ||
|
||
with app.app_context(): | ||
from app.routes import auth, backtest, index | ||
app.register_blueprint(auth.bp) | ||
app.register_blueprint(backtest.bp) | ||
app.register_blueprint(index.bp) | ||
db.create_all() | ||
|
||
return app |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
class Config: | ||
SQLALCHEMY_DATABASE_URI = 'postgresql://test_user:password@localhost/test_db' | ||
SQLALCHEMY_TRACK_MODIFICATIONS = False | ||
JWT_SECRET_KEY = 'your_secret_key' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from app import db | ||
from app.models import backtest, user |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from app import create_app | ||
|
||
app = create_app() | ||
|
||
if __name__ == "__main__": | ||
app.run(debug=True) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from app import db | ||
|
||
class Backtest(db.Model): | ||
__tablename__ = 'backtests' | ||
id = db.Column(db.Integer, primary_key=True) | ||
name = db.Column(db.String(255)) | ||
start_date = db.Column(db.Date) | ||
end_date = db.Column(db.Date) | ||
status = db.Column(db.String(50)) | ||
created_at = db.Column(db.DateTime, default=db.func.current_timestamp()) | ||
|
||
class Indicator(db.Model): | ||
__tablename__ = 'indicators' | ||
id = db.Column(db.Integer, primary_key=True) | ||
name = db.Column(db.String(255)) | ||
description = db.Column(db.Text) | ||
|
||
class Parameter(db.Model): | ||
__tablename__ = 'parameters' | ||
id = db.Column(db.Integer, primary_key=True) | ||
backtest_id = db.Column(db.Integer, db.ForeignKey('backtests.id'), nullable=False) | ||
indicator_id = db.Column(db.Integer, db.ForeignKey('indicators.id'), nullable=False) | ||
value = db.Column(db.String(255)) | ||
|
||
class Result(db.Model): | ||
__tablename__ = 'results' | ||
id = db.Column(db.Integer, primary_key=True) | ||
backtest_id = db.Column(db.Integer, db.ForeignKey('backtests.id'), nullable=False) | ||
total_return = db.Column(db.Numeric(10, 2)) | ||
number_of_trades = db.Column(db.Integer) | ||
winning_trades = db.Column(db.Integer) | ||
losing_trades = db.Column(db.Integer) | ||
max_drawdown = db.Column(db.Numeric(10, 2)) | ||
sharpe_ratio = db.Column(db.Numeric(10, 2)) | ||
|
||
class Metric(db.Model): | ||
__tablename__ = 'metrics' | ||
id = db.Column(db.Integer, primary_key=True) | ||
backtest_id = db.Column(db.Integer, db.ForeignKey('backtests.id'), nullable=False) | ||
metric_name = db.Column(db.String(255)) | ||
metric_value = db.Column(db.Numeric(10, 2)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from app import db | ||
|
||
class User(db.Model): | ||
__tablename__ = 'users' | ||
id = db.Column(db.Integer, primary_key=True) | ||
username = db.Column(db.String(255), unique=True, nullable=False) | ||
password = db.Column(db.String(255), nullable=False) | ||
created_at = db.Column(db.DateTime, default=db.func.current_timestamp()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from flask import Blueprint, request, jsonify | ||
from app.models.user import User | ||
from app import db, bcrypt, jwt | ||
from flask_jwt_extended import create_access_token | ||
|
||
bp = Blueprint('auth', __name__) | ||
|
||
|
||
@bp.route('/register', methods=['POST']) | ||
def register(): | ||
data = request.get_json() | ||
username = data.get('username') | ||
password = data.get('password') | ||
|
||
if User.query.filter_by(username=username).first(): | ||
return jsonify({"msg": "Username already exists"}), 409 | ||
|
||
hashed_password = bcrypt.generate_password_hash(password).decode('utf-8') | ||
new_user = User(username=username, password=hashed_password) | ||
db.session.add(new_user) | ||
db.session.commit() | ||
|
||
return jsonify({"msg": "User registered successfully"}), 201 | ||
|
||
|
||
@bp.route('/login', methods=['POST']) | ||
def login(): | ||
data = request.get_json() | ||
username = data.get('username') | ||
password = data.get('password') | ||
|
||
user = User.query.filter_by(username=username).first() | ||
|
||
if user and bcrypt.check_password_hash(user.password, password): | ||
access_token = create_access_token(identity={'username': user.username}) | ||
return jsonify(access_token=access_token), 200 | ||
|
||
return jsonify({"msg": "Bad username or password"}), 401 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from flask import Blueprint, request, jsonify | ||
from app.models.backtest import Backtest, Parameter, Result | ||
from app import db | ||
from flask_jwt_extended import jwt_required | ||
from app.services.backtest_service import run_backtest_by_id | ||
from app.services.kafka_service import kafka_service | ||
|
||
bp = Blueprint('backtest', __name__) | ||
|
||
|
||
@bp.route('/backtest', methods=['POST']) | ||
@jwt_required() | ||
def run_backtest(): | ||
data = request.get_json() | ||
name = data.get('name') | ||
start_date = data.get('start_date') | ||
end_date = data.get('end_date') | ||
parameters = data.get('parameters') | ||
|
||
# Check if backtest with same parameters exists | ||
existing_backtest = Backtest.query.filter_by(name=name, start_date=start_date, end_date=end_date).first() | ||
if existing_backtest: | ||
return jsonify( | ||
{"msg": "Backtest with same parameters already exists", "backtest_id": existing_backtest.id}), 200 | ||
|
||
# Create new backtest | ||
new_backtest = Backtest(name=name, start_date=start_date, end_date=end_date, status="pending") | ||
db.session.add(new_backtest) | ||
db.session.commit() | ||
|
||
# Add parameters | ||
for param in parameters: | ||
new_param = Parameter(backtest_id=new_backtest.id, indicator_id=param['indicator_id'], value=param['value']) | ||
db.session.add(new_param) | ||
|
||
db.session.commit() | ||
|
||
# Publish backtest to Kafka for processing | ||
kafka_service.produce('backtest_scenes', { | ||
"backtest_id": new_backtest.id, | ||
"parameters": parameters | ||
}) | ||
|
||
return jsonify({"msg": "Backtest created and published to Kafka", "backtest_id": new_backtest.id}), 201 | ||
|
||
|
||
def consume_backtest_scenes(): | ||
def callback(message): | ||
backtest_id = message.get('backtest_id') | ||
run_backtest_by_id(backtest_id) | ||
|
||
kafka_service.consume('backtest_scenes', callback) | ||
|
||
|
||
consume_backtest_scenes() # Start consuming Kafka messages in a separate thread |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from flask import Blueprint | ||
|
||
bp = Blueprint('index', __name__) | ||
|
||
@bp.route('/') | ||
def index(): | ||
return "Welcome to the Backtest API!" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from app.models.backtest import Backtest, Result | ||
from app import db | ||
from app.services.kafka_service import kafka_service | ||
from app.services.mlflow_service import mlflow_service | ||
|
||
def run_backtest_by_id(backtest_id): | ||
backtest = Backtest.query.get(backtest_id) | ||
if not backtest: | ||
return | ||
|
||
# Simulate backtest processing | ||
result = Result( | ||
backtest_id=backtest_id, | ||
total_return=10.5, | ||
number_of_trades=20, | ||
winning_trades=15, | ||
losing_trades=5, | ||
max_drawdown=3.5, | ||
sharpe_ratio=1.8 | ||
) | ||
db.session.add(result) | ||
db.session.commit() | ||
|
||
# Log metrics to MLflow | ||
metrics = { | ||
"total_return": result.total_return, | ||
"number_of_trades": result.number_of_trades, | ||
"winning_trades": result.winning_trades, | ||
"losing_trades": result.losing_trades, | ||
"max_drawdown": result.max_drawdown, | ||
"sharpe_ratio": result.sharpe_ratio | ||
} | ||
mlflow_service.log_metrics(run_name=f"Backtest_{backtest_id}", metrics=metrics) | ||
|
||
# Publish result to Kafka | ||
kafka_service.produce('backtest_results', { | ||
"backtest_id": backtest_id, | ||
"metrics": metrics | ||
}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from confluent_kafka import Producer, Consumer, KafkaException | ||
import json | ||
|
||
class KafkaService: | ||
def __init__(self, brokers): | ||
self.producer = Producer({'bootstrap.servers': brokers}) | ||
self.consumer = Consumer({ | ||
'bootstrap.servers': brokers, | ||
'group.id': 'backtest_group', | ||
'auto.offset.reset': 'earliest' | ||
}) | ||
|
||
def produce(self, topic, message): | ||
self.producer.produce(topic, key=None, value=json.dumps(message)) | ||
self.producer.flush() | ||
|
||
def consume(self, topic, callback): | ||
self.consumer.subscribe([topic]) | ||
while True: | ||
msg = self.consumer.poll(timeout=1.0) | ||
if msg is None: | ||
continue | ||
if msg.error(): | ||
if msg.error().code() == KafkaError._PARTITION_EOF: | ||
continue | ||
else: | ||
raise KafkaException(msg.error()) | ||
callback(json.loads(msg.value())) | ||
|
||
kafka_service = KafkaService(brokers='localhost:9092') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import mlflow | ||
import mlflow.sklearn | ||
|
||
class MLflowService: | ||
def __init__(self, tracking_uri): | ||
mlflow.set_tracking_uri(tracking_uri) | ||
|
||
def log_metrics(self, run_name, metrics): | ||
with mlflow.start_run(run_name=run_name): | ||
for key, value in metrics.items(): | ||
mlflow.log_metric(key, value) | ||
|
||
mlflow_service = MLflowService(tracking_uri='http://localhost:5000') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
flask | ||
Flask-SQLAlchemy | ||
Flask-JWT-Extended | ||
Flask-Bcrypt | ||
psycopg2-binary | ||
confluent-kafka | ||
apache-airflow | ||
mlflow |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from app import create_app | ||
|
||
app = create_app() | ||
|
||
if __name__ == "__main__": | ||
app.run(debug=True) |