-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserialproxy.py
365 lines (301 loc) · 12 KB
/
serialproxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# This behaves similar to:
# socat -dd pty,rawer,link=server.dev pty,rawer,link=client.dev
# But it also simulates baudrate slowness and checks for baudrate
# compatibility.
import asyncio
import fcntl
import os
import select
import sys
import termios
import time
from collections import namedtuple
from contextlib import suppress
from warnings import warn
tcattr = namedtuple('tcattr', 'iflag oflag cflag lflag ispeed ospeed cc')
TCSPEED_TO_BAUDRATE = dict(
# termios.B300: 300,
# termios.B9600: 9600,
(getattr(termios, i), int(i[1:]))
for i in dir(termios) if i[0] == 'B' and i[1:].isdigit())
class HangupError(Exception):
pass
class SerialPty:
def __init__(self):
# > How can I detect when someone opens the slave side of a pty
# > (pseudo-terminal) in Linux?
# > https://stackoverflow.com/questions/3486491/
# > [...]
# > However, there is a trick that allows you to do it. After
# > opening the pseudo-terminal master (assumed here to be file
# > descriptor ptm), you open and immediately close the slave side.
# > [...]
# > You now poll the HUP flag regularly with poll()
# This is done in _detect_connect_and_hup().
# TODO: can/should we do this manually using open('/dev/ptmx') instead?
self.fd, self.worker_fd = os.openpty()
self._ptsname = os.ttyname(self.worker_fd) # do it before close!
os.close(self.worker_fd)
self._baudrate = None
self._writebuf = []
self._writelast = 0
self._set_nonblock()
@property
def baudrate(self):
"Baudrate detected on this pty"
if self._baudrate is None:
self._detect_baudrate()
assert self._baudrate is not None
return self._baudrate
@property
def bits_per_byte(self):
"Assume 1start, 7data, 1parity, 1stop"
return (1 + 7 + 1 + 1)
@property
def time_per_byte(self):
"How much time/delay we emulate for a single byte for this baudrate"
return (1.0 / self.baudrate * self.bits_per_byte)
def _set_nonblock(self):
"Set to non-blocking; standard for asyncio"
flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def read_byte(self):
"Read byte from fd directly"
try:
byte = os.read(self.fd, 1)
except OSError as e:
if e.args[0] == 5: # EIO
raise HangupError()
elif e.args[0] == 11: # EAGAIN/EWOULDBLOCK
pass
else:
raise
assert len(byte) == 1, byte
self._detect_baudrate() # update detected baud on read
return byte
def write_byte(self, byte, baudrate):
"Schedule byte to be written as fast as baudrate permits"
assert len(byte) == 1, byte
self._writebuf.append((byte, baudrate))
# Schedule a single byte to be written _only_ if this is the
# first in the queue. In other cases the latest write will
# schedule a new one.
if len(self._writebuf) == 1:
self._schedule_write_after_baudrate_delay()
def close(self):
"Clean up used fds"
# When we close self.fd, self.worker_fd gets closed automatically.
with suppress(OSError):
# This should succeed, but if it doesn't we might as well
# resume our cleanup.
os.close(self.fd)
def _detect_baudrate(self):
"Called on read and on write"
# TODO: can we / do we want to detect more? CS7? etc..?
tc = tcattr(*termios.tcgetattr(self.fd))
assert tc.ispeed == tc.ospeed, tc
self._baudrate = TCSPEED_TO_BAUDRATE[tc.ispeed]
def _schedule_write_after_baudrate_delay(self):
"Schedule a write after baudrate delay"
loop = asyncio.get_running_loop()
tnext = self._writelast + self.time_per_byte
tdelay = tnext - time.time()
if tdelay <= 0:
loop.call_soon(self._background_write)
else:
# #loop.call_soon(self._writer, fd)
# #loop.call_at(tnext, self._writer, fd)
loop.call_later(tdelay, self._background_write)
def _background_write(self):
"The actual write after the baudrate delay"
assert self._writebuf, self
self._detect_baudrate() # update detected baud on write
byte, peer_baudrate = self._writebuf.pop(0)
# Compare baudrate with expected baudrate.
if peer_baudrate != self.baudrate:
# This is NOT always a problem. It might be if there are
# many of these though.. (There's a slight going on when
# (re)setting the baud rate.)
print('(baudrate mismatch, forwarding {!r} from {} to {})'.format(
byte, peer_baudrate, self.baudrate))
# An EWOULDBLOCK/EAGAIN or any other error is unexpected here.
count = os.write(self.fd, byte)
assert count == 1, count
self._writelast = time.time()
# Schedule next write in a timely manner.
if self._writebuf:
self._schedule_write_after_baudrate_delay()
def __repr__(self):
buflen = len(self._writebuf)
return f'<Pty(fd={self.fd}, baud={self.baudrate}, writebuf={buflen}>'
class SerialProxy:
def __init__(self):
# We create two PTYs. Then we can attach a server serial.Serial
# to one end and a client serial.Serial to the other.
self._pty1 = SerialPty()
self._pty2 = SerialPty()
# ^-- XXX: interestingly.. the order matters; see .adev and .bdev below
# as if the first openpty() and the second are aware of each other
@property
def adev(self):
"Device name of the A (server) side"
return self._pty1._ptsname
@property
def bdev(self):
"Device name of the B (client) side"
return self._pty2._ptsname
async def run(self):
"Start the serial proxy and run until one end disconnects"
# Don't add writer tasks, as we have nothing to write; we'd just
# busy-loop. Don't add reader tasks either. That's done by
# _detect_connect_and_hup once both sockets are connected.
await self._detect_connect_and_hup()
async def _detect_connect_and_hup(self):
"""
Poll the closed worker_fd for POLLHUP; they will un-HUP once
they're connected. Then we add the readers. And shutdown once
one side HUPs again.
TODO: Maybe allow reconnecting..
"""
def hup_count(evs):
hups = 0
for fd, ev in evs:
assert not (ev & (select.POLLERR | select.POLLNVAL)), evs
if ev & select.POLLHUP:
hups += 1
return hups
# Add the two (disconnected) slave FDs to the poller.
poller = select.poll()
for pty in (self._pty1, self._pty2):
poller.register(pty.worker_fd, (
select.POLLIN | select.POLLHUP | select.POLLERR |
select.POLLNVAL))
# Is any fd still in hangup (HUP) state?
while hup_count(poller.poll(0)) != 0:
await asyncio.sleep(0.1)
print('Both sides connected, starting readers')
loop = asyncio.get_running_loop()
loop.add_reader(
self._pty1.fd, self._reader, self._pty1, self._pty2)
loop.add_reader(
self._pty2.fd, self._reader, self._pty2, self._pty1)
try:
# Are all fds still connected (not in hangup state)?
while hup_count(poller.poll(0)) == 0:
await asyncio.sleep(0.1)
except asyncio.exceptions.CancelledError:
# Task is stopped because we're shutting down. (loop.stop(),
# possibly due to a fatal signal.)
print('Poll task is cancelled, stopping all')
else:
print('One side closed the connection, stopping all')
finally:
# Stop reader tasks.
loop.remove_reader(self._pty1.fd)
loop.remove_reader(self._pty2.fd)
def _reader(self, pty, peer_pty):
"Reader gets called once there is a byte available"
# We don't attempt to do multiple bytes at once. We assume that
# the baudrate/speed is sufficiently low that we can task switch
# easily. Remember: this is made for low baudrate tests.
# XXX: test this? does this statement make sense?
try:
byte = pty.read_byte()
except BaseException as e:
loop = asyncio.get_running_loop()
loop.remove_reader(pty.fd)
loop.remove_reader(peer_pty.fd)
if not isinstance(e, HangupError):
raise
else:
peer_pty.write_byte(byte, pty.baudrate)
def close(self):
"Clean up the file descriptors"
self._pty1.close()
self._pty2.close()
class ExposedSerialProxy(SerialProxy):
"""
ExposedSerialProxy exposes the B-device as a symlink
Other than that, it's just the plain SerialProxy. Don't forget to
call close(), even if you haven't connected yet.
"""
def __init__(self, exposed_as):
super().__init__()
os.symlink(self.bdev, exposed_as)
self._exposed_as = exposed_as
def _hide(self):
"Hide symlink as soon as someone connects"
if not os.path.islink(self._exposed_as):
warn('{!r} is not a symlink?'.format(self._exposed_as))
else:
try:
os.unlink(self._exposed_as)
except OSError as e:
warn('error {} during unlink of {!r}'.format(
e.args[0], self._exposed_as))
self._exposed_as = None
def _reader(self, *args, **kwargs):
if self._exposed_as is not None:
self._hide()
return super()._reader(*args, **kwargs)
def close(self):
if self._exposed_as is not None:
self._hide()
super().close()
def spawn_serialproxy_child(devname):
"""
Spawn a SerialProxy child process
The SerialProxy will proxy data between the two serial endpoints.
Connect two applications which you would normally connect to a
serial interface like /dev/ttyAMA0.
Example in server process:
class ChildExited(Exception):
pass
def sigchld(frame, signum):
pid, status = os.wait()
raise ChildExited(pid, status)
signal.signal(signal.SIGCHLD, sigchld)
proxy_child, proxy_devname = spawn_serialproxy_child(
'/path/to/serial.dev')
ser = serial.Serial(
port=proxy_devname, baudrate=9600,
exclusive=True)
ser.read_byte()
Example in the client process:
ser = serial.Serial(
port='/path/to/serial.dev', baudrate=9600,
exclusive=True)
ser.write_byte(b'X')
This allows you to test the client against a local (test) server
instead of to a hardware device.
Caveats: baudrate is not actually enforced on both sized, but an
attempt is made to check equality on both sides of the proxy.
Bytesize, parity and stopbits are probably not settable because of
openpty(3) limitations.
"""
rfd, wfd = os.pipe2(0)
# Take care not to do any asyncio calls before the fork. Otherwise
# their results may be attached to the parent loop, which is now
# unreachable.
child_pid = os.fork()
if child_pid != 0:
os.close(wfd)
adev = os.read(rfd, 255).decode('ascii')
os.close(rfd)
return child_pid, adev
# We don't need stdin in the child (and sys.stdin.close() does not
# actually close any fds).
os.close(sys.stdin.fileno())
os.close(rfd)
proxy = ExposedSerialProxy(devname)
child_pid = os.getpid()
print(f'Running proxy {child_pid} on {devname} ({proxy.bdev})')
# Report address back to parent.
os.write(wfd, proxy.adev.encode('ascii'))
os.close(wfd)
try:
asyncio.run(proxy.run())
except KeyboardInterrupt:
proxy.close()
print('Stopped proxy')
os._exit(0)