This repository was archived by the owner on Aug 13, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapplication.py
More file actions
151 lines (131 loc) · 3.89 KB
/
application.py
File metadata and controls
151 lines (131 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
application.py
==============
Runs the web app.
Endpoints and socket functions are declared below.
"""
from flask import (
Flask, render_template, request, jsonify, flash,
redirect, session, url_for
)
from flask_socketio import (
SocketIO, join_room, leave_room, emit, rooms
)
import os
from werkzeug.utils import secure_filename
from src import data
from threading import Thread
# Flask application object; templates and static assets are looked up in the
# ./templates and ./static folders passed here.
app = Flask(__name__, template_folder="./templates", static_folder="./static")
# NOTE(review): the session secret is hard-coded in source control; consider
# loading it from the environment or a config file -- confirm deployment needs.
app.secret_key = b"telemetry_secret_key"
# Upload location. change_settings() strips the slashes from this value and
# joins it under os.getcwd(), so files land in "<cwd>/uploads".
app.config['UPLOAD_FOLDER'] = "/uploads"
# Socket.IO wrapper around the Flask app, created with logger=True.
socketio = SocketIO(app, logger=True)
# Data handler used by the socket handlers below until a POST to /settings
# replaces it.
rocket_data = data.DataHandler(False, is_sim=True)
# Thread running data.update_data; stays None until /settings starts one.
update_data_thread = None
# Not referenced anywhere else in the visible part of this file.
rocket_data_thread = None
@app.route("/")
def index():
    """
    Serve the main graphing page (documented as http://127.0.0.1:5000/).

    Renders the ``pages/index.html`` template with an empty ``context`` dict.
    """
    page_context = {}
    return render_template("pages/index.html", context=page_context)
def _restart_update_thread(handler, daemon=None):
    """
    Stop the current updater thread (if any) and start a new one for ``handler``.

    The stop request is made by setting ``data.should_kill_thread``; the wait
    ends as soon as that flag is cleared again (presumably by
    ``data.update_data`` when it exits -- confirm in ``src/data.py``) or the
    old thread is no longer alive.

    Args:
        handler: Object passed as the single argument to ``data.update_data``.
        daemon: When not ``None``, assigned to the new thread's ``daemon``
            attribute before it is started; ``None`` keeps the default that
            ``Thread`` inherits from the creating thread.
    """
    global update_data_thread

    old_thread = update_data_thread
    data.should_kill_thread = True
    # Wait with a timed join instead of a bare spin loop: this does not burn a
    # CPU core, and checking is_alive() avoids waiting forever when the old
    # thread has already finished and can no longer clear the flag.
    while (
        data.should_kill_thread
        and old_thread is not None
        and old_thread.is_alive()
    ):
        old_thread.join(timeout=0.05)
    data.should_kill_thread = False
    print("Killed old thread")

    update_data_thread = Thread(target=data.update_data, args=(handler,))
    if daemon is not None:
        update_data_thread.daemon = daemon
    update_data_thread.start()


@app.route("/settings", methods=["POST"])
def change_settings():
    """
    Post request handler for changing settings.

    Reads the ``data_type`` form field and reacts as follows:

    * ``"Demo Data"`` / ``"Explore Data"``: saves the uploaded ``file`` under
      ``<cwd>/<UPLOAD_FOLDER>``, builds a new ``data.DataHandler`` for it
      (``is_sim=True`` for demo, ``is_sim=False`` for explore) and restarts
      the updater thread. A missing or unnamed upload only logs "NO FILE!".
    * ``"Live Telemetry"``: builds ``data.DataHandler(True, is_sim=False)``
      and restarts the updater as a daemon thread.
    * Anything else: no action.

    Returns:
        An empty body with HTTP status 204 in every case.
    """
    global rocket_data

    data_type = request.form["data_type"]

    if data_type in ("Demo Data", "Explore Data"):
        new_file = request.files.get("file")
        # secure_filename() can reduce a name to "", and browsers submit an
        # empty file part when nothing was chosen; treat both as "no file"
        # rather than trying to save onto the upload directory itself.
        filename = ""
        if new_file is not None:
            filename = secure_filename(new_file.filename or "")
        if not filename:
            print("NO FILE!")
            return ('', 204)

        upload_dir = os.path.join(
            os.getcwd(), app.config['UPLOAD_FOLDER'].strip("/")
        )
        # Make sure the destination exists so the first upload does not fail.
        os.makedirs(upload_dir, exist_ok=True)
        new_file_name = os.path.join(upload_dir, filename)
        new_file.save(new_file_name)
        print("Uploaded:", new_file_name)

        rocket_data = data.DataHandler(
            False, filename=new_file_name, is_sim=(data_type == "Demo Data")
        )
        # File-backed modes keep the thread's default daemon setting.
        _restart_update_thread(rocket_data)
    elif data_type == "Live Telemetry":
        print("Starting live data")
        rocket_data = data.DataHandler(True, is_sim=False)
        _restart_update_thread(rocket_data, daemon=True)

    return ('', 204)
@socketio.on("connected")
def connect_user():
    """
    Placeholder handler for the "connected" socket event; it does nothing.
    """
    return None
@socketio.on("request_data")
def request_data():
    """
    Answer a "request_data" socket event.

    Fetches the current data from the active data handler and emits it back
    as a "receive_data" event.
    """
    current = rocket_data.get_data()
    emit("receive_data", current)
@socketio.on("halt")
def request_halt():
    """
    Forward a "halt" socket event to the active data handler's ``halt()``.
    """
    handler = rocket_data
    handler.halt()
@socketio.on("arm")
def request_arm():
    """
    Forward an "arm" socket event to the active data handler's ``arm()``.
    """
    handler = rocket_data
    handler.arm()
@socketio.on("eject1")
def request_eject1():
    """
    Forward an "eject1" socket event to the active data handler's ``eject1()``.

    The function has its own name so it no longer redefines the handler
    function used for the "arm" event; the socket event name is unchanged.
    """
    rocket_data.eject1()
@socketio.on("eject2")
def request_eject2():
    """
    Forward an "eject2" socket event to the active data handler's ``eject2()``.

    The function has its own name so it no longer redefines the handler
    function used for the "arm" event; the socket event name is unchanged.
    """
    rocket_data.eject2()
@socketio.on("resume")
def request_resume():
    """
    Forward a "resume" socket event to the active data handler's ``resume()``.

    Logs "RESUME!" to stdout before making the call.
    """
    print("RESUME!")
    handler = rocket_data
    handler.resume()
@socketio.on("demo_simulation")
def request_demo():
    """
    Forward a "demo_simulation" socket event to the active data handler's
    ``demo_simulation()``.

    Logs "DEMO!" to stdout before making the call.
    """
    print("DEMO!")
    handler = rocket_data
    handler.demo_simulation()
# Start the Socket.IO-enabled server only when this file is executed directly,
# not when it is imported.
# NOTE(review): debug=True is passed unconditionally -- confirm this entry
# point is only used for local development.
if __name__ == "__main__":
    socketio.run(app, debug=True)