diff --git a/pyOxygenSCPI/oxygenscpi.py b/pyOxygenSCPI/oxygenscpi.py index af85a60..15e574d 100644 --- a/pyOxygenSCPI/oxygenscpi.py +++ b/pyOxygenSCPI/oxygenscpi.py @@ -12,6 +12,7 @@ from struct import unpack from time import sleep from contextlib import contextmanager +from typing import Union, Literal log = logging.getLogger('oxygenscpi') @@ -622,7 +623,13 @@ def setElogTimestamp(self, tsType='REL'): ts_read = self._getElogTimestamp() return send_ok - def fetchElog(self, raw_string=True): + def fetchElog(self, + raw_string: bool = True + ) -> Union[ + list[list[str]], + list[Union[dt.datetime, float]], + Literal[False] + ]: data = self._askRaw(':ELOG:FETCH?') if type(data) is bytes: data = data.decode() @@ -645,7 +652,9 @@ def fetchElog(self, raw_string=True): data[i] = self._convertElogArray(row) return data - def _convertElogArray(self, data_array): + def _convertElogArray(self, + data_array: list[str] + ) -> list[Union[dt.datetime, float]]: """Converts a single array from fetchElog string values into float. If the Elog timestamp is set to 'ABS' then the first value of the array @@ -674,7 +683,12 @@ def _convertElogArray(self, data_array): new_array = [float(value) for value in data_array] return new_array - def fetchElogAccumulated(self, timeout=10): + def fetchElogAccumulated(self, + timeout: float = 10 + ) -> Union[ + list[list[Union[float, dt.datetime]]], + Literal[False] + ]: """Fetch ELOG until the actual timestamp is reached. This function blocks the execution and keeps fetching elog values until @@ -706,7 +720,7 @@ def fetchElogAccumulated(self, timeout=10): call_tstamp = dt.datetime.now() - def stopCondition(tstamp): + def stopCondition(tstamp) -> bool: """Checks if the measured timestamp has reached the call timestamp. """ # Case for ELOG timestamp