-
Notifications
You must be signed in to change notification settings - Fork 0
/
aux_functions.py
335 lines (216 loc) · 9.14 KB
/
aux_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import matplotlib.pyplot as plt
import scipy.io
import numpy as np
# import cebra
# from cebra import CEBRA
import cebra.models
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from scipy.signal import butter, lfilter
from mpl_toolkits.mplot3d import Axes3D
import seaborn as sns
import pandas as pd
from sklearn.decomposition import PCA
import os
from sklearn.preprocessing import StandardScaler, MinMaxScaler
def getdata(user, dataset):
    """Load the raw EMG, glove, and restimulus arrays for one recording.

    Reads ./datasets/S{user}_E1_A{dataset}.mat and returns the arrays
    stored under the 'emg', 'glove' and 'restimulus' keys, in that order.
    """
    mat_path = f"./datasets/S{user}_E1_A{dataset}.mat"
    mat_contents = scipy.io.loadmat(mat_path)
    # Echo every key so the file's contents are visible when loading.
    for mat_key in mat_contents:
        print(mat_key)
    return mat_contents['emg'], mat_contents['glove'], mat_contents['restimulus']
def ensure_directory_exists(directory):
    """Create *directory* (and any missing parents) if it does not exist.

    Uses makedirs(..., exist_ok=True) instead of the check-then-create
    pattern, which could raise FileExistsError if another process
    created the directory between the exists() check and makedirs().
    """
    os.makedirs(directory, exist_ok=True)
def runDataProcessing(user_list: list,
                      dataset_list: list,
                      size_val: int,
                      stride_val: int):
    """Process EMG and glove data for every (user, dataset) combination.

    Runs emg_process and glove_process for each pair and verifies that
    both produce the same number of windows.

    Raises:
        ValueError: if the EMG and glove window counts differ.
    """
    for current_user in user_list:
        for current_dataset in dataset_list:
            emg_out = emg_process(size_val, stride_val, current_user, current_dataset)
            glove_out = glove_process(size_val, stride_val, current_user, current_dataset)
            # Both outputs must share the same window axis to be paired later.
            if emg_out.shape[0] != glove_out.shape[0]:
                raise ValueError("EMG and Glove data shape not aligned.")
# ----------- FILTERING ------------------------------#
def butter_lowpass(cutoff, fs, order):
    """Design a digital low-pass Butterworth filter.

    cutoff and fs are in Hz; the cutoff is normalised against the
    Nyquist frequency (fs / 2) as scipy.signal.butter expects.
    Returns the (b, a) transfer-function coefficient arrays.
    """
    normalised_cutoff = cutoff / (0.5 * fs)
    return butter(order, normalised_cutoff, btype='low', analog=False)
def lowpass_filter(data, cutoff, fs, order):
    """Apply a causal Butterworth low-pass filter to *data*.

    cutoff and fs are in Hz; the filter design (normalise cutoff by the
    Nyquist frequency, then butter + lfilter) is inlined here.
    """
    nyquist = 0.5 * fs
    coeff_b, coeff_a = butter(order, cutoff / nyquist, btype='low', analog=False)
    return lfilter(coeff_b, coeff_a, data)
def getProcessedData(user: int, dataset: int, mode: str):
    """Load a previously saved processed array.

    mode = ["glove", "emg"]

    Reads the .npy file written by emg_process / glove_process under
    ./processed_data for the given user and dataset.
    """
    npy_path = f"./processed_data/user{user}/{mode}/dataset{dataset}/{mode}_{user}_{dataset}.npy"
    return np.load(npy_path)
def slidingWindowEMG(x, win_size, win_stride):
    """Slice the 1-D signal *x* into overlapping windows.

    Window i covers x[i*win_stride : i*win_stride + win_size].  Returns
    an array of shape (num_windows, win_size) whose rows are the
    windows, ready for per-window feature extraction.  A signal shorter
    than win_size yields an empty array.

    Note: the original in-loop `end_index > len(x)` break was
    unreachable (num_windows already guarantees every window fits) and
    has been removed.
    """
    num_windows = (len(x) - win_size) // win_stride + 1
    if num_windows <= 0:
        # Signal too short for even one full window.
        return np.array([])
    starts = [i * win_stride for i in range(num_windows)]
    return np.array([x[s:s + win_size] for s in starts])
def slidingWindowGlove(x, win_size, win_stride):
    """Reduce the 1-D signal *x* to one mean value per sliding window.

    Window i covers x[i*win_stride : i*win_stride + win_size].  Returns
    a 1-D array with one element per window (the window mean), which
    downsamples the glove channel onto the same window grid the EMG
    features use.
    """
    num_windows = 1 + (len(x) - win_size) // win_stride
    window_means = [
        np.mean(x[i * win_stride:i * win_stride + win_size])
        for i in range(num_windows)
    ]
    # One scalar per window -> shape (num_windows,)
    return np.array(window_means)
def WL(windows_array, win_size, win_stride):
    """Waveform-length feature per window, normalised by window size.

    For each window, sums the absolute first differences of its samples
    and divides by win_size.  Accepts the 2-D output of slidingWindowEMG
    (one window per row) and returns a 1-D array of shape
    (num_windows,).  win_stride is accepted for call-site symmetry with
    the windowing helpers but is not used in the computation.

    The original per-window Python loop is replaced by one vectorised
    numpy expression with identical results.
    """
    windows = np.asarray(windows_array)
    if windows.size == 0:
        return np.array([])
    # |diff| summed along each window, scaled by 1/win_size.
    return np.sum(np.abs(np.diff(windows, axis=-1)), axis=-1) / win_size
def LV(windows_array, win_size):
    """Log-variance feature: log10(var(window) + eps) for each window.

    Accepts an iterable of windows (e.g. the 2-D output of
    slidingWindowEMG) and returns a 1-D array of shape (num_windows,).
    win_size is accepted for call-site symmetry but is not used.
    """
    eps = 1e-15  # keeps log10 finite when a window has zero variance
    lv_values = [np.log10(np.var(win) + eps) for win in windows_array]
    return np.array(lv_values)
def slidingWindowParameters(frequency, size_secs, stride_secs):
    """Convert window size and stride from seconds to sample counts.

    NB: size_secs and stride_secs must be in seconds
    (150 ms = 150 * 10**(-3)).  Returns (window_size, window_stride)
    in samples at the given sampling frequency.
    """
    return frequency * size_secs, frequency * stride_secs
def emg_process(size_val, stride_val, user, dataset):
    """Filter, window, featurise, standardise and save one EMG recording.

    Parameters:
        size_val, stride_val: window size and stride in milliseconds.
        user, dataset: recording selectors passed to getdata.

    Each EMG channel is low-pass filtered (450 Hz cutoff, order-2
    Butterworth at fs = 2000 Hz), split into sliding windows, and per
    window the WL and LV features are extracted.  The resulting
    [n_windows, n_features] matrix is z-scored with StandardScaler,
    saved under ./processed_data, and returned.

    Raises:
        ValueError: if the stacked feature matrix does not have
        feature_counter * num_channels columns.

    The nested duplicate of ensure_directory_exists has been removed in
    favour of the module-level helper.
    """
    fs = 2000  # sampling frequency of the recordings (Hz)
    size, stride = slidingWindowParameters(2000, (size_val * 10**(-3)), (stride_val * 10**(-3)))
    size = int(size)
    stride = int(stride)
    # Load the recording; only the EMG channels are needed here.
    emg_temp, glove_temp, stim_temp = getdata(user=user, dataset=dataset)
    # Low-pass filter every channel in place before feature extraction.
    cutoff_val = 450
    order = 2
    for ch in range(emg_temp.shape[1]):
        emg_temp[:, ch] = lowpass_filter(emg_temp[:, ch], cutoff_val, fs, order)
    del glove_temp, stim_temp
    # Feature toggles: which per-window features to extract.
    WL_bool = True
    LV_bool = True
    emg_windows = []
    num_channelsEMG = emg_temp.shape[1]
    feature_counter = 0  # features per channel; same value every loop pass
    for ch in range(num_channelsEMG):
        feature_counter = 0
        emg_window = slidingWindowEMG(emg_temp[:, ch], size, stride)
        if WL_bool:
            emg_windows.append(WL(emg_window, size, stride))
            feature_counter += 1
        if LV_bool:
            emg_windows.append(LV(emg_window, size))
            feature_counter += 1
    # Shape into [n_windows, n_features] as sklearn expects.
    emg_windows_stacked = np.array(emg_windows).T
    n_features = feature_counter * num_channelsEMG
    if not emg_windows_stacked.shape[1] == n_features:
        raise ValueError("Shape of EMG not correct for sklearn standardisation")
    scaler = StandardScaler()
    emg_windows_stacked = scaler.fit_transform(emg_windows_stacked)
    directory = f'./processed_data/user{user}/emg/dataset{dataset}'
    ensure_directory_exists(directory)  # module-level helper
    emg_data_ID = f"{user}_{dataset}"
    np.save(f"{directory}/emg_{emg_data_ID}.npy", emg_windows_stacked)
    return emg_windows_stacked
def glove_process(size_val, stride_val, user, dataset):
    """Window, remap to 5 DoA, normalise and save one glove recording.

    Parameters:
        size_val, stride_val: window size and stride in milliseconds.
        user, dataset: recording selectors passed to getdata.

    The 18 glove channels are mean-pooled per sliding window, mapped
    onto five degrees of actuation with the matrix A below, scaled to
    [0, 1] per DoA with MinMaxScaler, saved under ./processed_data and
    returned as a [n_windows, 5] array.

    Raises:
        ValueError: if the mapped data does not have exactly 5 columns.

    The transformation matrix is from:
    Krasoulis, Agamemnon, Sethu Vijayakumar, and Kianoush Nazarpour.
    "Effect of user practice on prosthetic finger control with an
    intuitive myoelectric decoder." Frontiers in neuroscience 13 (2019): 461612.
    https://www.frontiersin.org/journals/neuroscience/articles/10.3389/fnins.2019.00891/full#supplementary-material

    The two nested duplicates of ensure_directory_exists have been
    removed in favour of the module-level helper.
    """
    emg_temp, glove_temp, stim_temp = getdata(user=user, dataset=dataset)
    del emg_temp, stim_temp  # only the glove channels are needed here
    size, stride = slidingWindowParameters(2000, (size_val * 10**(-3)), (stride_val * 10**(-3)))
    size = int(size)
    stride = int(stride)
    # Mean-pool each glove channel over the sliding-window grid.
    glove_data = []
    for ch in range(glove_temp.shape[1]):
        glove_data.append(slidingWindowGlove(glove_temp[:, ch], size, stride))
    glove_data_array = np.array(glove_data).T
    # 18 glove sensors -> 5 degrees of actuation (columns of A_T).
    A_T = np.array([
        [0.639, 0, 0, 0, 0],
        [0.383, 0, 0, 0, 0],
        [0, 1, 0, 0, 0],
        [-0.639, 0, 0, 0, 0],
        [0, 0, 0.4, 0, 0],
        [0, 0, 0.6, 0, 0],
        [0, 0, 0, 0.4, 0],
        [0, 0, 0, 0.6, 0],
        [0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0.1667],
        [0, 0, 0, 0, 0.3333],
        [0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0.1667],
        [0, 0, 0, 0, 0.3333],
        [0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0],
        [-0.19, 0, 0, 0, 0],
        [0, 0, 0, 0, 0],
    ])
    A = A_T.T
    glove_mapped = A @ glove_data_array.T
    # Transpose into [n_windows, 5] as sklearn expects.
    glove_mapped = glove_mapped.T
    if not glove_mapped.shape[1] == 5:
        raise ValueError("DoA is equal to 5, number of features for sklearn func should be 5. Check shape of data.")
    # Normalise every DoA to the [0, 1] range.
    scaler = MinMaxScaler()
    glove_data_processed = scaler.fit_transform(glove_mapped)
    directory = f'./processed_data/user{user}/glove/dataset{dataset}'
    ensure_directory_exists(directory)  # module-level helper
    glove_data_ID = f"{user}_{dataset}"
    np.save(f"{directory}/glove_{glove_data_ID}.npy", glove_data_processed)
    return glove_data_processed
### ------- EMG , GLOVE AND RESTIMULUS PROCESSING END ----- ###
def getMLP_pred(user: int,
                dimred_type: str,
                dimred_ID: str,
                dataset: int):
    """Load a saved MLP prediction array from ./MLP_pred.

    dimred_type selects the dimensionality-reduction subfolder and
    dimred_ID the specific .npy file within it.
    """
    pred_path = f"./MLP_pred/user{user}/{dimred_type}/dataset{dataset}/{dimred_ID}.npy"
    return np.load(pred_path)