-
Notifications
You must be signed in to change notification settings - Fork 0
/
live.py
294 lines (247 loc) · 10.1 KB
/
live.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import cortex
from cortex import Cortex
import os
from dotenv import load_dotenv
class LiveAdvance():
    """
    A class to show mental command data in live mode for a trained profile.

    You can load a profile trained on EmotivBCI or via the train.py example.

    Attributes
    ----------
    c : Cortex
        Cortex client used to communicate with the Emotiv Cortex Service
    profile_name : str
        name of the profile currently being worked with ('' when none)
    profile_lists : list
        profiles reported by the last query_profile_done event
    DEFAULT_SENSITIVITY : tuple
        sensitivity values applied once the current values have been read

    Methods
    -------
    start(profile_name, headsetId=''):
        To start a live mental command process by opening the websocket
    load_profile(profile_name):
        To load an existing profile
    unload_profile(profile_name):
        To unload a profile
    save_profile(profile_name):
        To save a profile
    subscribe_data(streams):
        To subscribe to one or more data streams
    get_active_action(profile_name):
        To get active actions for the mental command detection.
    get_sensitivity(profile_name):
        To get the sensitivity of the 4 active mental command actions.
    set_sensitivity(profile_name, values):
        To set the sensitivity of the 4 active mental command actions.
    """

    # Values sent by on_mc_action_sensitivity_done; override on a subclass or
    # an instance to apply different sensitivities without editing the callback.
    DEFAULT_SENSITIVITY = (7, 7, 5, 5)

    def __init__(self, app_client_id, app_client_secret, **kwargs):
        """
        Create the Cortex client and bind the event callbacks.

        Parameters
        ----------
        app_client_id : str, required
            client id of the application
        app_client_secret : str, required
            client secret of the application
        **kwargs
            forwarded to Cortex; debug_mode defaults to True when not given

        Returns
        -------
        None
        """
        # Define these up front so a callback that fires before start()
        # cannot fail with AttributeError.
        self.profile_name = ''
        self.profile_lists = []

        # setdefault (instead of a fixed keyword) lets callers pass their own
        # debug_mode without a "multiple values for keyword argument" error.
        kwargs.setdefault('debug_mode', True)
        self.c = Cortex(app_client_id, app_client_secret, **kwargs)
        self.c.bind(create_session_done=self.on_create_session_done)
        self.c.bind(query_profile_done=self.on_query_profile_done)
        self.c.bind(load_unload_profile_done=self.on_load_unload_profile_done)
        self.c.bind(save_profile_done=self.on_save_profile_done)
        self.c.bind(new_com_data=self.on_new_com_data)
        self.c.bind(get_mc_active_action_done=self.on_get_mc_active_action_done)
        self.c.bind(mc_action_sensitivity_done=self.on_mc_action_sensitivity_done)
        self.c.bind(inform_error=self.on_inform_error)

    def start(self, profile_name, headsetId=''):
        """
        To start the live process with the workflow below
            (1) check access right -> authorize -> connect headset -> create session
            (2) query profile -> get current profile -> load/create profile
            (3) get MC active action -> get MC sensitivity -> set new MC sensitivity -> save profile
            (4) subscribe 'com' data to show live MC data

        Parameters
        ----------
        profile_name : string, required
            name of profile
        headsetId : string, optional
            id of the headset you want to work with.
            If the headsetId is empty, the first headset in the list will be used

        Returns
        -------
        None

        Raises
        ------
        ValueError
            if profile_name is empty or None
        """
        # Truthiness check also rejects None, not just ''.
        if not profile_name:
            raise ValueError('Empty profile_name. The profile_name cannot be empty.')

        self.profile_name = profile_name
        self.c.set_wanted_profile(profile_name)

        if headsetId != '':
            self.c.set_wanted_headset(headsetId)

        self.c.open()

    def load_profile(self, profile_name):
        """
        To load a profile

        Parameters
        ----------
        profile_name : str, required
            profile name

        Returns
        -------
        None
        """
        self.c.setup_profile(profile_name, 'load')

    def unload_profile(self, profile_name):
        """
        To unload a profile

        Parameters
        ----------
        profile_name : str, required
            profile name

        Returns
        -------
        None
        """
        self.c.setup_profile(profile_name, 'unload')

    def save_profile(self, profile_name):
        """
        To save a profile

        Parameters
        ----------
        profile_name : str, required
            profile name

        Returns
        -------
        None
        """
        self.c.setup_profile(profile_name, 'save')

    def subscribe_data(self, streams):
        """
        To subscribe to one or more data streams
        'com': Mental command
        'fac' : Facial expression
        'sys': training event

        Parameters
        ----------
        streams : list, required
            list of streams. For example, ['sys']

        Returns
        -------
        None
        """
        self.c.sub_request(streams)

    def get_active_action(self, profile_name):
        """
        To get active actions for the mental command detection.
        A maximum of 4 mental command actions can be active. This doesn't include "neutral"

        Parameters
        ----------
        profile_name : str, required
            profile name

        Returns
        -------
        None
        """
        self.c.get_mental_command_active_action(profile_name)

    def get_sensitivity(self, profile_name):
        """
        To get the sensitivity of the 4 active mental command actions. This doesn't include "neutral"
        It will return arrays of 4 numbers, range 1 - 10
        The order of the values must follow the order of the active actions, as returned by mentalCommandActiveAction
        If the number of active actions < 4, the remaining numbers are ignored.

        Parameters
        ----------
        profile_name : str, required
            profile name

        Returns
        -------
        None
        """
        self.c.get_mental_command_action_sensitivity(profile_name)

    def set_sensitivity(self, profile_name, values):
        """
        To set the sensitivity of the 4 active mental command actions. This doesn't include "neutral".
        The order of the values must follow the order of the active actions, as returned by mentalCommandActiveAction

        Parameters
        ----------
        profile_name : str, required
            profile name
        values : list, required
            list of sensitivity values. The range is from 1 (lowest sensitivity) - 10 (highest sensitivity)
            For example: [neutral, push, pull, lift, drop] -> sensitivity [7, 8, 3, 6] <=> push : 7 , pull: 8, lift: 3, drop:6
                         [neutral, push, pull] -> sensitivity [7, 8, 5, 5] <=> push : 7 , pull: 8 , others reserved

        Returns
        -------
        None
        """
        self.c.set_mental_command_action_sensitivity(profile_name, values)

    # callback functions
    def on_create_session_done(self, *args, **kwargs):
        """Session is ready: ask Cortex for the list of profiles."""
        print('on_create_session_done')
        self.c.query_profile()

    def on_query_profile_done(self, *args, **kwargs):
        """
        Use the wanted profile if it is listed, otherwise create it.

        The profile list is read from kwargs['data']; a missing or None
        value is treated as an empty list so the membership test cannot fail.
        """
        print('on_query_profile_done')
        self.profile_lists = kwargs.get('data') or []
        if self.profile_name in self.profile_lists:
            # the profile already exists
            self.c.get_current_profile()
        else:
            # create profile
            self.c.setup_profile(self.profile_name, 'create')

    def on_load_unload_profile_done(self, *args, **kwargs):
        """
        Continue the workflow after a load, or forget the profile after an unload.

        The loaded state is read from kwargs['isLoaded'].
        """
        is_loaded = kwargs.get('isLoaded')
        print("on_load_unload_profile_done: " + str(is_loaded))

        if is_loaded:
            # get active action
            self.get_active_action(self.profile_name)
        else:
            print('The profile ' + self.profile_name + ' is unloaded')
            self.profile_name = ''

    def on_save_profile_done(self, *args, **kwargs):
        """Profile saved: subscribe to the mental command stream."""
        print('Save profile ' + self.profile_name + " successfully")
        # subscribe mental command data
        self.subscribe_data(['com'])

    def on_new_com_data(self, *args, **kwargs):
        """
        To handle mental command data emitted from Cortex

        The sample is read from kwargs['data'] and printed; nothing is returned.
        The expected format is a dictionary such as
        {'action': 'neutral', 'power': 0.0, 'time': 1590736942.8479}
        """
        data = kwargs.get('data')
        print('mc data: {}'.format(data))

    def on_get_mc_active_action_done(self, *args, **kwargs):
        """Active actions received: request the current sensitivity next."""
        data = kwargs.get('data')
        print('on_get_mc_active_action_done: {}'.format(data))
        self.get_sensitivity(self.profile_name)

    def on_mc_action_sensitivity_done(self, *args, **kwargs):
        """
        Handle both the "get" and the "set" sensitivity responses.

        A list payload is treated as the answer to get_sensitivity, so the new
        values are sent; any other payload is treated as the confirmation of
        set_sensitivity, so the profile is saved.
        """
        data = kwargs.get('data')
        print('on_mc_action_sensitivity_done: {}'.format(data))
        if isinstance(data, list):
            # got the current sensitivity -> apply the new values
            # (copied so the class-level default is never shared or mutated)
            new_values = list(self.DEFAULT_SENSITIVITY)
            self.set_sensitivity(self.profile_name, new_values)
        else:
            # set sensitivity done -> save profile
            self.save_profile(self.profile_name)

    def on_inform_error(self, *args, **kwargs):
        """
        Print the reported error and disconnect the headset on profile access denial.

        The error is read from kwargs['error_data'], expected to be a dict
        with 'code' and 'message' keys. A missing or malformed payload is
        printed and otherwise ignored instead of raising inside the callback.
        """
        error_data = kwargs.get('error_data')
        print(error_data)
        if not isinstance(error_data, dict):
            return

        error_code = error_data.get('code')
        error_message = error_data.get('message')

        if error_code == cortex.ERR_PROFILE_ACCESS_DENIED:
            # disconnect headset for next use
            print('Get error {}. Disconnect headset to fix this issue for next use.'.format(error_message))
            self.c.disconnect_headset()
# -----------------------------------------------------------
#
# GETTING STARTED
# - Please refer to https://emotiv.gitbook.io/cortex-api/ first.
# - Connect your headset with a dongle or Bluetooth. You can see the headset in the Emotiv Launcher.
# - Please make sure your_app_client_id and your_app_client_secret are set before running the script.
# - The callbacks on_create_session_done, on_query_profile_done and on_load_unload_profile_done
# create and load a profile automatically, so you should not modify them.
# - After the profile is loaded, we exercise some advanced BCI APIs such as mentalCommandActiveAction and mentalCommandActionSensitivity.
# You can also subscribe to the 'com' stream to get live mental command data after the profile is loaded.
# RESULT
# You can run live mode with the trained profile. The data looks like this:
# {'action': 'push', 'power': 0.85, 'time': 1647525819.0223}
# {'action': 'pull', 'power': 0.55, 'time': 1647525819.1473}
#
# -----------------------------------------------------------
def main():
    """Run the live mental command example using credentials from the environment."""
    # Load environment variables from the .env file before reading them.
    load_dotenv()

    # Both CLIENT_ID and CLIENT_SECRET must be defined; a missing one raises KeyError.
    client_id, client_secret = os.environ['CLIENT_ID'], os.environ['CLIENT_SECRET']

    # Name of the trained profile to run in live mode; change it to your own.
    profile = 'pgame'

    # Build the live runner and kick off the workflow.
    LiveAdvance(client_id, client_secret).start(profile)


if __name__ == '__main__':
    main()
# -----------------------------------------------------------