From e9203cdc91492384dfee1c97c0745296338f9851 Mon Sep 17 00:00:00 2001 From: dsa <65232477+dantonsa@users.noreply.github.com> Date: Tue, 5 Sep 2023 10:39:30 +0200 Subject: [PATCH] Useful updates for future improvement (#2) * Included reading of transfer and elog channels at __init__ to update respective attributes with channels previously set at Oxygen * Included elogContext method to use elog start and stop in a with statement * Included missing ELOG timestamp for setElogTimestamp * Included _localElogStartTime attribute to track locally when Elog started * Included False return for fetchElog in case of error fetched * Fixed getTransferChannels timing out when there are no channels available * Removed unecessary logging when class is initialized by adding the "add_log" flag at _getTransferChannels and _getElogChannels. * Fixed fetchElog method splitting array unaccordingly when timestamp is set to OFF. Included attribute that stores the elog timestamp. * Included _convertElogArray method to convert matrix of strings from elog fetch into matrix of float (or datetime) values. * Included fetchElogAccumulated * Included elogTimestamp attribute at __init__ * Included error handling for elog example in case of timeout. * Increased continuous fetching waiting time for elogFetchAccumulate to 50ms and fixed timeout docstring. * Fixed fetchElogAccumulated not raising Exception for invalid timestamp configurations. * Included type annotations for some Elog functions. --------- Co-authored-by: Matthias Straka <59084281+matthiasstraka@users.noreply.github.com> --- oxygenscpi_example.py | 37 +++++ pyOxygenSCPI/oxygenscpi.py | 297 ++++++++++++++++++++++++++++++------- 2 files changed, 283 insertions(+), 51 deletions(-) diff --git a/oxygenscpi_example.py b/oxygenscpi_example.py index e85bfbf..f35f140 100644 --- a/oxygenscpi_example.py +++ b/oxygenscpi_example.py @@ -47,4 +47,41 @@ mDevice.storeStop() print("Recording stopped.") +# Set Elog channels and fetch some elog data +mDevice.setElogChannels(["AI 4/I1","AI 4/I2"]) +mDevice.setElogTimestamp('ABS') +mDevice.setElogPeriod(0.01) +print("Starting Elog") +mDevice.startElog() +print("Waiting for 10 seconds...") +time.sleep(10) +print("Fetching elog Data") +data1 = mDevice.fetchElog() +data2 = mDevice.fetchElog() +mDevice.stopElog() +print("Elog stopped.") +if data1: + print(f"First Elog row values fetched: {data1[0]}") +else: + print("Unable to fetch Elog.") +if data2: + print(f"Last Elog row values fetched: {data2[-1]}") +else: + print("Unable to fetch Elog.") + +# Alternatively Start elog with context manager and accumulating values +# for 10 seconds +print("Starting Elog with context manager") +mDevice.setElogTimestamp('ELOG') +with mDevice.elogContext(): + print("Waiting for 10 seconds...") + time.sleep(10) + print("Fetching Elog") + data = mDevice.fetchElogAccumulated() +print("Elog stopped.") +if data: + print(f"Fetched Elog from timestamp {data[0][0]} to {data[-1][0]}s.") +else: + print("Unable to fetch Elog.") + mDevice.disconnect() diff --git a/pyOxygenSCPI/oxygenscpi.py b/pyOxygenSCPI/oxygenscpi.py index 86cc928..15e574d 100644 --- a/pyOxygenSCPI/oxygenscpi.py +++ b/pyOxygenSCPI/oxygenscpi.py @@ -11,6 +11,8 @@ from enum import Enum from struct import unpack from time import sleep +from contextlib import contextmanager +from typing import Union, Literal log = logging.getLogger('oxygenscpi') @@ -43,6 +45,8 @@ def __init__(self, ip_addr, tcp_port = 10001): self._value_dimension = None self._value_format = self.NumberFormat.ASCII self.elogChannelList = [] + self.elogTimestamp = "OFF" + self._localElogStartTime = dt.datetime.now() self.DataStream = OxygenScpiDataStream(self) self.ChannelProperties = OxygenChannelProperties(self) @@ -57,6 +61,9 @@ def connect(self): self._sock = sock self.headersOff() self.getVersion() + self._getTransferChannels(False) + self._getElogChannels(False) + self._getElogTimestamp() return True except ConnectionRefusedError as msg: template = "Connection to {!s}:{:d} refused: {!s}" @@ -172,25 +179,20 @@ def loadSetup(self, setup_name): """ return self._sendRaw(':SETUP:LOAD "{:s}"'.format(setup_name)) - def setTransferChannels(self, channelNames, includeRelTime=False, includeAbsTime=False): - """Sets the channels to be transfered within the numeric system + def _getTransferChannels(self, add_log=True): + """Reads the channels to be transferred within the numeric system. - This Function sets the channels to be transfered. This list must - contain Oxygen channel names. + This function reads the actual list of channels to be transferred within + the numeric system and updates the attribute 'channelNames' with a list + of strings containing the actual channel names. It is called at + __init__ to get the previously set channels. Args: - channelNames (list of str): List of channel names + add_log (bool): Indicate in function should log or not. Returns: True if Suceeded, False if not """ - if includeRelTime: - channelNames.insert(0, "REL-TIME") - if includeAbsTime: - channelNames.insert(0, "ABS-TIME") - channelListStr = '"'+'","'.join(channelNames)+'"' - ret = self._sendRaw(':NUM:NORMAL:ITEMS {:s}'.format(channelListStr)) - # Read back actual set channel names ret = self._askRaw(':NUM:NORMAL:ITEMS?') if isinstance(ret, bytes): ret = ret.decode().strip() @@ -198,19 +200,42 @@ def setTransferChannels(self, channelNames, includeRelTime=False, includeAbsTime channelNames = ret.split('","') channelNames = [chName.replace('"','') for chName in channelNames] if len(channelNames) == 1: - log.debug('One Channel Set: {:s}'.format(channelNames[0])) + if add_log: + log.debug('One Channel Set: {:s}'.format(channelNames[0])) if channelNames[0] == 'NONE': channelNames = [] - log.warning('No Channel Set') + if add_log: + log.warning('No Channel Set') self.channelList = channelNames ret = self.setNumberChannels() if not ret: return False - if is_minimum_version(self._scpi_version, (1,6)): + if is_minimum_version(self._scpi_version, (1,6)) and channelNames: return self.getValueDimensions() return True return False + def setTransferChannels(self, channelNames, includeRelTime=False, includeAbsTime=False): + """Sets the channels to be transfered within the numeric system + + This Function sets the channels to be transfered. This list must + contain Oxygen channel names. + + Args: + channelNames (list of str): List of channel names + + Returns: + True if Suceeded, False if not + """ + if includeRelTime: + channelNames.insert(0, "REL-TIME") + if includeAbsTime: + channelNames.insert(0, "ABS-TIME") + channelListStr = '"'+'","'.join(channelNames)+'"' + self._sendRaw(':NUM:NORMAL:ITEMS {:s}'.format(channelListStr)) + # Read back actual set channel names + return self._getTransferChannels() + def setNumberChannels(self, number=None): if number is None: number = len(self.channelList) @@ -264,18 +289,22 @@ def getValueDimensions(self): """ Read the Dimension of the output Available since 1.6 """ - ret = self._askRaw(':NUM:NORM:DIMS?') - if isinstance(ret, bytes): - dim = ret.decode() - if ' ' in dim: - dim = dim.split(' ')[1] - dim = dim.split(',') - try: - self._value_dimension = [int(d) for d in dim] - except TypeError: - self._value_dimension = False - return False - return True + # Asking for command ":NUM:NORM:DIMS?" times out when there are no + # transfer channels selected. + if self.channelList: + ret = self._askRaw(':NUM:NORM:DIMS?') + if isinstance(ret, bytes): + dim = ret.decode() + if ' ' in dim: + dim = dim.split(' ')[1] + dim = dim.split(',') + try: + self._value_dimension = [int(d) for d in dim] + except TypeError: + self._value_dimension = False + return False + return True + return False return False def setValueMaxDimensions(self): @@ -486,26 +515,20 @@ def getAcquisitionState(self): state = ret.decode().strip() return self.AcquisitionState(state) - def setElogChannels(self, channel_names): - """Sets the channels to be transfered within the ELOG system + def _getElogChannels(self, add_log=True): + """Reads the channels to be transfered within the ELOG system. - This Function sets the channels to be transfered. This list must - contain Oxygen channel names. + This function reads the actual list of channels to be transferred within + the ELOG system and updates the attribute 'elogChannelList' with a list + of strings containing the actual elog channel names. It is called at + __init__ to get the previously set channels. Args: - channelNames (list of str): List of channel names + add_log (bool): Indicate in function should log or not. Returns: True if Suceeded, False if not """ - if not is_minimum_version(self._scpi_version, (1,7)): - log.warning('SCPI Version 1.7 or higher required') - return False - - channel_list_str = '"'+'","'.join(channel_names)+'"' - ret = self._sendRaw(':ELOG:ITEMS {:s}'.format(channel_list_str)) - sleep(0.1) - # Read back actual set channel names ret = self._askRaw(':ELOG:ITEMS?') if isinstance(ret, bytes): ret = ret.decode().strip() @@ -513,17 +536,42 @@ def setElogChannels(self, channel_names): channel_names = ret.split('","') channel_names = [ch_name.replace('"','') for ch_name in channel_names] if len(channel_names) == 1: - log.debug('One Channel Set: {:s}'.format(channel_names[0])) + if add_log: + log.debug('One Channel Set: {:s}'.format(channel_names[0])) if channel_names[0] == 'NONE': channel_names = [] - log.warning('No Channel Set') + if add_log: + log.warning('No Channel Set') self.elogChannelList = channel_names if len(channel_names) == 0: return False return True return False + def setElogChannels(self, channel_names): + """Sets the channels to be transfered within the ELOG system + + This Function sets the channels to be transfered. This list must + contain Oxygen channel names. + + Args: + channelNames (list of str): List of channel names + + Returns: + True if Suceeded, False if not + """ + if not is_minimum_version(self._scpi_version, (1,7)): + log.warning('SCPI Version 1.7 or higher required') + return False + + channel_list_str = '"'+'","'.join(channel_names)+'"' + self._sendRaw(':ELOG:ITEMS {:s}'.format(channel_list_str)) + sleep(0.1) + # Read back actual set channel names + return self._getElogChannels() + def startElog(self): + self._localElogStartTime = dt.datetime.now() return self._sendRaw(':ELOG:START') def setElogPeriod(self, period): @@ -532,31 +580,178 @@ def setElogPeriod(self, period): def stopElog(self): return self._sendRaw(':ELOG:STOP') - def setElogTimestamp(self, tsType='REL'): - if tsType == 'REL': - return self._sendRaw(':ELOG:TIM REL') - if tsType == 'ABS': - return self._sendRaw(':ELOG:TIM ABS') - return self._sendRaw(':ELOG:TIM OFF') + @contextmanager + def elogContext(self): + """Safely starts and stops external logging. + + This function should be used in a with statement to start external + logging and immediately stops it when either exiting the context + or when an Exception occurs within the context. + + Example usage: + with mDevice.startElog(): + # Here elog is started + time.sleep(10) + data = mDevice.fetchElog() + # Here elog is stopped + """ + try: + self.startElog() + yield + finally: + self.stopElog() - def fetchElog(self): + def _getElogTimestamp(self): + """Get external logging configured timestamp. + + Returns: + External logging timestamp string obtained from ':ELOG:TIM?' + """ + ret = self._askRaw(':ELOG:TIM?') + if isinstance(ret, bytes): + ret = ret.decode().strip() + self.elogTimestamp = ret + return ret + return False + + def setElogTimestamp(self, tsType='REL'): + if tsType in ('REL', 'ABS', 'ELOG'): + self._sendRaw(f':ELOG:TIM {tsType}') + ts_read = self._getElogTimestamp() + return ts_read == tsType + send_ok = self._sendRaw(':ELOG:TIM OFF') + ts_read = self._getElogTimestamp() + return send_ok + + def fetchElog(self, + raw_string: bool = True + ) -> Union[ + list[list[str]], + list[Union[dt.datetime, float]], + Literal[False] + ]: data = self._askRaw(':ELOG:FETCH?') if type(data) is bytes: data = data.decode() else: return False - if 'NONE' in data: + if any(d in data for d in ('NONE', 'ERROR')): return False # Remove Header if Whitespace present if ' ' in data: data = data.split(' ')[1] data = data.split(',') - num_ch = len(self.elogChannelList)+1 + num_ch = len(self.elogChannelList) + if self.elogTimestamp in ('REL', 'ABS', 'ELOG'): + num_ch += 1 #print(len(data)/(1.0*num_ch), data) num_items = int(len(data)/num_ch) data = [data[i*num_ch:i*num_ch+num_ch] for i in range(num_items)] + if not raw_string: + for i, row in enumerate(data): + data[i] = self._convertElogArray(row) return data + def _convertElogArray(self, + data_array: list[str] + ) -> list[Union[dt.datetime, float]]: + """Converts a single array from fetchElog string values into float. + + If the Elog timestamp is set to 'ABS' then the first value of the array + is converted into datetime object. + + Args: + data_array : list of strings + List containing single array of string measurements from fetchElog. + + Returns: + List with value types converted to float (datetime for value at + index 0 if timestamp is ABS). + """ + # When ELOG timestamp is set to ABS, the first value of a row is a + # datetime string and the remaining values are strings that can + # be converted to floating point numbers. + if self.elogTimestamp == "ABS": + new_array = [] + dtime = data_array[0].replace('"', '') + # Oxygen's datetime with whole seconds does not display microseconds + # for those cases the ".%f" formatting is excluded. + fmt = '%Y-%m-%dT%H:%M:%S.%f' if '.' in dtime else '%Y-%m-%dT%H:%M:%S' + new_array.append(dt.datetime.strptime(dtime, fmt)) + new_array.extend(float(value) for value in data_array[1:]) + else: + new_array = [float(value) for value in data_array] + return new_array + + def fetchElogAccumulated(self, + timeout: float = 10 + ) -> Union[ + list[list[Union[float, dt.datetime]]], + Literal[False] + ]: + """Fetch ELOG until the actual timestamp is reached. + + This function blocks the execution and keeps fetching elog values until + a fetched timestamp is higher than the timestamp saved at the moment in + which the function was called. If called right after starting the + external logging, it is possible that the function needs longer because + the update of the Dewetron's buffer is not instantaneous. + + Depending on the internet connection, the function itself can take a few + seconds to be excecuted. + + It requires the elog timestamp to be either 'ABS' or 'ELOG'. + - With 'ABS' timestamp, the stop condition will compare the absolute + timestamp from the system executing the function with the timestamp from + the operating system in which Oxygen is running. + - With 'ELOG' timestamp, the stop condition will be met when the fetched + timestamp added to the timestamp in which the startElog function was + called (tracked by _localElogStartTime) attribute is higher than the + timestamp from execution of the function. + + Args: + timeout (float): timeout for the accumulated fetching in seconds. + + Returns: + List of lists (matrix like) containing the accumulated fetched values + converted to float (datetime for values at first column if timestamp + is ABS.) + """ + + call_tstamp = dt.datetime.now() + + def stopCondition(tstamp) -> bool: + """Checks if the measured timestamp has reached the call timestamp. + """ + # Case for ELOG timestamp + if self.elogTimestamp == "ELOG": + tstamp = float(tstamp) + return self._localElogStartTime + dt.timedelta(seconds=tstamp) >= call_tstamp + # Case for ABS timestmap + tstamp = tstamp.replace('"','') + tstamp = dt.datetime.strptime(tstamp, '%Y-%m-%dT%H:%M:%S.%f') + return tstamp >= call_tstamp + + # This function works only for ELOG and ABS timestamps + if self.elogTimestamp not in ("ELOG", "ABS"): + raise Exception("fetchElogAccumulated is only allowed for " + "'ELOG' and 'ABS' timestamp configuration.") + combined_fetch = [] + while dt.datetime.now() - call_tstamp < dt.timedelta(seconds=timeout): + data = self.fetchElog() + # Keep fetching until data is received + if not data: + sleep(0.05) + continue + combined_fetch.extend(data) + # Check if last fetched value reaches the call timestamp. + if stopCondition(combined_fetch[-1][0]): + for i, row in enumerate(combined_fetch): + combined_fetch[i] = self._convertElogArray(row) + return combined_fetch + print("fetchElogAccumulated timed out.") + return False + def addMarker(self, label, description=None, time=None): if description is None and time is None: return self._sendRaw(':MARK:ADD "{:s}"'.format(label))