-
Notifications
You must be signed in to change notification settings - Fork 2
/
UsbWatcher.py
executable file
·121 lines (96 loc) · 3.68 KB
/
UsbWatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
This is the main script of the programmer, ran from a service that is configured to restart when killed.
"""
import sys, os
from pathlib import Path
import subprocess
import time, signal
import traceback
from config import MOUNT_PATH, NAME_OF_USB_DONGLE, STATE_MANAGER_PATH
def checkMedia():
"""
Checks if a drive with a name equal to the configured NAME_OF_USB_DONGLE is mounted at MOUNT_PATH.
Returns: True if available, False else.
"""
for item in os.listdir(MOUNT_PATH):
if os.path.isdir(MOUNT_PATH + "/" + item):
if item == NAME_OF_USB_DONGLE:
filesOnDrive = os.listdir(MOUNT_PATH + NAME_OF_USB_DONGLE)
if len(filesOnDrive) > 0:
return True
return False
class ProggerManager:
"""
This class is responsible:
- monitoring connection/disconnection of usb code drive.
- launching/killing the sub process of the State Manager.
"""
session = None
running = False
def __init__(self):
pass
def disconnectedCodeDrive(self):
""" Handles code drive disconnected event. """
self.cleanup()
def connectedCodeDrive(self):
""" Handles code drive connected event. """
if not self.running:
self.running = True
# start all processes. LED is on. Button is listened to!
self.runCode()
def runCode(self):
""" Launch sub process for state manager script. """
print("UsbWatcher: Starting subprocess")
self.session = subprocess.Popen(["sudo", "python3", STATE_MANAGER_PATH], preexec_fn=os.setsid)
def cleanup(self):
""" Kill all processes. LED goes off. Stop reading button. """
print("UsbWatcher: Kill ProggerStateManager process")
self.running = False
if self.session is not None:
print("UsbWatcher: Terminating ProggerStateManager")
try:
self.session.terminate()
except:
print("UsbWatcher: Error terminating ProggerStateManager", sys.exc_info()[0])
traceback.print_exc()
try:
print("UsbWatcher: Waiting to terminate ProggerStateManager")
self.session.wait(5)
print("UsbWatcher: ProggerStateManager has been Terminated.")
except:
traceback.print_exc()
print("UsbWatcher: Terminate not successful. Killing ProggerStateManager now.", sys.exc_info()[0])
if self.session is not None:
self.session.kill()
time.sleep(3)
self.session = None
else:
print("UsbWatcher: I dont have a session!")
codeUsbDongleConnected = checkMedia()
print("State of USB:", codeUsbDongleConnected)
theManager = ProggerManager()
running = True
def cleanup(source=None, frame=None):
theManager.cleanup()
running = False
time.sleep(1)
quit()
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
if codeUsbDongleConnected:
theManager.connectedCodeDrive()
while running:
measurement = checkMedia()
time.sleep(0.5)
if codeUsbDongleConnected:
codeUsbDongleConnected = measurement
if not codeUsbDongleConnected:
print("UsbWatcher: USB dongle has been disconnected. Closing child processes....")
theManager.disconnectedCodeDrive()
else:
codeUsbDongleConnected = measurement
if codeUsbDongleConnected:
print("UsbWatcher: USB dongle has been Connected! Starting ProggerStateManager!")
theManager.connectedCodeDrive()
else:
subprocess.Popen(["sudo", "mount", "-a"], preexec_fn=os.setsid)