-
Notifications
You must be signed in to change notification settings - Fork 1
/
common.py
152 lines (119 loc) · 4.58 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
Common test functions
"""
from typing import Dict, Any, Self, List
from types import TracebackType
import json
from mtbdaemonif import mtb_daemon, MtbDaemonIFace
TEST_MODULE_ADDR = 1
INACTIVE_MODULE_ADDR = 2
CONFIG_FN = 'mtb-daemon-test.json'
with open(CONFIG_FN, 'r') as file:
CONFIG_JSON = json.loads(file.read())
MODULES_JSON = {int(addrstr): module for addrstr, module in CONFIG_JSON['modules'].items()}
assert TEST_MODULE_ADDR in MODULES_JSON.keys()
assert INACTIVE_MODULE_ADDR in MODULES_JSON.keys()
class MtbDaemonError:
INVALID_JSON = 1000
MODULE_INVALID_ADDR = 1100
MODULE_INVALID_PORT = 1101
MODULE_FAILED = 1102
INVALID_SPEED = 1105
INVALID_DV = 1106
MODULE_ACTIVE = 1107
FILE_CANNOT_ACCESS = 1010
MODULE_ALREADY_WRITING = 1110
UNKNOWN_COMMAND = 1020
DEVICE_DISCONNECTED = 2004
ALREADY_STARTED = 2012
MODULE_UPGRADING_FW = 3110
MODULE_IN_BOOTLOADER = 3111
MODULE_CONFIG_SETTING = 3112
MODULE_REBOOTING = 3113
MODULE_FWUPGD_ERROR = 3114
MODULE_UNKNOWN_COMMAND = 0x1001
MODULE_UNSUPPORTED_COMMAND = 0x1002
MODULE_BAD_ADDRESS = 0x1003
SERIAL_PORT_CLOSED = 0x1010
USB_NO_RESPONSE = 0x1011
BUS_NO_RESPONSE = 0x1012
def check_version_format(version: str) -> None:
versions = version.split('.')
assert len(versions) == 2
major, minor = versions
assert major.isdigit()
assert minor.isdigit()
def check_error(response: Dict[str, Any], error_id: int) -> None:
assert 'status' in response
assert isinstance(response['status'], str)
assert response['status'] == 'error'
assert 'error' in response
assert isinstance(response['error'], dict)
error = response['error']
assert 'code' in error
assert isinstance(error['code'], int)
assert response['error']['code'] == error_id
assert 'message' in error
assert isinstance(error['message'], str)
assert error['message'] != ''
def check_invalid_addresses(request: Dict[str, Any], addr_key: str) -> None:
"""
Sends request `request` with several invalid addresses and checks that the
server replies with proper error.
"""
INVALID_ADDRS = [0, -1, 0x1FF, 49840938, 'hello', '0x24']
for addr in INVALID_ADDRS:
request[addr_key] = addr
response = mtb_daemon.request_response(request, ok=False)
check_error(response, MtbDaemonError.MODULE_INVALID_ADDR)
def set_single_output(addr: int, output: int, value: int) -> None:
mtb_daemon.request_response({
'command': 'module_set_outputs',
'address': addr,
'outputs': {str(output): {'type': 'plain', 'value': value}}
})
def validate_oc_event(event: Dict[str, Any], addr: int, port: int, value: int) -> None:
assert 'module_outputs_changed' in event
moc_event = event['module_outputs_changed']
assert 'address' in moc_event
assert moc_event['address'] == addr
assert 'outputs' in moc_event
assert str(port) in moc_event['outputs']
json_port = moc_event['outputs'][str(port)]
assert json_port['type'] == 'plain'
assert json_port['value'] == value
def validate_ic_event(event: Dict[str, Any], addr: int, port: int, value: bool) -> None:
assert 'module_inputs_changed' in event
moc_event = event['module_inputs_changed']
assert 'address' in moc_event
assert moc_event['address'] == addr
assert 'inputs' in moc_event
assert 'full' in moc_event['inputs']
assert 'packed' in moc_event['inputs']
assert moc_event['inputs']['full'][port] == value
class ModuleSubscription:
def __init__(self, daemon: MtbDaemonIFace, addrs: List[int]):
self.daemon: MtbDaemonIFace = daemon
self.addrs: List[int] = addrs
def __enter__(self) -> Self:
response = self.daemon.request_response({
'command': 'module_subscribe',
'addresses': self.addrs,
})
assert response['addresses'] == self.addrs
return self
def __exit__(self, exception_type: Any, exception_value: Any,
exception_traceback: TracebackType) -> None:
self.daemon.request_response({
'command': 'module_unsubscribe',
'addresses': self.addrs,
})
class TopoSubscription:
def __init__(self, daemon: MtbDaemonIFace):
self.daemon: MtbDaemonIFace = daemon
def __enter__(self) -> Self:
self.daemon.request_response({'command': 'topology_subscribe'})
return self
def __exit__(self, exception_type: Any, exception_value: Any,
exception_traceback: TracebackType) -> None:
self.daemon.request_response({'command': 'topology_unsubscribe'})