-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadTesting.py
39 lines (38 loc) · 1.02 KB
/
threadTesting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from threading import Thread
import time
import sys
def imagingPipeline():
    """Process images handed over through the module-level ``img`` slot.

    Loops for as long as the global ``running`` flag is true. Whenever
    ``img`` holds a value, the function simulates two seconds of processing
    and then resets ``img`` to ``None``. While no image is pending it sleeps
    briefly instead of spinning, so the loop does not consume a full CPU
    core or fight the other threads for the interpreter lock.

    Raises:
        SystemExit: always raised (exit code 0) once ``running`` becomes
            false. When this function is used as a ``Thread`` target the
            threading machinery ignores ``SystemExit``, so only this thread
            ends.
    """
    global img, running
    # Polling interval used while waiting for the next image; small enough
    # that a new image or a shutdown request is noticed promptly.
    idle_poll_seconds = 0.05
    print("Starting imaging pipeline")
    while running:
        if img is not None:
            print("Imaging Pipeline: Got a new image")
            time.sleep(2)
            print("Imaging Pipeline: finished processing image")
            # Mark the slot as empty so the same image is not processed twice.
            img = None
        else:
            time.sleep(idle_poll_seconds)
    print("Imaging Pipeline: Received shutdown")
    sys.exit(0)
def navPipeline():
    """Periodically publish a placeholder image into the global ``img``.

    As long as the global ``running`` flag is true, each cycle waits three
    seconds, reports that a photo was taken and stores the string
    ``"hello"`` in ``img``. After the flag is cleared a shutdown message is
    printed and the function returns.
    """
    global running, img
    print("Starting nav pipeline")
    while True:
        # The flag is checked once per cycle, before the wait.
        if not running:
            break
        time.sleep(3)
        print("Nav Pipeline: Took a new photo with camera")
        img = "hello"
    print("Nav Pipeline: Received shutdown")
# State shared between the main thread and both pipeline threads.
# img: value handed from navPipeline to imagingPipeline (None when empty).
# running: cleared by the main thread to ask both pipelines to stop looping.
img = None
running = True

imaging = Thread(target=imagingPipeline)
nav = Thread(target=navPipeline)

try:
    imaging.start()
    nav.start()
    # Idle until interrupted. Sleeping (rather than spinning on ``pass``)
    # keeps the main thread from burning a CPU core and from competing with
    # the worker threads for the interpreter lock.
    while True:
        time.sleep(0.1)
except (KeyboardInterrupt, SystemExit):
    running = False
    print("Main: Interrupt received. Closing threads")
    # Only join threads that are actually running: joining a thread that was
    # never started (interrupt arrived between the two start() calls) raises
    # RuntimeError.
    for worker in (imaging, nav):
        if worker.is_alive():
            worker.join()
    print("Main: All threads terminated")
    sys.exit(0)