-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbrain.py
257 lines (206 loc) · 9.76 KB
/
brain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
"""
brain.py: A simple library for reading data generated by NeuroSky EEG devices.
Specifically, this was designed for reading data from toys using the low-end
version of the NeuroSky chipset, which does not appear to support all the
features of the full version.
The library does not itself specifically require a serial library (e.g.
PySerial). The serial connection object is passed to the `Brain` constructor,
and it is up to the user of the library to actually create it. The standard
NeuroSky chipset, at least in the first generation of toy headsets,
communicates at 9600 baud, no parity, 8 bits, 1 stop bit.
Requires Python 2.7 (although it should work in 2.5).
"""
from collections import namedtuple
import threading
import time
class Brain(threading.Thread):
""" Object for handling the output of a Neurosky-based EEG toy. Subclass
and replace the `handlePacket()` and/or `checksumFail()` methods, or
instantiate as-is and provide handler functions as arguments.
@cvar bandNames: The names (and order) of the individual bands in a
set of EEG values.
@cvar EEGValues: a 'named tuple' class, used to return the EEG values.
@var bytesRead: The total number of bytes read (as parts of a valid
packet or not).
@var packetsRead: The total number of packets parsed.
"""
bandNames = ("delta",
"theta",
"alpha_low", "alpha_high",
"beta_low", "beta_high",
"gamma_low", "gamma_mid"
)
EEGValues = namedtuple("EEGValues", bandNames)
def __init__(self, serialPort,
packetHandler=None,
checksumFailHandler=None):
""" Create a new Brain object.
@param serialPort: The serial connection object (or other
stream-like object that reads one byte at a time, and a
`closed` attribute that remains `True` while running).
@keyword packetHandler: A function to call upon successfully
receiving and parsing a packet from the NeuroSky EEG chip,
overriding the class' `handlePacket()` method.
@keyword checksumFailHandler: A function to call if a
packet fails a checksum (i.e. the data is corrupt), overriding
the class' `checksumFail()` method.
"""
threading.Thread.__init__(self)
self.serialPort = serialPort
self._handlePacket = packetHandler or self.handlePacket
self._checksumFail = checksumFailHandler or self.checksumFail
self.packetsRead = 0
self.bytesRead = 0
self._running = True
def start(self, *args, **kwargs):
self._running = True
super(Brain, self).start(*args, **kwargs)
def stop(self):
self._running = False
def checksum(self, packet):
""" Perform a checksum on the packet.
@param packet: The packet as an array of bytes
@rtype: bool
"""
if len(packet) < 2:
return False
return (~sum(packet[1:-1])) & 255 == packet[-1]
def run(self):
""" The main loop that runs continuously, receiving bytes and
attempting to parse them into packets.
"""
packet = bytearray()
inPacket = False
packetLength = 0
lastByte = 0
startTime = None
while self._running and not self.serialPort.closed:
latestByte = ord(self.serialPort.read())
self.bytesRead += 1
if inPacket:
if len(packet) == 0:
# Start of new packet, first byte is length
packetLength = latestByte
elif len(packet) > packetLength+1:
# End of packet (last byte is checksum, not in reported size)
if self.checksum(packet):
self.parsePacket(packet, startTime)
else:
self._checksumFail(packet, startTime)
inPacket = False
packet.append(latestByte)
elif latestByte == lastByte == 170:
startTime = time.time()
inPacket = True
packet = bytearray()
lastByte = latestByte
def parsePacket(self, packet, timestamp=None):
""" Convert a raw packet into values for the packet handler. The
packet is assumed to have passed a checksum, but could still fail
to be parsed; if it cannot be parsed, the parsing failure handler
is called.
@param packet: An array of bytes as received from the NeuroSky
chip, minus the initial sync bytes (0xAA+0xAA).
"""
result = {"timestamp": timestamp, "damaged": False}
# Copy the packet so it won't get modified in place, skipping the
# first (size) and last (checksum) items; reverse the copy so its
# items can be popped off in order.
thisPacket = bytearray(reversed(packet[1:-1]))
# The MindFlex headset (single-player model) appears to always
# produce identically-formatted packets, so this code may do more
# work to parse the packet than is strictly necessary. This may not
# be true for other toy headsets or different MindFlex models, so
# this code is likely to work with any device.
while len(thisPacket) > 0:
try:
b = thisPacket.pop()
# Single-byte data.
if b == 0x02:
result['signal_quality'] = thisPacket.pop()
elif b == 0x04:
result['attention'] = thisPacket.pop()
elif b == 0x05:
result['meditation'] = thisPacket.pop()
elif b == 0x16:
result['blink'] = thisPacket.pop()
# Multiple byte data. The first byte is the length in bytes.
elif b == 0x80:
# Raw wave value; not generated by the toy version, not
# full parsed here.
result['raw_wave'] = [thisPacket.pop() for i in xrange(thisPacket.pop())]
elif b == 0x83:
# ASIC_EEG_POWER: big-endian 24b unsigned integers.
length = thisPacket.pop()
power = []
for i in range(length/3):
power.append(thisPacket.pop() << 16 | \
thisPacket.pop() << 8 | \
thisPacket.pop())
result['asic_eeg_power'] = self.EEGValues(*power)
# Bad data. More likely to appear than unknown types.
elif b == 0xAA:
# The packet sync byte. This should never show up as a
# code byte, and almost certainly means the data has been
# corrupted.
result['damaged'] = True
break
# Unknown (but apparently valid) codes
elif b == 0x55:
# 'Extended Code' byte; one or more of these shift the
# code byte into a different range. The toy version of the
# headset shouldn't generate these. I don't think the
# current version of the higher-end chipset does, either.
continue
elif b < 0x80:
# Unknown single-byte datum
result.setdefault('unknown', []).append(b, thisPacket.pop())
else:
# Unknown multi-byte data
value = [thisPacket.pop() for i in xrange(thisPacket.pop())]
result.setdefault('unknown', []).append(hex(b), thisValue)
except IndexError:
# A badly-formed packet will empty prematurely.
result['damaged'] = True
break
self.packetsRead += 1
self._handlePacket(**result)
def handlePacket(self, **packetData):
""" Default packet handler. It is expected for this to be overridden
when Brain is initialized, or replaced in a Brain subclass.
@keyword asic_eeg_power: A 'named tuple' with the power of each
band, in the order specified in `Brain.bandNames`.
@keyword attention: The computed 'attention' value, 0-100.
@keyword blink: Blink strength, only sent when a blink occurs.
Not generated by the toy version of the chipset.
@keyword damaged: `True` if the packet failed to parse correctly.
@keyword meditation: The computed 'meditation' value, 0-100.
@keyword raw_wave: The raw data read by the headset. Not generated
by the toy version of the headset.
@keyword signal_quality: The 'quality' of the signal, 0-200. 0 is
perfect, 200 is bad.
@keyword timestamp: The time at which the packet started arriving.
"""
print "Received:", packetData
def checksumFail(self, packet):
""" Default method called when a checksum fails. This can be
overridden when Brain is initialized or replaced in a Brain
subclass.
"""
print "CHECKSUM FAIL: %r" % packet
if __name__ == "__main__":
import sys
import serial
import serial.tools.list_ports
if len(sys.argv) > 1:
port = sys.argv[-1]
else:
print "To test Brain, provide a serial port name, e.g. one of:"
for p in serial.tools.list_ports.comports():
print "\t%s" % p[0]
exit(0)
serialport = serial.Serial(port=port, baudrate=9600)
b = Brain(serialport)
b.start()
while True:
pass