-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclapi.py
155 lines (118 loc) · 4.11 KB
/
clapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import json
import os
import time
import serial as s
import struct
import math
import sys
from asynclapi import *
# Serial speed handed to serial.Serial when a Device opens its port.
BAUD_RATE = 115200
# Version string printed by status().
KEKMECH_VERSION = '1.2.2'
# Command code that SerialWrapper.handshake() sends to start the handshake.
CMD_HANDSHAKE = 0x13
# When True, SerialWrapper.push() and pull() print every message they send/receive.
debug = False
# Module-wide Core instance; stays None until start() is called.
core = None
def start():
    """Probe the known serial ports and publish the result as the module-level ``core``."""
    global core
    # Build the Core first so ``core`` is only rebound once discovery has finished.
    discovered = Core()
    core = discovered
def status():
    """Print the core version, then one line per device registered in ``core``."""
    banner = 'Core version {}'.format(KEKMECH_VERSION)
    print(banner)
    # ``core`` is the module-level Core created by start().
    registered = core.devices
    for device in registered:
        print(device)
class Core:
    """
    Registry of the devices found on the serial ports.

    Besides keeping the list in ``self.devices``, every device is also
    exposed as an attribute of this module, named after the device id.
    """

    def __init__(self):
        """
        Probe the candidate serial ports and register every device that
        reported an id.

        Candidate ports:
            Chinese Arduino clones: /dev/ttyUSB###
            Arduino and STM32:      /dev/ttyACM###
        Indices 0 through 3 are tried for each prefix, ACM ports first.
        """
        candidates = ['/dev/ttyACM' + str(i) for i in range(0, 4)] + ['/dev/ttyUSB' + str(i) for i in range(0, 4)]
        this_module = sys.modules[__name__]
        self.devices = list()
        for port_path in candidates:
            # Only ports that currently exist are opened.
            if not os.path.exists(port_path):
                continue
            device = Device(port_path)
            # A Device only gets an ``id`` attribute when its handshake returned data.
            if not hasattr(device, 'id'):
                continue
            # NOTE(review): an id equal to an existing module attribute name
            # replaces that attribute.
            setattr(this_module, str(device.id), device)
            self.devices.append(device)
class Device:
    """
    A single device reached over the protocol through the serial port
    identified by deviceId (for example "/dev/ttyUSB0").

    The ``id`` attribute is only set when the handshake returned data.
    """

    def __init__(self, deviceId):
        """Open the port at BAUD_RATE, create the task pool and perform the handshake."""
        port = s.Serial(deviceId, BAUD_RATE)
        self.serial = SerialWrapper(port, deviceId)
        self.task_pool = TaskPool(self.serial)
        # The handshake is the attempt to establish a connection with the device.
        self.data = self.serial.handshake()
        if self.data is None:
            return
        self.id = self.data.get('device_id', 'unnamed')

    def push(self, code:int, *args):
        """Send command ``code`` with the given arguments; no reply is read."""
        self.serial.push(code, args)

    def pull(self):
        """Read one message from the device and return it parsed from JSON."""
        raw = self.serial.pull()
        return json.loads(raw)

    def request(self, code:int, *args):
        """Send command ``code`` with the given arguments and return the parsed JSON reply."""
        self.push(code, *args)
        return self.pull()

    def _bind(self, task):
        """Point the task's executor at this device's task pool and return the task."""
        task._executor = self.task_pool.push_task
        return task

    def push_async(self, code:int, *args):
        """Return a Push task for ``code``/``args`` bound to this device's task pool."""
        return self._bind(Push(code, *args))

    def request_async(self, code:int, *args):
        """Return a Request task for ``code``/``args`` bound to this device's task pool."""
        return self._bind(Request(code, *args))

    def long_poll_async(self, code:int, *args):
        """Return a LongPoll task for ``code``/``args`` bound to this device's task pool."""
        return self._bind(LongPoll(code, *args))

    def reset(self):
        """Reset this device's task pool."""
        self.task_pool.reset()

    def __str__(self):
        """Render the device id followed by the task pool's string form."""
        pieces = ['(Device \"{}\": '.format(self.id), str(self.task_pool), ')']
        return ''.join(pieces)
class SerialWrapper:
    """
    Convenient wrapper for a serial port object. Also used for testing,
    since any object with the same methods can be passed in.

    The wrapped object must provide ``write``, ``readline``, ``flush``,
    ``reset_input_buffer`` and ``inWaiting``.
    """

    def __init__(self, serial, deviceId):
        """
        Store an already-created port; the port is not created here.

        Args:
            serial: The port object to wrap.
            deviceId: Identifier of the port (for example its device path);
                used only in debug output.
        """
        self.serial = serial
        self.deviceId = deviceId

    def decompose(self, number):
        """
        Convert a float to its 4-byte single-precision representation.

        NOTE(review): format 'f' uses the host's native byte order; confirm
        this matches what the firmware expects.
        """
        return struct.pack('f', number)

    def push(self, code:int, args):
        """
        Send a message to the device.

        The message is one byte for ``code``, one byte for the number of
        arguments, then every argument packed by ``decompose``.

        Raises:
            ValueError: If ``code`` or the argument count is outside 0..255.
        """
        if debug:
            print('push', self.deviceId, '>>', {'code': code, 'args': args})
        header = bytes([code, len(args)])
        # Join once instead of growing a bytes object argument by argument.
        payload = b''.join(self.decompose(arg) for arg in args)
        self.serial.write(header + payload)

    def request(self, code:int, args):
        """Send a message and return the raw (unparsed) reply line."""
        self.push(code, args)
        return self.pull()

    def flush(self):
        """Flush the underlying port."""
        self.serial.flush()

    def clear_input(self):
        """Discard whatever is pending in the port's input buffer."""
        self.serial.reset_input_buffer()

    def pull(self):
        """
        Get a message from the device.

        Returns:
            One line read from the port, decoded as ASCII with trailing
            whitespace removed.

        Raises:
            UnicodeDecodeError: If the line contains non-ASCII bytes.
        """
        response = self.serial.readline().decode('ascii').rstrip()
        if debug:
            print('pull', self.deviceId, '<<', response)
        return response

    def inWaiting(self):
        """Return what the underlying port reports as waiting in its input buffer."""
        return self.serial.inWaiting()

    def handshake(self):
        """
        Set up the connection and return the device description.

        If nothing is waiting in the input buffer, the handshake command is
        sent first; then one line is read and parsed as JSON.

        Returns:
            The decoded JSON object as a dict, or None when the reply is not
            valid ASCII, not valid JSON, or not a JSON object. Device.__init__
            checks for None, so a port that does not speak the protocol is
            skipped instead of aborting discovery with an exception.
        """
        if not self.serial.inWaiting():
            self.push(CMD_HANDSHAKE, [])
        try:
            line = self.pull()
        except UnicodeDecodeError:
            # Line noise or a device that does not speak the protocol.
            return None
        return self._parse_handshake(line)

    @staticmethod
    def _parse_handshake(line):
        """Parse a handshake reply; return the dict, or None if it is not a JSON object."""
        try:
            data = json.loads(line)
        except ValueError:
            # json.JSONDecodeError is a ValueError; covers empty and garbage lines.
            return None
        return data if isinstance(data, dict) else None