# -*- coding: utf-8 -*-
"""
Created on Tue Dec 13
Functions to analyse social operant conditioning
@author: redon
Variables:
\ R = Active lever assignment (1 = left lever active, 2 = right lever active)
\ A = Active lever press - total session count
\ B = Intermediate Active lever press
\ G = Active lever press - count during time out
\ I = Inactive lever press
\ C = Inactive lever press count during time out
\ F = Fixed Ratio
\ D = Drug available (0) or time out (1)
\ M = Minutes (')
\ S = Seconds (")
\ T =
\ U = Time stamping counter for array data
\ O =
\ N =
\ L =
\ J = Laser counter (In SA protocol)
\ Q =
\ H = 20Hz
\ P = 1'stim
"""
import pandas as pd
import numpy as np
import re
import datetime
import pathlib
import pdb
import glob
import os


def ethovision_reader(file):
    '''
    Read an Ethovision raw csv into separate dataframes for the header and the raw values
    Parameters:
    ----------------
    file: String
        Path to the Ethovision raw csv
    Returns:
    ----------------
    df_header: Dataframe
        Contains all experiment information (2 columns: name of information, value)
    df_ethovision: Dataframe
        Contains the raw value of the variables' state at each 4 ms sample
    '''
    # Determine the number of rows in the header
    nrows = pd.read_csv(file, header=None, usecols=[0, 1], nrows=1).at[0, 1]
    # Extract the header into a dataframe
    df_header = pd.read_csv(file, header=None, index_col=0, usecols=[0, 1], nrows=(nrows - 2))
    # Extract the session data into a dataframe
    df_ethovision = pd.read_csv(file, skiprows=(nrows - 2), header=0, na_values='-').drop(0).astype('float')
    return df_header, df_ethovision
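
# A minimal usage sketch (the file path is hypothetical):
#   df_header, df_ethovision = ethovision_reader("raw/ethovision_trial01.csv")
#   df_header is indexed by the name of each piece of experiment information,
#   and df_ethovision holds one float column per tracked variable.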


def start_time_med(med_file):
    '''Extract the session start date and time from a MedPC file header and return it as a datetime.'''
    time_regex_med = r"Start Time: (\d{2}:\d{2}:\d{2})"
    date_regex_med = r"Start Date: (\d{2}/\d{2}/\d{2})"
    with open(med_file) as f:
        lines = f.readlines()
        for line in lines:
            match_time = re.search(time_regex_med, line)
            match_date = re.search(date_regex_med, line)
            if match_time:
                start_time = match_time.group(1)
                # start_time = datetime.datetime.strptime(match_time.group(1), '%H:%M:%S')
            if match_date:
                start_date = match_date.group(1)
    start_med_str = start_date + ' ' + start_time
    start_med_time = datetime.datetime.strptime(start_med_str, '%m/%d/%y %H:%M:%S')
    return start_med_time
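
# Example: a MedPC header containing the lines
#   "Start Date: 12/13/22" and "Start Time: 09:30:00"
# yields datetime.datetime(2022, 12, 13, 9, 30, 0) (dates are parsed as %m/%d/%y).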


def detect_floats(sequence):
    '''Extract Med Associates (MedPC) data by detecting the list of numbers in a chunk of raw text
    Arg:
        sequence = chunk of raw txt file containing several floats to extract
            type Str
    Returns:
        list_floats = List containing each single float detected in the sequence
            type List of floats
    '''
    # Pattern matching any decimal number whatever its size:
    # one or more digits ([0-9]+), an optional period (\.?), then one or more digits
    pattern = re.compile(r'[0-9]+\.?[0-9]+')
    # Find each single decimal number in the sequence
    list_floats_str = re.findall(pattern, sequence)
    # Convert the list of strings into a list of floats to allow further computation
    list_floats = [float(x) for x in list_floats_str]
    return list_floats
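
# Sketch of the expected behaviour on a chunk of MedPC data (values made up):
#   detect_floats("      12.300      45.000       7.50")
#   -> [12.3, 45.0, 7.5]
# Note: the pattern needs at least two characters per number, so a bare
# single-digit integer such as "5" would not be captured.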


def import_txt(file):
    """Function that (i) imports a MedPC file into a dataframe
    and (ii) converts the data columns into arrays of floats for computation
    WARNING: uses detect_floats()
    Arg:
        file = path or name of the file to open, must be txt or csv-like
            type Str
    Return:
        array_results = array version of df_medpc_all
            type Array
        df_medpc_all = 2-column dataframe containing the experimental data
            type DataFrame
    """
    df_medpc_all = pd.read_csv(file,
                               sep=':',  # Splits each line into 2 columns, but raises errors on date lines
                               skiprows=[0],
                               on_bad_lines='warn',  # Overcome errors on date lines by simply warning when one is encountered
                               header=None).fillna('empty')
    # Convert the dataframe into an array
    array_results = df_medpc_all.values
    for i in range(len(array_results)):
        array_results[i, 1] = detect_floats(array_results[i, 1])
    return array_results, df_medpc_all
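
# A usage sketch (the file name is hypothetical):
#   array_results, df_medpc_all = import_txt("2022-12-13_box1.txt")
#   array_results[i, 0] holds the variable label (e.g. "A") or an array row index,
#   array_results[i, 1] holds the list of floats detected on that line.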


def extract_arrays(array_result):
    """Function iterating through the array to extract and organize all the data
    into a dictionary with the variable's name (key) and its content (value)
    Arg:
        array_result = array containing data from the dataframe
    Return:
        dict_result = dictionary gathering variable names (keys) and their values (value)
            type Dict
        list_variables = list of variable names as strings
            type List of Str
        list_values = list of variable values in lists
            type List of lists
    """
    # Initiate empty lists to host the data
    list_variables = []
    list_values = []
    # Define a regex that matches any uppercase letter
    letter = re.compile(r'[A-Z]')
    # Iterate through the rows of the array:
    for j in range(len(array_result)):
        # If the column 0 element at index j is a capital letter, it names a new variable
        if re.match(letter, array_result[j, 0]) is not None:
            # Add the variable name (element of index j in column 0) to list_variables
            list_variables.append(re.match(letter, array_result[j, 0]).group())
            # Add its value (element of index j in column 1) to list_values to initiate the array
            list_values.append(array_result[j, 1])
        # If the column 0 element is not a capital letter, the row continues the previous variable's array
        else:
            # Append its value (element of index j in column 1) to the last element of list_values
            list_values[-1] = list_values[-1] + array_result[j, 1]
    # Create the dictionary with all extracted data: names as keys and values as values
    dict_result = dict(zip(list_variables, list_values))
    # Iterate through the elements of the dictionary:
    for key, value in dict_result.items():
        # If no value was found, set it to 0
        if len(value) == 0:
            dict_result[key] = 0
        # If only one element is present in the value, extract it from the value list
        elif len(value) == 1:
            dict_result[key] = value[0]
    return dict_result, list_variables, list_values
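
# A usage sketch, assuming `array_results` comes from import_txt():
#   dict_result, list_variables, list_values = extract_arrays(array_results)
#   dict_result["A"]   # total active lever presses (a scalar if a single value was stored)
#   dict_result["I"]   # inactive lever presses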


def find_closest_breakpoint(FRList, actNb):
    '''Return the value of FRList that is closest to actNb.'''
    # Initialize the closest value and the minimum difference found so far
    closest_value = None
    min_diff = float('inf')
    # Iterate over each number in the FRList
    for num in FRList:
        # Calculate the absolute difference between the current number and actNb
        diff = abs(num - actNb)
        # If this difference is smaller than the minimum difference found so far
        if diff < min_diff:
            # Update the closest value and the minimum difference
            closest_value = num
            min_diff = diff
            # If we found a number exactly equal to actNb, we can stop searching
            if diff == 0:
                break
    return closest_value
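
# Example with an illustrative progressive-ratio list (values made up):
#   find_closest_breakpoint([1, 2, 4, 6, 9, 12, 15, 20, 25, 32], 10)
#   -> 9   (9 is the schedule value closest to 10 active presses)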


# import numpy as np
# import re
# from datetime import datetime
#
# def detect_data_type(value):
#     # Attempt to convert to float
#     try:
#         return float(value)
#     except ValueError:
#         # Check if it's a sequence of floats
#         if all(re.match(r"^\d+:\s+([\d.]+\s*)+$", line) for line in value.split('\n')):
#             sequence = []
#             for line in value.split('\n'):
#                 parts = line.split(':', 1)
#                 if len(parts) == 2:
#                     numbers = [float(num) for num in parts[1].split()]
#                     sequence.extend(numbers)
#             return np.array(sequence)
#         # Default to string if no other type matches
#         return value.strip()
#
# def import_txt_as_dict(file_path):
#     results = {}
#     current_key = None
#     buffer = ""
#
#     with open(file_path, 'r') as file:
#         for line in file:
#             if ':' in line:
#                 if current_key is not None:
#                     results[current_key] = detect_data_type(buffer)
#                 split_line = line.split(':', 1)
#                 current_key = split_line[0].strip().replace(' ', '_').lower()
#                 buffer = split_line[1].strip()
#             else:
#                 buffer += "\n" + line.strip()
#     # For the last key-value pair in the file
#     if current_key is not None:
#         results[current_key] = detect_data_type(buffer)
#
#     return results


def parse_sequence(buffer):
    """Parse a buffer containing a sequence into a dictionary of numpy arrays, with improved checking."""
    sequence = {}
    for line in buffer.strip().split('\n'):
        # Check if the line matches the expected numeric sequence format
        if re.match(r"^\s*\d+:\s+([\d.]+(\s+[\d.]+)*)$", line):
            index, nums_str = line.split(':', 1)
            nums = np.array([float(num) for num in nums_str.split()])
            sequence[index.strip()] = nums
    return sequence
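
# Sketch of parse_sequence on a two-line buffer (numbers made up):
#   parse_sequence("     0:      1.100      2.200\n     5:      3.300")
#   -> {'0': array([1.1, 2.2]), '5': array([3.3])}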


def parse_value(value):
    """Determine and convert the value to the appropriate type with refined logic."""
    # Check the entire buffer for the sequence pattern to reduce false positives
    if re.search(r"^\s*\d+:\s+([\d.]+(\s+[\d.]+)*)$", value, re.MULTILINE):
        return parse_sequence(value)
    else:
        try:
            return float(value)
        except ValueError:
            return value.strip()
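
# parse_value dispatches on the buffer content, e.g.:
#   parse_value("12.5")  -> 12.5 (float)
#   parse_value("MSN")   -> "MSN" (string)
#   a multi-line numeric block is delegated to parse_sequence()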


def import_txt_as_dict(file_path):
    """Parse a MedPC txt file into a dictionary keyed by the lower-cased header/variable names."""
    results = {}
    current_key, buffer = None, ""
    with open(file_path, 'r') as file:
        for line in file:
            if ':' in line and not re.match(r"^\s*\d+:\s+[\d.]+", line):
                if current_key is not None:
                    results[current_key] = parse_value(buffer)
                current_key, buffer = line.split(':', 1)
                current_key = current_key.strip().replace(' ', '_').lower()
                buffer = buffer.strip()
            else:
                buffer += '\n' + line.strip()
    # Store the last key-value pair in the file
    if current_key is not None:
        results[current_key] = parse_value(buffer)
    return results
# Example usage:
# file_path = "path_to_your_file.txt"
# results = import_txt_as_dict(file_path)
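
# With a typical MedPC export, `results` maps the lower-cased header fields
# (e.g. a key such as "start_date") to floats or strings, and each multi-line
# array variable to the dictionary of numpy arrays returned by parse_sequence().
# (Exact key names depend on the file's header lines.)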