-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
112 lines (87 loc) · 3.63 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
from os.path import join, dirname
from dotenv import load_dotenv
from kafka import KafkaProducer
from flask import Flask, request, jsonify
from pymongo import MongoClient
from py2neo import Graph
import json
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
app = Flask(__name__)
mongo_host = os.environ.get('MONGO_HOST')
mongo_port = os.environ.get('MONGO_PORT')
mongo_db_name = os.environ.get('MONGO_DATABASE')
mongo_collection_name = os.environ.get('MONGO_BANDS_COLLECTION')
mongo_username = os.environ.get('MONGO_INITDB_ROOT_USERNAME')
mongo_password = os.environ.get('MONGO_INITDB_ROOT_PASSWORD')
neo4j_host = os.environ.get('NEO4J_HOST')
neo4j_port = os.environ.get('NEO4J_PORT')
neo4j_username = os.environ.get('NEO4J_AUTH').split('/')[0]
neo4j_password = os.environ.get('NEO4J_AUTH').split('/')[1]
neo4j_uri = "bolt://" + neo4j_host + ':' + neo4j_port
kafka_host = os.environ.get('KAFKA_HOST')
kafka_port = os.environ.get('KAFKA_PORT')
kafka_bootstrap_servers = kafka_host + ':' + kafka_port
kafka_bands_topic = os.environ.get('KAFKA_BANDS_TOPIC')
kafka_users_topic = os.environ.get('KAFKA_USERS_TOPIC')
producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers,
value_serializer=lambda v: str(v).encode('utf-8'))
try:
mongo_client = MongoClient(
f"mongodb://{mongo_username}:{mongo_password}@{mongo_host}:{mongo_port}/"
)
mongo_collection = mongo_client[mongo_db_name][mongo_collection_name]
print("Connected to MongoDB successfully!")
except Exception as e:
print(f"Error connecting to MongoDB: {e}")
try:
# Neo4j connection
graph = Graph(neo4j_uri, auth=(neo4j_username, neo4j_password))
print("Connected to Neo4j successfully!")
except Exception as e:
print(f"Error connecting to Neo4j: {e}")
try:
# Kafka connection
producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("Connected to Kafka successfully!")
except Exception as e:
print(f"Error connecting to Kafka: {e}")
@app.route('/publish/bands', methods=['POST'])
def mongo_producer():
start_date = request.json['start_date']
end_date = request.json['end_date']
bands_query = {"formation_date": {"$gte": start_date, "$lte": end_date}}
bands = list(mongo_collection.find(bands_query))
for band in bands:
band["_id"] = str(band["_id"]) # Remove ObjectID type
producer.send(kafka_bands_topic, value=bands)
return jsonify({"message": "Bands published to Kafka successfully", "data": bands})
@app.route('/publish/users', methods=['POST'])
def graph_producer():
user_name = request.json['user_name']
try:
# Neo4j query to retrieve user and friends data by name
query = f"""
MATCH (u:User)-[:FRIEND]->(friend:User)
WHERE u.name = '{user_name}'
RETURN u, COLLECT(DISTINCT friend) AS friends
"""
result = graph.run(query)
for record in result:
user_data = record['u']
friends_data = record['friends']
users_data = [
{"name": user_data['name'], "favorite_bands": user_data['favorite_bands']}
] + [
{"name": friend['name'], "favorite_bands": friend['favorite_bands']} for friend in friends_data
]
producer.send(kafka_users_topic, value=users_data)
return jsonify({"message": "Users published to Kafka successfully", "data": users_data})
except Exception as e:
return jsonify({"error": f"Error processing request: {e}"})
if __name__ == '__main__':
app.run(debug=True, port=5000, host='0.0.0.0')