9
9
import resource
10
10
import sys
11
11
from code import InteractiveConsole
12
+ from collections import defaultdict
12
13
from datetime import datetime
13
14
from functools import lru_cache
14
15
from importlib import import_module
15
- from multiprocessing import Pipe
16
- from multiprocessing .connection import Connection
17
16
from multiprocessing .context import Process
18
17
from threading import Thread
18
+ from time import sleep
19
19
20
20
import birdseye .bird
21
21
import snoop
26
26
27
27
from main .text import pages
28
28
from main .utils import format_exception_string , rows_to_dicts
29
+ from main .workers .communications import AbstractCommunications , ThreadCommunications
29
30
30
31
log = logging .getLogger (__name__ )
31
32
@@ -288,57 +289,53 @@ def readline():
288
289
)
289
290
290
291
291
- def consumer (connection : Connection ):
292
- manager = multiprocessing .Manager ()
293
- task_queue = manager .Queue ()
294
- input_queue = manager .Queue ()
295
- result_queue = manager .Queue ()
296
- process = None
297
-
298
- def start_process ():
299
- nonlocal process
300
- process = Process (
292
+ class UserProcess :
293
+ def __init__ (self , manager ):
294
+ self .task_queue = manager .Queue ()
295
+ self .input_queue = manager .Queue ()
296
+ self .result_queue = manager .Queue ()
297
+ self .awaiting_input = False
298
+ self .process = None
299
+ self .start_process ()
300
+
301
+ @atexit .register
302
+ def cleanup ():
303
+ if self .process :
304
+ self .process .terminate ()
305
+
306
+ def start_process (self ):
307
+ self .process = Process (
301
308
target = worker_loop_in_thread ,
302
- args = (task_queue , input_queue , result_queue ),
309
+ args = (self . task_queue , self . input_queue , self . result_queue ),
303
310
daemon = True ,
304
311
)
305
- process .start ()
306
-
307
- start_process ()
308
-
309
- def cleanup ():
310
- process .terminate ()
311
-
312
- atexit .register (cleanup )
312
+ self .process .start ()
313
313
314
- awaiting_input = False
315
-
316
- def run ():
317
- task_queue .put (entry )
318
-
319
- while True :
320
- entry = connection .recv ()
314
+ def handle_entry (self , entry ):
321
315
if entry ["source" ] == "shell" :
322
- if awaiting_input :
323
- input_queue .put (entry ["input" ])
316
+ if self . awaiting_input :
317
+ self . input_queue .put (entry ["input" ])
324
318
else :
325
- run ( )
319
+ self . task_queue . put ( entry )
326
320
else :
327
- if not TESTING and awaiting_input :
328
- process .terminate ()
329
- start_process ()
321
+ if not TESTING and self . awaiting_input :
322
+ self . process .terminate ()
323
+ self . start_process ()
330
324
331
- run ()
325
+ self .task_queue .put (entry )
326
+
327
+ def await_result (self , callback ):
328
+ # TODO cancel if result was cancelled by a newer handle_entry
332
329
result = None
333
330
while result is None :
334
331
try :
335
- result = result_queue .get (timeout = 3 )
332
+ result = self . result_queue .get (timeout = 3 )
336
333
except queue .Empty :
337
- alive = process .is_alive ()
334
+ alive = self . process .is_alive ()
338
335
print (f"Process { alive = } " )
339
336
if alive :
340
- process .terminate ()
341
- start_process ()
337
+ self . process .terminate ()
338
+ self . start_process ()
342
339
result = dict (
343
340
output_parts = [
344
341
dict (color = 'red' , text = 'The process died.\n ' ),
@@ -351,13 +348,62 @@ def run():
351
348
awaiting_input = False ,
352
349
birdseye_objects = None ,
353
350
)
354
- awaiting_input = result ["awaiting_input" ]
355
- connection .send (result )
351
+ self .awaiting_input = result ["awaiting_input" ]
352
+ callback (result )
353
+
354
+
355
+ def master_consumer_loop (comms : AbstractCommunications ):
356
+ comms = comms .make_master_side_communications ()
357
+ manager = multiprocessing .Manager ()
358
+ user_processes = defaultdict (lambda : UserProcess (manager ))
359
+
360
+ while True :
361
+ entry = comms .recv_entry ()
362
+ user_id = str (entry ["user_id" ])
363
+ user_process = user_processes [user_id ]
364
+ user_process .handle_entry (entry )
365
+
366
+ def callback (result ):
367
+ comms .send_result (user_id , result )
368
+
369
+ Thread (
370
+ target = user_process .await_result ,
371
+ args = [callback ],
372
+ ).start ()
373
+
374
+
375
+ @lru_cache ()
376
+ def master_communications () -> AbstractCommunications :
377
+ from django .conf import settings
378
+ if settings .RABBITMQ_HOST :
379
+ from .workers .pika import PikaCommunications
380
+ comms = PikaCommunications ()
381
+ else :
382
+ comms = ThreadCommunications ()
383
+
384
+ if not settings .SEPARATE_WORKER_PROCESS :
385
+ Thread (
386
+ target = master_consumer_loop ,
387
+ args = [comms ],
388
+ daemon = True ,
389
+ name = master_consumer_loop .__name__ ,
390
+ ).start ()
391
+
392
+ return comms
393
+
394
+
395
+ def worker_result (entry ):
396
+ comms : AbstractCommunications = master_communications ()
397
+ comms .send_entry (entry )
398
+ user_id = str (entry ["user_id" ])
399
+ return comms .recv_result (user_id )
400
+
401
+
402
+ def main ():
403
+ from main .workers .pika import PikaCommunications
404
+ comms = PikaCommunications ()
405
+ master_consumer_loop (comms )
356
406
357
407
358
- @lru_cache (maxsize = None )
359
- def worker_connection (_user_id ):
360
- parent_connection , child_connection = Pipe ()
361
- p = Thread (target = consumer , args = (child_connection ,), daemon = True )
362
- p .start ()
363
- return parent_connection
408
+ if __name__ == '__main__' :
409
+ main ()
0 commit comments