diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fcdfc49..e798d5d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,9 +15,8 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-python@v4 with: - python-version: '3.10' + python-version: '3.7' cache: 'pip' - cache-dependency-path: 'requirements.txt' - name: Install dependencies run: pip install -r requirements.txt - name: Analyzing the code with pylint diff --git a/.pylintrc b/.pylintrc index 18a5233..59bdbcd 100644 --- a/.pylintrc +++ b/.pylintrc @@ -17,7 +17,7 @@ extension-pkg-whitelist= fail-on= # Specify a score threshold to be exceeded before program exits with error. -fail-under=7.0 +fail-under=8.5 # Files or directories to be skipped. They should be base names, not paths. ignore=CVS diff --git a/pyOxygenSCPI/oxygenscpi.py b/pyOxygenSCPI/oxygenscpi.py index 15e574d..875458f 100644 --- a/pyOxygenSCPI/oxygenscpi.py +++ b/pyOxygenSCPI/oxygenscpi.py @@ -12,11 +12,11 @@ from struct import unpack from time import sleep from contextlib import contextmanager -from typing import Union, Literal +from typing import Optional, List, Tuple, Union log = logging.getLogger('oxygenscpi') -def is_minimum_version(version, min_version): +def is_minimum_version(version: Tuple[int,int], min_version: Tuple[int,int]): """ Performs a version check """ @@ -51,6 +51,9 @@ def __init__(self, ip_addr, tcp_port = 10001): self.ChannelProperties = OxygenChannelProperties(self) def connect(self): + """ + Connect to a running Oxygen instance + """ for numTry in range(1, self._CONN_NUM_TRY+1): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0) @@ -80,6 +83,9 @@ def connect(self): self._sock = sock def disconnect(self): + """ + Disconnect from Oxygen + """ try: self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() @@ -89,7 +95,7 @@ def disconnect(self): log.error("Error Shutting Down: %s", msg) self._sock = None - def _sendRaw(self, cmd): + def _sendRaw(self, cmd: str) -> bool: cmd += '\n' if self._sock is None: self.connect() @@ -104,7 +110,7 @@ def _sendRaw(self, cmd): log.error(template.format(msg)) return False - def _askRaw(self, cmd): + def _askRaw(self, cmd: str): cmd += '\n' if self._sock is None: self.connect() @@ -123,35 +129,38 @@ def _askRaw(self, cmd): log.error(template.format(msg)) return False - def getIdn(self): + def getIdn(self) -> Union[str, bool]: ret = self._askRaw('*IDN?') - if type(ret) == bytes: + if isinstance(ret, bytes): return ret.decode().strip() return False - def getVersion(self): + def getVersion(self) -> Optional[Tuple[int, int]]: """ SCPI,"1999.0",RC_SCPI,"1.6",OXYGEN,"2.5.71" """ ret = self._askRaw('*VER?') - if type(ret) == bytes: + if isinstance(ret, bytes): ret = ret.decode().strip().split(',') self._scpi_version = ret[3].replace('"','').split('.') self._scpi_version = (int(self._scpi_version[0]), int(self._scpi_version[1])) return self._scpi_version return None - def reset(self): + def reset(self) -> None: + """ + Resets the current SCPI session + """ self._sendRaw('*RST') - def headersOff(self): + def headersOff(self) -> None: """ Deactivate Headers on response """ self._sendRaw(':COMM:HEAD OFF') self._headersActive = False - def setRate(self, rate=500): + def setRate(self, rate: int = 500): """Sets the Aggregation Rate of the measurement device This Function sets the aggregation rate (mean value) to the @@ -163,9 +172,9 @@ def setRate(self, rate=500): Returns: Nothing """ - return self._sendRaw(':RATE {:d}ms'.format(rate)) + return self._sendRaw(f':RATE {rate:d}ms') - def loadSetup(self, setup_name): + def loadSetup(self, setup_name: str): """Loads the specified setup on the measurement device This Function loads the specified measurement setup (.dms) on @@ -177,9 +186,9 @@ def loadSetup(self, setup_name): Returns: Nothing """ - return self._sendRaw(':SETUP:LOAD "{:s}"'.format(setup_name)) + return self._sendRaw(f':SETUP:LOAD "{setup_name:s}"') - def _getTransferChannels(self, add_log=True): + def _getTransferChannels(self, add_log: bool = True) -> bool: """Reads the channels to be transferred within the numeric system. This function reads the actual list of channels to be transferred within @@ -201,7 +210,7 @@ def _getTransferChannels(self, add_log=True): channelNames = [chName.replace('"','') for chName in channelNames] if len(channelNames) == 1: if add_log: - log.debug('One Channel Set: {:s}'.format(channelNames[0])) + log.debug('One Channel Set: %s', channelNames[0]) if channelNames[0] == 'NONE': channelNames = [] if add_log: @@ -215,7 +224,12 @@ def _getTransferChannels(self, add_log=True): return True return False - def setTransferChannels(self, channelNames, includeRelTime=False, includeAbsTime=False): + def setTransferChannels( + self, + channelNames: List[str], + includeRelTime: bool = False, + includeAbsTime: bool = False + ): """Sets the channels to be transfered within the numeric system This Function sets the channels to be transfered. This list must @@ -232,41 +246,43 @@ def setTransferChannels(self, channelNames, includeRelTime=False, includeAbsTime if includeAbsTime: channelNames.insert(0, "ABS-TIME") channelListStr = '"'+'","'.join(channelNames)+'"' - self._sendRaw(':NUM:NORMAL:ITEMS {:s}'.format(channelListStr)) + self._sendRaw(f':NUM:NORMAL:ITEMS {channelListStr:s}') # Read back actual set channel names return self._getTransferChannels() - def setNumberChannels(self, number=None): + def setNumberChannels(self, number:Optional[int] = None): if number is None: number = len(self.channelList) - return self._sendRaw(':NUM:NORMAL:NUMBER {:d}'.format(number)) + return self._sendRaw(f':NUM:NORMAL:NUMBER {number:d}') class NumberFormat(Enum): ASCII = 0 BINARY_INTEL = 1 BINARY_MOTOROLA = 2 - def setNumberFormat(self, format=NumberFormat.ASCII): + def setNumberFormat(self, number_format=NumberFormat.ASCII): """ Set the number format of the output + Available since 1.20 """ if not is_minimum_version(self._scpi_version, (1,20)): raise NotImplementedError(":NUM:NORMAL:FORMAT requires protocol version 1.20") - if format == self.NumberFormat.BINARY_INTEL: + if number_format == self.NumberFormat.BINARY_INTEL: fmt = "BIN_INTEL" - elif format == self.NumberFormat.BINARY_MOTOROLA: + elif number_format == self.NumberFormat.BINARY_MOTOROLA: fmt = "BIN_MOTOROLA" else: fmt = "ASCII" - self._sendRaw(':NUM:NORMAL:FORMAT {:s}'.format(fmt)) - self._value_format = format # Cache value + self._sendRaw(f':NUM:NORMAL:FORMAT {fmt:s}') + self._value_format = number_format # Cache value def getNumberFormat(self) -> NumberFormat: """ Read the number format of the output + Available since 1.20 """ if not is_minimum_version(self._scpi_version, (1,20)): @@ -274,19 +290,21 @@ def getNumberFormat(self) -> NumberFormat: ret = self._askRaw(':NUM:NORM:FORMAT?') if isinstance(ret, bytes): - format = ret.decode() - if ' ' in format: - format = format.split(' ')[1].rstrip() - if format == "ASCII": + number_format = ret.decode() + if ' ' in number_format: + number_format = number_format.split(' ')[1].rstrip() + if number_format == "ASCII": return self.NumberFormat.ASCII - if format == "BIN_INTEL": + if number_format == "BIN_INTEL": return self.NumberFormat.BINARY_INTEL - if format == "BIN_MOTOROLA": + if number_format == "BIN_MOTOROLA": return self.NumberFormat.BINARY_MOTOROLA raise Exception("Invalid NumberFormat") def getValueDimensions(self): - """ Read the Dimension of the output + """ + Read the Dimension of the output + Available since 1.6 """ # Asking for command ":NUM:NORM:DIMS?" times out when there are no @@ -310,7 +328,7 @@ def getValueDimensions(self): def setValueMaxDimensions(self): if self.getValueDimensions(): for idx in range(len(self._value_dimension)): - self._sendRaw(':NUM:NORMAL:DIM{:d} MAX'.format(idx+1)) + self._sendRaw(f':NUM:NORMAL:DIM{idx+1:d} MAX') else: return False return self.getValueDimensions() @@ -363,7 +381,7 @@ def getValues(self): except OSError: return False - if not type(data) is bytes: + if not isinstance(data, bytes): # No Data Available or Wrong Channel return False @@ -409,7 +427,7 @@ def storeSetFileName(self, file_name): Status (bool) """ try: - return self._sendRaw(':STOR:FILE:NAME "{:s}"'.format(file_name)) + return self._sendRaw(f':STOR:FILE:NAME "{file_name:s}"') except OSError: return False @@ -514,6 +532,7 @@ def getAcquisitionState(self): if isinstance(ret, bytes): state = ret.decode().strip() return self.AcquisitionState(state) + return None def _getElogChannels(self, add_log=True): """Reads the channels to be transfered within the ELOG system. @@ -537,7 +556,7 @@ def _getElogChannels(self, add_log=True): channel_names = [ch_name.replace('"','') for ch_name in channel_names] if len(channel_names) == 1: if add_log: - log.debug('One Channel Set: {:s}'.format(channel_names[0])) + log.debug('One Channel Set: %s', channel_names[0]) if channel_names[0] == 'NONE': channel_names = [] if add_log: @@ -548,7 +567,7 @@ def _getElogChannels(self, add_log=True): return True return False - def setElogChannels(self, channel_names): + def setElogChannels(self, channel_names: List[str]): """Sets the channels to be transfered within the ELOG system This Function sets the channels to be transfered. This list must @@ -565,7 +584,7 @@ def setElogChannels(self, channel_names): return False channel_list_str = '"'+'","'.join(channel_names)+'"' - self._sendRaw(':ELOG:ITEMS {:s}'.format(channel_list_str)) + self._sendRaw(f':ELOG:ITEMS {channel_list_str:s}') sleep(0.1) # Read back actual set channel names return self._getElogChannels() @@ -574,8 +593,8 @@ def startElog(self): self._localElogStartTime = dt.datetime.now() return self._sendRaw(':ELOG:START') - def setElogPeriod(self, period): - return self._sendRaw(':ELOG:PERIOD {:f}'.format(period)) + def setElogPeriod(self, period: float): + return self._sendRaw(f':ELOG:PERIOD {period:f}') def stopElog(self): return self._sendRaw(':ELOG:STOP') @@ -626,15 +645,15 @@ def setElogTimestamp(self, tsType='REL'): def fetchElog(self, raw_string: bool = True ) -> Union[ - list[list[str]], - list[Union[dt.datetime, float]], - Literal[False] + List[List[str]], + List[Union[dt.datetime, float]], + bool ]: data = self._askRaw(':ELOG:FETCH?') - if type(data) is bytes: - data = data.decode() - else: + if not isinstance(data, bytes): return False + + data = data.decode() if any(d in data for d in ('NONE', 'ERROR')): return False # Remove Header if Whitespace present @@ -653,8 +672,8 @@ def fetchElog(self, return data def _convertElogArray(self, - data_array: list[str] - ) -> list[Union[dt.datetime, float]]: + data_array: List[str] + ) -> List[Union[dt.datetime, float]]: """Converts a single array from fetchElog string values into float. If the Elog timestamp is set to 'ABS' then the first value of the array @@ -686,8 +705,8 @@ def _convertElogArray(self, def fetchElogAccumulated(self, timeout: float = 10 ) -> Union[ - list[list[Union[float, dt.datetime]]], - Literal[False] + List[List[Union[float, dt.datetime]]], + bool ]: """Fetch ELOG until the actual timestamp is reached. @@ -752,14 +771,18 @@ def stopCondition(tstamp) -> bool: print("fetchElogAccumulated timed out.") return False - def addMarker(self, label, description=None, time=None): + def addMarker( + self, + label: str, + description:Optional[str]=None, + time:Optional[float]=None): if description is None and time is None: - return self._sendRaw(':MARK:ADD "{:s}"'.format(label)) + return self._sendRaw(f':MARK:ADD "{label:s}"') if description is None: - return self._sendRaw(':MARK:ADD "{:s}",{:f}'.format(label, time)) + return self._sendRaw(f':MARK:ADD "{label:s}",{time:f}') if time is None: - return self._sendRaw(':MARK:ADD "{:s}","{:s}"'.format(label, description)) - return self._sendRaw(':MARK:ADD "{:s}","{:s}",{:f}'.format(label, description, time)) + return self._sendRaw(f':MARK:ADD "{label:s}","{description:s}"') + return self._sendRaw(f':MARK:ADD "{label:s}","{description:s}",{time:f}') def getChannelList(self): ret = self._askRaw(':CHANNEL:NAMES?') @@ -769,14 +792,14 @@ def getChannelList(self): return ch_list return None - def getChannelListDict(self, key="ChannelName"): + def getChannelListDict(self, key:str="ChannelName"): ch_list = self.getChannelList() if ch_list: ch_dict = {} for ch in ch_list: if key == "ChannelName": if ch[1] in ch_dict: - print("Warning: Channel duplicate detected!") + log.warning("Warning: Channel duplicate detected!") ch_dict[ch[1]] = ch[0] else: ch_dict[ch[0]] = ch[1] @@ -784,70 +807,68 @@ def getChannelListDict(self, key="ChannelName"): return None - def getChannelPropValue(self, channel_id, prop): - if type(channel_id) == int: + def getChannelPropValue(self, channel_id:Union[str, int], prop:str): + if isinstance(channel_id, int): channel_id = str(channel_id) ret = self._askRaw(f':CHANNEL:PROP? "{channel_id:s}","{prop:s}"') if ret: return ret.decode().strip() return None - def getChannelPropNames(self, channel_id): - if type(channel_id) == int: + def getChannelPropNames(self, channel_id:Union[str, int]): + if isinstance(channel_id, int): channel_id = str(channel_id) ret = self._askRaw(f':CHANNEL:ITEM{channel_id:s}:ATTR:NAMES?') if ret: return ret.decode().strip().replace('"','').split(",") return None - def setChannelPropValue(self, channel_id, prop, val): - if type(channel_id) == int: + def setChannelPropValue(self, channel_id:Union[str, int], prop:str, val:str): + if isinstance(channel_id, int): channel_id = str(channel_id) self._sendRaw(f':CHANNEL:PROP "{channel_id:s}","{prop:s}","{val:s}"') # TODO: Better add and remove data stream instances class OxygenScpiDataStream: - def __init__(self, oxygen): + def __init__(self, oxygen: OxygenSCPI): self.oxygen = oxygen + self.ChannelList = [] - def setItems(self, channelNames, streamGroup=1): + def setItems(self, channelNames: List[str], streamGroup=1): """ Set Datastream Items to be transfered """ if not is_minimum_version(self.oxygen._scpi_version, (1,7)): - log.warn('SCPI Version 1.7 or higher required') + log.warning('SCPI Version 1.7 or higher required') return False channelListStr = '"'+'","'.join(channelNames)+'"' - ret = self.oxygen._sendRaw(':DST:ITEM{:d} {:s}'.format(streamGroup, channelListStr)) + ret = self.oxygen._sendRaw(f':DST:ITEM{streamGroup:d} {channelListStr:s}') sleep(0.1) # Read back actual set channel names - ret = self.oxygen._askRaw(':DST:ITEM{:d}?'.format(streamGroup)) + ret = self.oxygen._askRaw(f':DST:ITEM{streamGroup:d}?') if isinstance(ret, bytes): ret = ret.decode().strip() - ret = ret.replace(':DST:ITEM{:d} '.format(streamGroup),'') + ret = ret.replace(f':DST:ITEM{streamGroup:d} ','') channelNames = ret.split('","') channelNames = [chName.replace('"','') for chName in channelNames] if len(channelNames) == 1: - log.debug('One Channel Set: {:s}'.format(channelNames[0])) + log.debug('One Channel Set: %s', channelNames[0]) if channelNames[0] == 'NONE': channelNames = [] - log.warn('No Channel Set') + log.warning('No Channel Set') self.ChannelList = channelNames - if len(channelNames) == 0: - return False - - return True + return len(channelNames) > 0 return False - def setTcpPort(self, tcp_port, streamGroup=1): - self.oxygen._sendRaw(':DST:PORT{:d} {:d}'.format(streamGroup, tcp_port)) + def setTcpPort(self, tcp_port:int, streamGroup:int=1): + self.oxygen._sendRaw(f':DST:PORT{streamGroup:d} {tcp_port:d}') return True - def init(self, streamGroup=1): + def init(self, streamGroup: Union[str, int]=1): if streamGroup == 'all': - self.oxygen._sendRaw(':DST:INIT {:s}'.format(streamGroup)) - elif type(streamGroup) == int: - self.oxygen._sendRaw(':DST:INIT {:d}'.format(streamGroup)) + self.oxygen._sendRaw(f':DST:INIT {streamGroup:s}') + elif isinstance(streamGroup, int): + self.oxygen._sendRaw(f':DST:INIT {streamGroup:d}') else: return False return True @@ -855,8 +876,8 @@ def init(self, streamGroup=1): def start(self, streamGroup=1): if streamGroup == 'all': self.oxygen._sendRaw(':DST:START ALL') - elif type(streamGroup) == int: - self.oxygen._sendRaw(':DST:START {:d}'.format(streamGroup)) + elif isinstance(streamGroup, int): + self.oxygen._sendRaw(f':DST:START {streamGroup:d}') else: return False return True @@ -864,14 +885,14 @@ def start(self, streamGroup=1): def stop(self, streamGroup=1): if streamGroup == 'all': self.oxygen._sendRaw(':DST:STOP ALL') - elif type(streamGroup) == int: - self.oxygen._sendRaw(':DST:STOP {:d}'.format(streamGroup)) + elif isinstance(streamGroup, int): + self.oxygen._sendRaw(f':DST:STOP {streamGroup:d}') else: return False return True def getState(self, streamGroup=1): - ret = self.oxygen._askRaw(':DST:STAT{:d}?'.format(streamGroup)) + ret = self.oxygen._askRaw(f':DST:STAT{streamGroup:d}?') if isinstance(ret, bytes): ret = ret.decode().strip() ret = ret.replace(':DST:STAT ','') @@ -880,9 +901,9 @@ def getState(self, streamGroup=1): def setTriggered(self, streamGroup=1, value=True): if value: - self.oxygen._sendRaw(':DST:TRIG{:d} ON'.format(streamGroup)) + self.oxygen._sendRaw(f':DST:TRIG{streamGroup:d} ON') else: - self.oxygen._sendRaw(':DST:TRIG{:d} OFF'.format(streamGroup)) + self.oxygen._sendRaw(f':DST:TRIG{streamGroup:d} OFF') def reset(self): self.oxygen._sendRaw(':DST:RESET') diff --git a/setup.py b/setup.py index 3130374..8a2590e 100644 --- a/setup.py +++ b/setup.py @@ -32,5 +32,5 @@ packages=["pyOxygenSCPI"], package_dir={"pyOxygenSCPI": "pyOxygenSCPI"}, install_requires=[], - python_requires=">=3.6", + python_requires=">=3.7", )