1
1
import atexit
2
+ import multiprocessing
2
3
import queue
3
4
from collections import defaultdict
4
5
from functools import lru_cache
5
6
from multiprocessing import Queue , Process
6
7
from threading import Thread
8
+ from time import sleep
9
+
10
+ import flask
7
11
8
12
from main import simple_settings
9
- from main .workers .communications import AbstractCommunications , ThreadCommunications
10
13
from main .workers .utils import internal_error_result , make_result
11
14
from main .workers .worker import worker_loop_in_thread
12
15
@@ -48,23 +51,23 @@ def handle_entry(self, entry):
48
51
49
52
self .task_queue .put (entry )
50
53
51
- def await_result (self , callback ):
52
- try :
53
- result = self ._await_result ()
54
- # if result["error"] and result["error"]["sentry_event"]:
55
- # event, hint = result["error"]["sentry_event"]
56
- # capture_event(event, hint)
57
- except Exception :
58
- result = internal_error_result ()
54
+ def await_result (self ):
55
+ result = self ._await_result ()
56
+ # if result["error"] and result["error"]["sentry_event"]:
57
+ # event, hint = result["error"]["sentry_event"]
58
+ # capture_event(event, hint)
59
59
self .awaiting_input = result ["awaiting_input" ]
60
- callback ( result )
60
+ return result
61
61
62
62
def _await_result (self ):
63
63
# TODO cancel if result was cancelled by a newer handle_entry
64
64
result = None
65
+ # TODO handle initial timeout better
66
+ timeout = 10
65
67
while result is None :
66
68
try :
67
- result = self .result_queue .get (timeout = 3 )
69
+ result = self .result_queue .get (timeout = timeout )
70
+ timeout = 3
68
71
except queue .Empty :
69
72
alive = self .process .is_alive ()
70
73
print (f"Process { alive = } " )
@@ -82,59 +85,63 @@ def _await_result(self):
82
85
return result
83
86
84
87
85
- def master_consumer_loop (comms : AbstractCommunications ):
86
- comms = comms .make_master_side_communications ()
87
- user_processes = defaultdict (UserProcess )
88
+ user_processes = defaultdict (UserProcess )
89
+
90
+ app = flask .Flask (__name__ )
91
+
92
+ multiprocessing .set_start_method ("spawn" )
93
+
94
+
95
+ @app .route ("/run" , methods = ["POST" ])
96
+ def run ():
97
+ try :
98
+ entry = flask .request .json
99
+ user_process = user_processes [entry ["user_id" ]]
100
+ user_process .handle_entry (entry )
101
+ return user_process .await_result ()
102
+ except Exception :
103
+ return internal_error_result ()
104
+
88
105
89
- while True :
90
- entry = comms . recv_entry ()
91
- user_id = str ( entry [ "user_id" ])
106
+ @ app . route ( "/health" )
107
+ def health ():
108
+ return "ok"
92
109
93
- def callback (result ):
94
- comms .send_result (user_id , result )
95
110
96
- try :
97
- user_process = user_processes [user_id ]
98
- user_process .handle_entry (entry )
99
- Thread (
100
- target = user_process .await_result ,
101
- args = [callback ],
102
- ).start ()
103
- except Exception :
104
- callback (internal_error_result ())
111
+ def run_server ():
112
+ app .run (host = "0.0.0.0" )
113
+
114
+
115
+ master_url = "http://localhost:5000/"
105
116
106
117
107
118
@lru_cache ()
108
- def master_communications () -> AbstractCommunications :
109
- if simple_settings .CLOUDAMQP_URL :
110
- from .pika import PikaCommunications
111
- comms = PikaCommunications ()
112
- else :
113
- comms = ThreadCommunications ()
119
+ def master_session ():
120
+ import requests
121
+ session = requests .Session ()
114
122
115
123
if not simple_settings .SEPARATE_WORKER_PROCESS :
116
124
Thread (
117
- target = master_consumer_loop ,
118
- args = [comms ],
125
+ target = run_server ,
119
126
daemon = True ,
120
- name = master_consumer_loop .__name__ ,
127
+ name = run_server .__name__ ,
121
128
).start ()
122
129
123
- return comms
124
-
130
+ # Wait until alive
131
+ while True :
132
+ try :
133
+ session .get (master_url + "health" )
134
+ break
135
+ except requests .exceptions .ConnectionError :
136
+ sleep (1 )
125
137
126
- def worker_result (entry ):
127
- comms : AbstractCommunications = master_communications ()
128
- comms .send_entry (entry )
129
- user_id = str (entry ["user_id" ])
130
- return comms .recv_result (user_id )
138
+ return session
131
139
132
140
133
- def main ():
134
- from main .workers .pika import PikaCommunications
135
- comms = PikaCommunications ()
136
- master_consumer_loop (comms )
141
+ def worker_result (entry ):
142
+ session = master_session ()
143
+ return session .post (master_url + "run" , json = entry ).json ()
137
144
138
145
139
146
if __name__ == '__main__' :
140
- main ()
147
+ run_server ()
0 commit comments