-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasynclapi.py
166 lines (128 loc) · 5.5 KB
/
asynclapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
Addition to the Clapi library adding asynchronous requests to devices.
The appearance of the API is as close as possible to the HTTP request API.
Direct call of functions and classes from this file is provided only for debug cases,
for all other cases it is better to use only clapy.py functions.
"""
from threading import Thread, Lock
import json
import time
class TaskPool():
def __init__(self, serial_wrapper):
self.serial_wrapper = serial_wrapper
self.running = True
self.main_thread = None # thread that processes incoming messages and tasks
self.tasks = list() # tasks queue
self.subscribers = dict() # (CODE -> LISTENER) where CODE is code of the Task for which the response is expected
self.task_lock = Lock()
self.inbox = dict() # for messages which nobody waited
def push_task(self, task):
""" Add task to perform and run main thread """
self.task_lock.acquire()
self.tasks.append(task)
# run thread if it is not alive
if not self.main_thread or not self.main_thread.isAlive():
self.main_thread = Thread(target=self.main_loop, daemon=False) # daemon=True for force terminating
self.main_thread.start()
self.task_lock.release()
def push_subscriber(self, s):
""" Add subscriber for incoming messages """
self.subscribers[s._code] = s
def main_loop(self):
"""
Main loop for task performing.
First, check incoming messages, then perform ONE task
"""
while self.running:
self.task_lock.acquire()
# accept incoming messages
self.process_input()
if len(self.tasks):
self.process_output()
else:
time.sleep(0) # like thread.yield() in other langs
self.task_lock.release()
def process_input(self):
# TRY TO PRESERVE MESSAGES
for code, message in self.inbox.items():
target = self.subscribers.get(code, None)
if target:
if target._callback:
target._callback(message)
self.subscribers.pop(code, None)
self.inbox.pop(code, None)
if isinstance(target, LongPoll):
self.push_task(target)
while self.serial_wrapper.inWaiting():
response = json.loads(self.serial_wrapper.pull())
code = response.get('code', -1)
if code == -1:
print('Response to the void (response without CODE):', response) # You cannot reply to asynchronous messages without the "code" field.
else:
target = self.subscribers.get(code, None)
if target:
if target._callback:
target._callback(response)
self.subscribers.pop(code, None)
if isinstance(target, LongPoll): # add performed task again to the task queue if the task is long-poll
self.push_task(target)
else: # if the message has no recipient
self.inbox[code] = response
def process_output(self):
""" Perform ONE task """
cur_task = self.tasks.pop(0) # take task from queue start
if isinstance(cur_task, Push):
self.serial_wrapper.push(cur_task._code, list(cur_task._args))
if isinstance(cur_task, Request) or isinstance(cur_task, LongPoll):
self.push_subscriber(cur_task) # add subscriber if task is request or long-poll
self.serial_wrapper.push(cur_task._code, list(cur_task._args))
def reset(self):
self.subscribers = dict()
self.tasks = list()
if (self.task_lock.locked()): self.task_lock.release()
def __str__(self):
response = '\nmain_thread: '
response += 'active' if self.main_thread and self.main_thread.isAlive() else 'stopped'
if len(self.tasks):
for t in self.tasks:
response += "\n[task] {}".format(t)
else:
response += "\nno tasks"
if len(self.subscribers):
for s in self.subscribers.values():
response += "\n[subscriber] {}".format(s)
else:
response += "\nno subscribers"
return response
class Task():
def __init__(self, control_code, *control_args):
self._code = control_code
self._args = control_args
self._executor = None
def code(self, control_code:int):
self._code = control_code
return self
def args(self, *control_args):
self._args = control_args
return self
def execute(self):
if self._executor:
self._executor(self)
class Push(Task):
def __str__(self):
return "Push(code={}, args={})".format(self._code, str(self._args))
class CallbackTask(Task):
def __init__(self, control_code, *control_args):
self._code = control_code
self._args = control_args
self._executor = None
self._callback = None
def callback(self, control_callback):
self._callback = control_callback
return self
class Request(CallbackTask):
def __str__(self):
return "Request(code={}, args={}, callback={})".format(self._code, str(self._args), str(self._callback))
class LongPoll(CallbackTask):
def __str__(self):
return "LongPoll(code={}, args={}, callback={})".format(self._code, str(self._args), str(self._callback))