diff --git a/.github/workflows/pylint.yml b/.github/workflows/ci.yml similarity index 78% rename from .github/workflows/pylint.yml rename to .github/workflows/ci.yml index c9cd281..36eb98f 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/ci.yml @@ -17,9 +17,13 @@ jobs: with: python-version: '3.10' cache: 'pip' + cache-dependency-path: 'requirements.txt' - name: Install dependencies run: | pip install -r requirements.txt - name: Analysing the code with pylint run: | pylint pyOxygenSCPI + - name: Running unit tests with pytest + run: | + pytest diff --git a/.pylintrc b/.pylintrc index ea43e97..18a5233 100644 --- a/.pylintrc +++ b/.pylintrc @@ -17,7 +17,7 @@ extension-pkg-whitelist= fail-on= # Specify a score threshold to be exceeded before program exits with error. -fail-under=10.0 +fail-under=7.0 # Files or directories to be skipped. They should be base names, not paths. ignore=CVS diff --git a/pyOxygenSCPI/oxygenscpi.py b/pyOxygenSCPI/oxygenscpi.py index 5534939..86cc928 100644 --- a/pyOxygenSCPI/oxygenscpi.py +++ b/pyOxygenSCPI/oxygenscpi.py @@ -227,7 +227,7 @@ def setNumberFormat(self, format=NumberFormat.ASCII): Available since 1.20 """ if not is_minimum_version(self._scpi_version, (1,20)): - raise NotImplementedError(":NUM:NORMAL:FORMAT requires protocol version 1.20"); + raise NotImplementedError(":NUM:NORMAL:FORMAT requires protocol version 1.20") if format == self.NumberFormat.BINARY_INTEL: fmt = "BIN_INTEL" @@ -245,7 +245,7 @@ def getNumberFormat(self) -> NumberFormat: Available since 1.20 """ if not is_minimum_version(self._scpi_version, (1,20)): - raise NotImplementedError(":NUM:NORMAL:FORMAT? requires protocol version 1.20"); + raise NotImplementedError(":NUM:NORMAL:FORMAT? requires protocol version 1.20") ret = self._askRaw(':NUM:NORM:FORMAT?') if isinstance(ret, bytes): @@ -254,9 +254,9 @@ def getNumberFormat(self) -> NumberFormat: format = format.split(' ')[1].rstrip() if format == "ASCII": return self.NumberFormat.ASCII - elif format == "BIN_INTEL": + if format == "BIN_INTEL": return self.NumberFormat.BINARY_INTEL - elif format == "BIN_MOTOROLA": + if format == "BIN_MOTOROLA": return self.NumberFormat.BINARY_MOTOROLA raise Exception("Invalid NumberFormat") @@ -341,7 +341,7 @@ def getValues(self): # Remove Header if Whitespace present if data.startswith(b':NUM:VAL '): data = data[9:] - + # Remove trailing newline if len(data) > 1 and data[-1] == ord('\n'): data = data[0:-1] @@ -351,7 +351,7 @@ def getValues(self): data = self._get_value_from_binary(data) else: data = self._get_value_from_ascii(data) - + if self._value_dimension is not None: idx = 0 values = [] @@ -365,7 +365,7 @@ def getValues(self): values.append(data[idx:idx+dim]) idx += dim return values - + return data def storeSetFileName(self, file_name): @@ -565,15 +565,14 @@ def addMarker(self, label, description=None, time=None): if time is None: return self._sendRaw(':MARK:ADD "{:s}","{:s}"'.format(label, description)) return self._sendRaw(':MARK:ADD "{:s}","{:s}",{:f}'.format(label, description, time)) - + def getChannelList(self): ret = self._askRaw(':CHANNEL:NAMES?') if ret: ch_str_list = ret.decode().strip() ch_list = [item.replace('(','').replace(')','').replace('"','').split(',') for item in ch_str_list.split('),(')] return ch_list - else: - return None + return None def getChannelListDict(self, key="ChannelName"): ch_list = self.getChannelList() @@ -587,38 +586,35 @@ def getChannelListDict(self, key="ChannelName"): else: ch_dict[ch[0]] = ch[1] return ch_dict - else: - return None - + + return None + def getChannelPropValue(self, channel_id, prop): if type(channel_id) == int: channel_id = str(channel_id) ret = self._askRaw(f':CHANNEL:PROP? "{channel_id:s}","{prop:s}"') if ret: return ret.decode().strip() - else: - return None - + return None + def getChannelPropNames(self, channel_id): if type(channel_id) == int: channel_id = str(channel_id) ret = self._askRaw(f':CHANNEL:ITEM{channel_id:s}:ATTR:NAMES?') if ret: return ret.decode().strip().replace('"','').split(",") - else: - return None - + return None + def setChannelPropValue(self, channel_id, prop, val): if type(channel_id) == int: channel_id = str(channel_id) self._sendRaw(f':CHANNEL:PROP "{channel_id:s}","{prop:s}","{val:s}"') - -# TODO: Better add and remove data stream instances -class OxygenScpiDataStream(object): +# TODO: Better add and remove data stream instances +class OxygenScpiDataStream: def __init__(self, oxygen): self.oxygen = oxygen - + def setItems(self, channelNames, streamGroup=1): """ Set Datastream Items to be transfered """ @@ -643,15 +639,15 @@ def setItems(self, channelNames, streamGroup=1): self.ChannelList = channelNames if len(channelNames) == 0: return False - else: - return True - else: - return False - + + return True + + return False + def setTcpPort(self, tcp_port, streamGroup=1): self.oxygen._sendRaw(':DST:PORT{:d} {:d}'.format(streamGroup, tcp_port)) return True - + def init(self, streamGroup=1): if streamGroup == 'all': self.oxygen._sendRaw(':DST:INIT {:s}'.format(streamGroup)) @@ -660,7 +656,7 @@ def init(self, streamGroup=1): else: return False return True - + def start(self, streamGroup=1): if streamGroup == 'all': self.oxygen._sendRaw(':DST:START ALL') @@ -669,7 +665,7 @@ def start(self, streamGroup=1): else: return False return True - + def stop(self, streamGroup=1): if streamGroup == 'all': self.oxygen._sendRaw(':DST:STOP ALL') @@ -678,26 +674,25 @@ def stop(self, streamGroup=1): else: return False return True - + def getState(self, streamGroup=1): ret = self.oxygen._askRaw(':DST:STAT{:d}?'.format(streamGroup)) if isinstance(ret, bytes): ret = ret.decode().strip() ret = ret.replace(':DST:STAT ','') return ret - else: - return False - + return False + def setTriggered(self, streamGroup=1, value=True): if value: self.oxygen._sendRaw(':DST:TRIG{:d} ON'.format(streamGroup)) else: self.oxygen._sendRaw(':DST:TRIG{:d} OFF'.format(streamGroup)) - + def reset(self): self.oxygen._sendRaw(':DST:RESET') -class OxygenChannelProperties(object): +class OxygenChannelProperties: class OutputMode(Enum): FUNCTION_GENERATOR = "FunctionGenerator" CONSTANT = "ConstOutput" @@ -717,58 +712,58 @@ def getChannelSamplerate(self, ch_id): try: return float(self.oxygen.getChannelPropValue(ch_id, 'SampleRate').split(',')[1].replace(')','')) except: - return None + return None def getTrionSlotNumber(self, ch_id): try: return int(self.oxygen.getChannelPropValue(ch_id, 'ID:TRION/SlotNumber').split(',')[1].replace(')','')) except: - return None + return None def getTrionBoardId(self, ch_id): try: return int(self.oxygen.getChannelPropValue(ch_id, 'ID:TRION/BoardId').split(',')[1].replace(')','')) except: - return None + return None def getTrionChannelIndex(self, ch_id): try: return int(self.oxygen.getChannelPropValue(ch_id, 'ID:TRION/ChannelIndex').split(',')[1].replace(')','')) except: - return None + return None def getChannelDomainName(self, ch_id): try: return self.oxygen.getChannelPropValue(ch_id, 'Neon/DomainUrl').split(',')[1].replace(')','').strip('"') except: return "" - + def getChannelLPFilterFreq(self, ch_id): """ - Possible Values for ret: + Possible Values for ret: - NONE - (SCALAR,20000.0,"Hz") - (STRING,"Auto") - (STRING,"Off") """ - try: + try: ret = self.oxygen.getChannelPropValue(ch_id, 'LP_Filter_Freq') if ret == "NONE": return None ret_parts = ret.replace("(","").replace(")","").split(",") if ret_parts[0] == "STRING": return ret_parts[1].replace('"',"") - elif ret_parts[0] == "SCALAR": + if ret_parts[0] == "SCALAR": return float(ret_parts[1]) except: - return None + pass + return None def getChannelUsed(self, ch_id): ret = self.oxygen.getChannelPropValue(ch_id, 'Used').split(',')[1].strip(')"') if ret == "OFF": return False - else: - return True + return True def getChannelRange(self, ch_id): try: @@ -800,18 +795,18 @@ def getTrionLpFilterDelay(self, ch_id): except: ret = 0.0 return ret - + def setTrionOutputFgenAmplitude(self, ch_id, amplitude, unit="V", amplitude_type="RMS"): self.oxygen.setChannelPropValue(ch_id, "AmplitudeValue", amplitude_type) self.oxygen.setChannelPropValue(ch_id, "TRION/Amplitude", f"{amplitude:f} {unit:s}") - def setTrionOutputFgenOffset(self, ch_id, offset, unit="V"): + def setTrionOutputFgenOffset(self, ch_id, offset, unit: str="V"): self.oxygen.setChannelPropValue(ch_id, "TRION/Offset", f"{offset:f} {unit:s}") - + def setTrionOutputFgenFrequency(self, ch_id, frequency): self.oxygen.setChannelPropValue(ch_id, "TRION/Frequency", f"{frequency:f} Hz") - def setTrionOutputResolution(self, ch_id, resolution): + def setTrionOutputResolution(self, ch_id, resolution: str): """ resolution, str, 'HighSpeed' or 'HighResolution' """ @@ -822,4 +817,4 @@ def setTrionOutputConstant(self, ch_id, amplitude, unit="V", const_idx=0): self.oxygen.setChannelPropValue(ch_id, f"TRION/SourceChannel_A_CONST/Const{const_idx:d}", f"{amplitude:f} {unit:s}") def setTrionOutputFgenWaveform(self, ch_id, waveform: Waveform): - self.oxygen.setChannelPropValue(ch_id, "TRION/WaveForm", waveform.value) \ No newline at end of file + self.oxygen.setChannelPropValue(ch_id, "TRION/WaveForm", waveform.value) diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..21e4970 --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,15 @@ +""" +Copyright DEWETRON GmbH 2023 + +pyOxygenSCPI - Unit Tests +""" +import pytest +from pyOxygenSCPI import OxygenSCPI + +def test_construction(): + o = OxygenSCPI('127.0.0.1') + assert len(o.channelList) == 0 + assert len(o.elogChannelList) == 0 + assert o.DataStream is not None + assert o.ChannelProperties is not None + assert o.getVersion() is None