Skip to content

Commit

Permalink
Added code for fixes for lsl and changed from processthread to process
Browse files Browse the repository at this point in the history
  • Loading branch information
milosobral committed May 17, 2023
1 parent 096dad3 commit 0333746
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 55 deletions.
109 changes: 54 additions & 55 deletions portiloop/src/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from abc import ABC, abstractmethod
import json
from multiprocessing import Process, Queue, Value
import os
from time import sleep
import time
Expand Down Expand Up @@ -108,78 +109,80 @@ def capture_process(p_data_o, p_msg_io, duration, frequency, python_clock, time_
def start_capture(
detector_cls,
stimulator_cls,
capture_object,
capture_dictionary,
q_msg,
pause_value
):
print(capture_object.channel_states)
print(capture_dictionary['channel_states'])

# Initialize data frontend
fake_filename = RECORDING_PATH / 'test_recording.csv'
capture_frontend = ADSFrontend(
duration=capture_object.duration,
frequency=capture_object.frequency,
python_clock=capture_object.python_clock,
channel_states=capture_object.channel_states,
duration=capture_dictionary['duration'],
frequency=capture_dictionary['frequency'],
python_clock=capture_dictionary['python_clock'],
channel_states=capture_dictionary['channel_states'],
process=capture_process,
) if capture_object.signal_input == "ADS" else FileFrontend(fake_filename, capture_object.nb_channels, capture_object.channel_detection)
) if capture_dictionary['signal_input'] == "ADS" else FileFrontend(fake_filename, capture_dictionary['nb_channels'], capture_dictionary['channel_detection'])

# Initialize detector, LSL streamer and stimulatorif requested
detector = detector_cls(capture_object.threshold, channel=capture_object.channel_detection) if detector_cls is not None else None
detector = detector_cls(capture_dictionary['threshold'], channel=capture_dictionary['channel_detection']) if detector_cls is not None else None
streams = {
'filtered': filter,
'markers': detector is not None,
}
print(PORTILOOP_ID)
lsl_streamer = LSLStreamer(streams, capture_object.nb_channels, capture_object.frequency, id=PORTILOOP_ID) if capture_object.lsl else Dummy()
lsl_streamer = LSLStreamer(streams, capture_dictionary['nb_channels'], capture_dictionary['frequency'], id=PORTILOOP_ID) if capture_dictionary['lsl'] else Dummy()
stimulator = stimulator_cls(lsl_streamer=lsl_streamer) if stimulator_cls is not None else None

# Initialize filtering pipeline
if filter:
fp = FilterPipeline(nb_channels=capture_object.nb_channels,
sampling_rate=capture_object.frequency,
power_line_fq=capture_object.filter_settings['power_line'],
use_custom_fir=capture_object.filter_settings['custom_fir'],
custom_fir_order=capture_object.filter_settings['custom_fir_order'],
custom_fir_cutoff=capture_object.filter_settings['custom_fir_cutoff'],
alpha_avg=capture_object.filter_settings['polyak_mean'],
alpha_std=capture_object.filter_settings['polyak_std'],
epsilon=capture_object.filter_settings['epsilon'],
filter_args=capture_object.filter_settings['filter_args'])
fp = FilterPipeline(nb_channels=capture_dictionary['nb_channels'],
sampling_rate=capture_dictionary['frequency'],
power_line_fq=capture_dictionary['filter_settings']['power_line'],
use_custom_fir=capture_dictionary['filter_settings']['custom_fir'],
custom_fir_order=capture_dictionary['filter_settings']['custom_fir_order'],
custom_fir_cutoff=capture_dictionary['filter_settings']['custom_fir_cutoff'],
alpha_avg=capture_dictionary['filter_settings']['polyak_mean'],
alpha_std=capture_dictionary['filter_settings']['polyak_std'],
epsilon=capture_dictionary['filter_settings']['epsilon'],
filter_args=capture_dictionary['filter_settings']['filter_args'])

# Launch the capture process
capture_frontend.init_capture()

# Initialize display if requested
live_disp = LiveDisplay(channel_names=capture_object.signal_labels, window_len=capture_object.width_display) if capture_object.display else Dummy()
live_disp = LiveDisplay(channel_names=capture_dictionary['signal_labels'], window_len=capture_dictionary['width_display']) if capture_dictionary['display'] else Dummy()

# Initialize recording if requested
recorder = EDFRecorder(capture_object.signal_labels, capture_object.filename, capture_object.frequency) if capture_object.record else Dummy()
recorder = EDFRecorder(capture_dictionary['signal_labels'], capture_dictionary['filename'], capture_dictionary['frequency']) if capture_dictionary['record'] else Dummy()
recorder.open_recording_file()

# Buffer used for the visualization and the recording
buffer = []

# Initialize stimulation delayer if requested
delay = not ((capture_object.stim_delay == 0.0) and (capture_object.inter_stim_delay == 0.0)) and (stimulator is not None)
delay_phase = (not delay) and (not capture_object.spindle_detection_mode == 'Fast') and (stimulator is not None)
delay = not ((capture_dictionary['stim_delay'] == 0.0) and (capture_dictionary['inter_stim_delay'] == 0.0)) and (stimulator is not None)
delay_phase = (not delay) and (not capture_dictionary['spindle_detection_mode'] == 'Fast') and (stimulator is not None)
if delay:
stimulation_delayer = TimingDelayer(
stimulation_delay=capture_object.stim_delay,
inter_stim_delay=capture_object.inter_stim_delay
stimulation_delay=capture_dictionary['stim_delay'],
inter_stim_delay=capture_dictionary['inter_stim_delay']
)
elif delay_phase:
stimulation_delayer = UpStateDelayer(
capture_object.frequency,
capture_object.spindle_detection_mode == 'Peak', 0.3)
capture_dictionary['frequency'],
capture_dictionary['spindle_detection_mode'] == 'Peak', 0.3)
else:
stimulation_delayer = Dummy()

if stimulator is not None:
stimulator.add_delayer(stimulation_delayer)

# Get the metadata and save it to a file
metadata = capture_object.get_metadata()
metadata = capture_dictionary
# Split the original path into its components
dirname, basename = os.path.split(capture_object.filename)
dirname, basename = os.path.split(capture_dictionary['filename'])
# Split the file name into its name and extension components
name, _ = os.path.splitext(basename)
# Define the new file name
Expand All @@ -191,21 +194,22 @@ def start_capture(
json.dump(metadata, f, indent=4)

# Initialize the variable to keep track of whether we are in a detection state or not for the markers
with capture_object._pause_detect_lock:
prev_pause = capture_object._pause_detect
prev_pause = pause_value.value

if detector is not None:
marker_str = LSLStreamer.string_for_detection_activation(prev_pause)
lsl_streamer.push_marker(marker_str)

# Main capture loop
while True:
# First, we send all outgoing messages to the capture process
with capture_object._lock_msg_out:
if capture_object._msg_out is not None:
capture_frontend.send_msg(capture_object._msg_out)
capture_object._msg_out = None

# First, we send all outgoing messages to the capture process
try:
msg = q_msg.get_nowait()
capture_frontend.send_msg(msg)
except:
pass

# Then, we check if we have received a message from the capture process
msg = capture_frontend.get_msg()
# Either we have received a stop message, or a print message.
Expand Down Expand Up @@ -238,8 +242,7 @@ def start_capture(
lsl_streamer.push_filtered(filtered_point[-1])

# Check if detection is on or off
with capture_object._pause_detect_lock:
pause = capture_object._pause_detect
pause = pause_value.value

# If the state has changed since last iteration, we send a marker
if pause != prev_pause and detector is not None:
Expand All @@ -256,20 +259,22 @@ def start_capture(
stimulator.stimulate(detection_signal)

# Adds point to buffer for delayed stimulation
stimulation_delayer.step(filtered_point[0][capture_object.channel_detection-1])
stimulation_delayer.step(filtered_point[0][capture_dictionary['channel_detection'] - 1])

# Add point to the buffer to send to viz and recorder
buffer += filtered_point
if len(buffer) >= 50:
live_disp.add_datapoints(buffer)
recorder.add_recording_data(buffer)
buffer = []

# close the frontend
capture_frontend.close()
recorder.close_recording_file()
if stimulator is not None:
stimulator.close()

del lsl_streamer
del stimulation_delayer
del stimulator
del detector
Expand Down Expand Up @@ -314,9 +319,9 @@ def __init__(self, detector_cls=None, stimulator_cls=None):
self.python_clock = True

# Communication parameters for messages with capture
self._lock_msg_out = Lock()
self._msg_out = None
self._t_capture = None
self.q_msg = Queue()
self.pause_value = Value('b', True)

# Channel parameters
self.signal_labels = [f"ch{i+1}" for i in range(self.nb_channels)]
Expand All @@ -332,10 +337,7 @@ def __init__(self, detector_cls=None, stimulator_cls=None):
# Stimulator and detector classes
self.detector_cls = detector_cls
self.stimulator_cls = stimulator_cls

# Pause detection
self._pause_detect_lock = Lock()
self._pause_detect = True


if ADS:
try:
Expand Down Expand Up @@ -832,8 +834,6 @@ def on_b_capture(self, value):
self.frequency = to_ads_frequency(self.frequency)
self.b_frequency.value = self.frequency
self.display_buttons()
with self._lock_msg_out:
self._msg_out = None
if self._t_capture is not None:
warnings.warn("Capture already running, operation aborted.")
return
Expand All @@ -853,14 +853,15 @@ def on_b_capture(self, value):

self.width_display = 5 * self.frequency # Display 5 seconds of signal

self._t_capture = Thread(target=start_capture,
self._t_capture = Process(target=start_capture,
args=(detector_cls,
stimulator_cls,
self))
self.get_metadata(),
self.q_msg,
self.pause_value,))
self._t_capture.start()
elif val == 'Stop':
with self._lock_msg_out:
self._msg_out = 'STOP'
self.q_msg.put('STOP')
assert self._t_capture is not None
self._t_capture.join()
self._t_capture = None
Expand Down Expand Up @@ -1010,11 +1011,9 @@ def on_b_test_impedance(self, b):
def on_b_pause(self, value):
val = value['new']
if val == 'Active':
with self._pause_detect_lock:
self._pause_detect = False
self.pause_value = False
elif val == 'Paused':
with self._pause_detect_lock:
self._pause_detect = True
self.pause_value = True

def on_b_delay(self, value):
val = value['new']
Expand Down
8 changes: 8 additions & 0 deletions portiloop/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,14 @@ def push_raw(self, data):
def push_marker(self, text):
self.lsl_outlet_markers.push_sample([text])

def __del__(self):
print("Closing LSL streams")
self.lsl_outlet_raw.__del__()
if self.streams['filtered']:
self.lsl_outlet.__del__()
if self.streams['markers']:
self.lsl_outlet_markers.__del__()

@staticmethod
def string_for_detection_activation(pause):
return "DETECT_OFF" if pause else "DETECT_ON"

0 comments on commit 0333746

Please sign in to comment.