-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
179 lines (141 loc) · 5.77 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import numpy as np
import os
import pandas as pd
import math
# TODO
def sdf(start, stop, spk=None, g_size=20, n_first=None, step=50):
    """Compute a Gaussian-kernel spike density function (SDF) per neuron.

    Parameters
    ----------
    start, stop : beginning/end of the interval of interest (same unit as
        spike times in ``spk``, typically ms).
    spk : (n_spikes, 2) array — column 0 = neuron id, column 1 = spike time.
    g_size : Gaussian kernel sigma, in time-bin units.
    n_first : if given, restrict to the first ``n_first`` neuron ids.
        (Bug fix: the original ignored this argument and hard-coded 200.)
    step : padding added on both sides of [start, stop) so the kernel does
        not suffer edge artifacts at the interval borders.

    Returns
    -------
    list of 1-D arrays, one per neuron, each of length stop - start + 2*step,
    in the same firing-rate unit as the original (Hz via the 10**3 factor).
    """
    neurons = np.unique(spk[:, 0])
    if n_first is not None:
        # Honour the requested neuron count instead of a hard-coded 200.
        neurons = neurons[:n_first]
    # Fancy indexing returns a copy, so shifting times below does not
    # mutate the caller's spike array.
    spk_win = spk[(spk[:, 1] >= start - step) & (spk[:, 1] < stop + step)]
    spk_win[:, 1] -= start - step  # re-reference: t = 0 at the padded start
    dur = int(stop - start + 2 * step)
    t_axis = np.arange(dur)
    # Kernel normalization; * 10**3 converts per-ms density to Hz.
    norm = (10**3) / (math.sqrt(2 * math.pi) * g_size)
    sdf_list = []
    for neu in neurons:
        spike_times = spk_win[spk_win[:, 0] == neu, 1]
        # Vectorized Gaussian sum over all spikes for every time bin
        # (replaces the per-bin Python loop; identical values).
        tau = t_axis[:, None] - spike_times[None, :]
        sdf_list.append(norm * np.exp(-np.power(tau, 2) / (2 * (g_size**2))).sum(axis=1))
    return sdf_list
def sdf_mean(sdf):
    """Average the per-neuron SDF traces into a single population trace."""
    return np.mean(sdf, axis=0)
def sdf_maf(sdf, step=100):
    """Moving-average filter (window = ``step`` bins) of the population-mean SDF.

    Returns the 'valid' part of the convolution, i.e. len(mean) - step + 1 bins.
    """
    population_mean = np.mean(sdf, axis=0)  # inlined sdf_mean
    window = np.ones(step)
    return np.convolve(population_mean, window, "valid") / step
def sdf_change(sdf_cells, baseline=(100, 200), response=(250, 300)):
    """Mean SDF deviation from baseline in a response window, per neuron.

    Parameters
    ----------
    sdf_cells : sequence of per-neuron SDF traces (1-D arrays), assumed to
        cover the whole trial so the index windows below are valid.
    baseline : (start, stop) index window used to estimate the baseline rate.
        Default (100, 200) preserves the original hard-coded behaviour.
    response : (start, stop) index window in which the change is measured.
        Default (250, 300) preserves the original hard-coded behaviour.

    Returns
    -------
    list of floats — average (response - baseline) rate change per neuron.
    """
    b0, b1 = baseline
    r0, r1 = response
    changes = []
    for cell_sdf in sdf_cells:
        baseline_avg = np.mean(cell_sdf[b0:b1])
        # Sum of deviations divided by window length == mean deviation.
        changes.append(np.sum(cell_sdf[r0:r1] - baseline_avg) / (r1 - r0))
    return changes
# nNOS stimulus poisson spike-train pattern
def homogeneous_poisson(rate, start, stop, bin_size, dim):
    """Bernoulli approximation of a homogeneous Poisson spike train.

    One Bernoulli draw per bin with p = rate * bin_size; the 0/1 outcomes
    are written into positions [start:stop] of a length-``dim`` zero array.
    """
    window = stop - start
    n_bins = np.floor(window / bin_size).astype(int)
    p_spike = rate * bin_size
    draws = np.random.rand(n_bins) < p_spike
    train = np.zeros(dim)
    train[start:stop] = draws
    return train
def homogeneous_poisson_for_nest(rate, tmax, bin_size):
    """Boolean Bernoulli spike train over [0, tmax) with bin width ``bin_size``.

    Returns a boolean array of floor(tmax / bin_size) bins, True where a
    spike occurred (p = rate * bin_size per bin).
    """
    n_bins = np.floor(tmax / bin_size).astype(int)
    return np.random.rand(n_bins) < rate * bin_size
def regular_spikes(start, stop, dt, input_rate, dim):
    """Deterministic spike train at ``input_rate`` Hz on a grid of width ``dt`` ms.

    Marks 1 at every spike index between start/dt and stop/dt in a
    length-``dim`` zero array; the inter-spike interval is 1000/input_rate ms.
    """
    train = np.zeros(dim)
    stamp_idx = np.arange(start / dt, stop / dt, 1000 / input_rate / dt, dtype=int)
    train[stamp_idx] = 1
    return train
def CS_pattern(PG_input, input_rate, start_CS, stop_CS, bin_size):
    """Conditioned-stimulus Poisson pattern for a set of input generators.

    For each id in ``PG_input`` an independent Bernoulli spike train
    (p = input_rate * bin_size, with ``bin_size`` in seconds) is drawn over
    the window [start_CS, stop_CS) given in ms.

    Returns
    -------
    (n_events, 2) array — column 0 generator id, column 1 spike time (ms).
    """
    # Loop invariants hoisted out of the per-fibre loop (were recomputed
    # every iteration in the original).
    nbins = np.floor(((stop_CS - start_CS) / 1000) / bin_size).astype(int)
    prob_of_spike = input_rate * bin_size
    bin_starts = np.arange(nbins) * bin_size
    ids, times = [], []
    for mf in PG_input:
        spikes = np.random.rand(nbins) < prob_of_spike
        stamps = bin_starts[spikes] * 1000 + start_CS  # s -> ms, window offset
        ids.append(np.full(len(stamps), mf, dtype=float))
        times.append(stamps)
    if not ids:
        # No generators: empty stimulus with the documented shape.
        return np.zeros((0, 2))
    # Single concatenation replaces the original quadratic np.append loop.
    return np.column_stack((np.concatenate(ids), np.concatenate(times)))
def US_pattern(PG_error, error_rate, start_US, stop_US):
    """Unconditioned-stimulus pattern: regular spikes on each error generator.

    Every id in ``PG_error`` fires at ``error_rate`` Hz (inter-spike interval
    1000/error_rate ms) over [start_US, stop_US) ms.

    Returns
    -------
    (n_events, 2) array — column 0 generator id, column 1 spike time (ms).
    Event order matches the original loop: all stamps of the first fibre,
    then the next, and so on.
    """
    cf_ids = np.asarray(list(PG_error), dtype=float)
    stamps = np.arange(start_US, stop_US, 1000 / error_rate)
    n_cf, n_t = len(cf_ids), len(stamps)
    # Vectorized build replaces the quadratic np.append loop (and the unused
    # enumerate index) of the original; values and ordering are identical.
    US_stimulus = np.zeros((n_cf * n_t, 2))
    US_stimulus[:, 0] = np.repeat(cf_ids, n_t)  # id blocks, one per fibre
    US_stimulus[:, 1] = np.tile(stamps, n_cf)   # stamp sequence per fibre
    return US_stimulus
def sig(x, A=2, B=170, C=5):
    """Logistic sigmoid with amplitude ``A``, midpoint ``B`` and steepness ``C``."""
    z = (x - B) / C
    return A / (1 + np.exp(-z))
def get_spike_activity(cell_name, path=""):
    """Load spike events for a cell population from tab-separated recorder files.

    Scans ``path`` (current directory when empty) for files whose name starts
    with ``cell_name``; each non-empty line must be "<sender_id><TAB><spike_time>".

    Returns
    -------
    (n_spikes, 2) float array — column 0 sender id, column 1 spike time.
    """
    pth_dat = path if path != "" else "./"
    ids = []
    times = []
    for fname in os.listdir(pth_dat):
        # Bug fix: was `pthDat + f`, which broke when `path` had no trailing
        # separator; os.path.join is correct either way.
        full = os.path.join(pth_dat, fname)
        if not (os.path.isfile(full) and fname.startswith(cell_name)):
            continue
        # Bug fix: the original opened the file without ever closing it.
        with open(full, "r") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue  # skip blank/trailing lines
                sender, spike_time = line.split("\t")[:2]
                ids.append(float(sender))
                times.append(float(spike_time))
    # NOTE: an unused pandas DataFrame built here in the original was removed.
    return np.array([ids, times]).T
def get_spike_values(nest, sd_list, pop_names):
    """Extract sender ids and spike times from a list of spike detectors.

    Returns a list of dicts, one per (detector, population-name) pair, with
    keys "times", "neurons_idx" and "compartment_name".
    """
    results = []
    for detector, pop_name in zip(sd_list, pop_names):
        senders = nest.GetStatus(detector, "events")[0]["senders"]
        spike_times = nest.GetStatus(detector, "events")[0]["times"]
        results.append(
            {
                "times": spike_times,
                "neurons_idx": senders,
                "compartment_name": pop_name,
            }
        )
    return results
def get_weights_values(nest, weights_recorder, pairs=((7714, 95514), (19132, 95473))):
    """Extract weight traces for specific (sender, target) connections.

    Parameters
    ----------
    nest : NEST module/interface providing GetStatus.
    weights_recorder : recorder whose "events" carry parallel arrays
        "weights", "times", "senders" and "targets".
    pairs : iterable of (sender_id, target_id) connections to extract.
        The default reproduces the previously hard-coded pair list, so
        existing callers are unaffected.

    Returns
    -------
    list of dicts with keys "times", "weights" and "sender_receiver".
    """
    # Fetch the events dict once instead of four separate GetStatus calls.
    events = nest.GetStatus(weights_recorder, "events")[0]
    weights = events["weights"]
    times = events["times"]
    senders = events["senders"]
    targets = events["targets"]
    dic_list = []
    for s_i, t_i in pairs:
        # Boolean mask selecting events of this exact connection.
        idx = [s == s_i and t == t_i for s, t in zip(senders, targets)]
        dic_list.append(
            {
                "times": times[idx],
                "weights": weights[idx],
                "sender_receiver": f"s = {s_i}, t = {t_i}",
            }
        )
    return dic_list