-
Notifications
You must be signed in to change notification settings - Fork 0
/
ICL_feature_extractor.py
90 lines (71 loc) · 3.18 KB
/
ICL_feature_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import numpy as np
from topoplot import topoplot
from eeg_rpsd import eeg_rpsd
from eeg_autocorr_welch import eeg_autocorr_welch
from eeg_autocorr import eeg_autocorr
from eeg_autocorr_fftw import eeg_autocorr_fftw
from pop_reref import pop_reref
def ICL_feature_extractor(EEG, flag_autocorr=False):
# Check inputs
ncomp = EEG['icawinv'].shape[1]
# Check for ICA key and if it is not empty
if not 'icawinv' in EEG.keys() or EEG['icawinv'].size == 0:
raise ValueError('You must have an ICA decomposition to use ICLabel')
# Assuming chanlocs are correct
if EEG['ref'] != 'average' and EEG['ref'] != 'averef':
EEG = pop_reref(EEG, []) #, exclude=list(set(range(EEG.nbchan)) - set(EEG.icachansind)))
# raise ValueError('Data must be rereferenced to average to use ICLabel')
# EEG = pop_reref(EEG, [], exclude=list(set(range(EEG.nbchan)) - set(EEG.icachansind)))
# Calculate ICA activations if missing and cast to double
if EEG['icaact'] is None:
raise ValueError('You must have ICA activations to use ICLabel')
# EEG['icaact'] = eeg_getica(EEG)
EEG['icaact'] = EEG['icaact'].astype(float)
# Check ICA is real
assert np.isreal(EEG['icaact']).all(), 'Your ICA decomposition must be real to use ICLabel'
# Calculate topo
topo = np.zeros((32, 32, 1, ncomp))
for it in range(ncomp):
_, temp_topo, _, _, _ = topoplot(EEG['icawinv'][:, it], EEG['chanlocs'][EEG['icachansind']-1], noplot='on')
temp_topo[np.isnan(temp_topo)] = 0
topo[:, :, 0, it] = temp_topo / np.max(np.abs(temp_topo))
# Cast
topo = topo.astype(np.float32)
# Calculate PSD
psd = eeg_rpsd(EEG, 100)
# Extrapolate or prune as needed
nfreq = psd.shape[1]
if nfreq < 100:
psd = np.hstack((psd, np.tile(psd[:, -1][:, np.newaxis], (1, 100 - nfreq))))
# Undo notch filter
for linenoise_ind in [50, 60]:
linenoise_around = [linenoise_ind - 1, linenoise_ind + 1]
difference = psd[:, linenoise_around] - psd[:, linenoise_ind][:, np.newaxis]
notch_ind = np.all(difference > 5, axis=1)
if np.any(notch_ind):
psd[notch_ind, linenoise_ind] = np.mean(psd[notch_ind][:, linenoise_around], axis=1)
# Normalize
psd = psd / np.max(np.abs(psd), axis=1)[:, np.newaxis]
psd = np.transpose(np.expand_dims(np.expand_dims(psd, axis=-1), axis=-1), (2, 1, 3, 0)).astype(np.float32)
# Calculate autocorrelation
if flag_autocorr:
if EEG['trials'] == 1:
if EEG['pnts'] / EEG['srate'] > 5:
autocorr = eeg_autocorr_welch(EEG)
else:
autocorr = eeg_autocorr(EEG)
else:
autocorr = eeg_autocorr_fftw(EEG)
# Reshape and cast
autocorr = np.transpose(np.expand_dims(np.expand_dims(autocorr, axis=-1), axis=-1), (2, 1, 3, 0)).astype(np.float32)
# Format outputs
if flag_autocorr:
features = [0.99 * topo, 0.99 * psd, 0.99 * autocorr]
else:
features = [0.99 * topo, 0.99 * psd]
return features
def test_ICL_feature_extractor():
flag_autocorr = True
EEG = EEG2
EEG['ref'] = 'averef'
EEG['chanlocs'] = np.random.randn(32, 3)