forked from cttoronto/python-mindwave-mobile
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMindwaveMobileRawReader.py
70 lines (56 loc) · 2.5 KB
/
MindwaveMobileRawReader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import bluetooth
import time
class MindwaveMobileRawReader:
START_OF_PACKET_BYTE = 0xaa;
def __init__(self):
self._buffer = [];
self._bufferPosition = 0;
def connectToMindWaveMobile(self):
# connecting via bluetooth RFCOMM
self.mindwaveMobileSocket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
mindwaveMobileAddress = '9C:B7:0D:72:CD:02';
while(True):
try:
self.mindwaveMobileSocket.connect((mindwaveMobileAddress, 1))
return;
except bluetooth.btcommon.BluetoothError as error:
print "Could not connect: ", error, "; Retrying in 5s..."
time.sleep(5)
def _readMoreBytesIntoBuffer(self, amountOfBytes):
newBytes = self._readBytesFromMindwaveMobile(amountOfBytes)
self._buffer += newBytes
def _readBytesFromMindwaveMobile(self, amountOfBytes):
missingBytes = amountOfBytes
receivedBytes = ""
# Sometimes the socket will not send all the requested bytes
# on the first request, therefore a loop is necessary...
while(missingBytes > 0):
receivedBytes += self.mindwaveMobileSocket.recv(missingBytes)
missingBytes = amountOfBytes - len(receivedBytes)
return receivedBytes;
def peekByte(self):
self._ensureMoreBytesCanBeRead();
return ord(self._buffer[self._bufferPosition])
def getByte(self):
self._ensureMoreBytesCanBeRead(100);
return self._getNextByte();
def _ensureMoreBytesCanBeRead(self, amountOfBytes):
if (self._bufferSize() <= self._bufferPosition + amountOfBytes):
self._readMoreBytesIntoBuffer(amountOfBytes)
def _getNextByte(self):
nextByte = ord(self._buffer[self._bufferPosition]);
self._bufferPosition += 1;
return nextByte;
def getBytes(self, amountOfBytes):
self._ensureMoreBytesCanBeRead(amountOfBytes);
return self._getNextBytes(amountOfBytes);
def _getNextBytes(self, amountOfBytes):
nextBytes = map(ord, self._buffer[self._bufferPosition: self._bufferPosition + amountOfBytes])
self._bufferPosition += amountOfBytes
return nextBytes
def clearAlreadyReadBuffer(self):
self._buffer = self._buffer[self._bufferPosition : ]
self._bufferPosition = 0;
def _bufferSize(self):
return len(self._buffer);
#------------------------------------------------------------------------------