diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a9123c8..ccd9b69 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,4 +1,4 @@ -name: Pylint +name: Integration tests on: push: @@ -24,7 +24,7 @@ jobs: - name: Install dependencies run: pip install -r requirements.txt - name: Analyzing the code with pylint - run: pylint pyOxygenSCPI + run: pylint pyOxygenSCPI examples - name: Installing the package run: python3 setup.py install - name: Running unit tests with pytest diff --git a/examples/read_elog.py b/examples/read_elog.py new file mode 100644 index 0000000..ac45ac5 --- /dev/null +++ b/examples/read_elog.py @@ -0,0 +1,39 @@ +""" +Determinine all available channels and return values of the first two channels via ELOG + +The ELOG subsystem allows the SCPI user to retrieve synchronized access to multiple channels +via statistics calculations. First, the subsystem needs to be configured. Possible parameters +are a channel list, aggregation calculations, aggregation duration as well as result formats +and timestamp formats. After configuration, the user can start the computations and fetch all +values. By continuously requesting data records, gap-less readout is possible. Data is kept +available inside Oxygen for at least 20 seconds before fetching of old samples is no longer +possible. +""" + +import time +from pyOxygenSCPI import OxygenSCPI + +DEWETRON_IP_ADDR = 'localhost' +DEWETRON_PORT = 10001 + +mDevice = OxygenSCPI(ip_addr=DEWETRON_IP_ADDR, tcp_port=DEWETRON_PORT) + +print("Available channels:") +channel_list = mDevice.getChannelList() +for ch_id, ch_name in channel_list: + print(f" {ch_name:20} (id = {ch_id})") + +# Select the first two channels +selection = [ch_name for _, ch_name in channel_list[0:2]] +mDevice.setElogChannels(selection) +mDevice.setElogTimestamp('REL') +mDevice.setElogCalculations(['RMS', 'MIN']) +mDevice.setElogPeriod(0.1) +print(f'{"Time":5} {selection[0]:>11} RMS {selection[0]:>11} MIN {selection[1]:>11} RMS {selection[1]:>11} MIN') +with mDevice.elogContext(): + for n in range(4): + time.sleep(1) + data = mDevice.fetchElog(raw_string=False) + if data: + for values in data: + print(f'{values[0]:5} {values[1]:>15.3} {values[2]:>15.3} {values[3]:>15.3} {values[4]:>15.3}') diff --git a/pyOxygenSCPI/oxygenscpi.py b/pyOxygenSCPI/oxygenscpi.py index e80bbbf..99d31ed 100644 --- a/pyOxygenSCPI/oxygenscpi.py +++ b/pyOxygenSCPI/oxygenscpi.py @@ -46,6 +46,7 @@ def __init__(self, ip_addr, tcp_port = 10001): self._value_format = self.NumberFormat.ASCII self.elogChannelList = [] self.elogTimestamp = "OFF" + self.elogCalculations = [] self._localElogStartTime = dt.datetime.now() self.DataStream = OxygenScpiDataStream(self) self.ChannelProperties = OxygenChannelProperties(self) @@ -67,6 +68,7 @@ def connect(self): self._getTransferChannels(False) self._getElogChannels(False) self._getElogTimestamp() + self._getElogCalculations() return True except ConnectionRefusedError as msg: template = "Connection to {!s}:{:d} refused: {!s}" @@ -643,23 +645,64 @@ def _getElogTimestamp(self): return ret return False - def setElogTimestamp(self, tsType='REL'): - if tsType in ('REL', 'ABS', 'ELOG'): - self._sendRaw(f':ELOG:TIM {tsType}') - ts_read = self._getElogTimestamp() - return ts_read == tsType - send_ok = self._sendRaw(':ELOG:TIM OFF') + def setElogTimestamp(self, tsType: str='REL'): + """ + Sets the requested timestamp format + + possible values for tsType are: 'REL', 'ABS', 'ELOG' or 'OFF' + """ + if tsType not in ('REL', 'ABS', 'ELOG', 'OFF'): + raise ValueError("Possible ELOG timestamp types are: 'REL', 'ABS', 'ELOG' or 'OFF'") + self._sendRaw(f':ELOG:TIM {tsType}') ts_read = self._getElogTimestamp() - return send_ok + return ts_read == tsType + + def _getElogCalculations(self): + """Get external logging configured calculations. + + Returns: + External logging calculations string obtained from ':ELOG:CALC?' + """ + ret = self._askRaw(':ELOG:CALC?') + if isinstance(ret, bytes): + ret = ret.decode().strip() + self.elogCalculations = [mode.strip() for mode in ret.split(',')] + return self.elogCalculations + return None + + def setElogCalculations(self, calculations: Union[str,List[str]]='AVG'): + """ + Sets a list of requested statistical calculations for all channels + e.g. setElogCalculations(["AVG", "RMS"]) + + calculations : list of strings or single string + possible values are 'AVG', 'MIN', 'MAX' and 'RMS' + """ + if isinstance(calculations, str): + calculations = [calculations] + for mode in calculations: + if mode not in ['AVG', 'MIN', 'MAX', 'RMS']: + raise ValueError("Possible ELOG calculation types are: AVG, MIN, MAX and RMS") + calc_list = ", ".join(calculations) + self._sendRaw(f':ELOG:CALC {calc_list}') + return self._getElogCalculations() == calculations def fetchElog(self, + max_records: Optional[int] = None, raw_string: bool = True ) -> Union[ List[List[str]], List[Union[dt.datetime, float]], bool ]: - data = self._askRaw(':ELOG:FETCH?') + """ + Fetches max_records records or less from the internal ELOG buffer. If no parameter is given, + all available records are returned. fetchElog() is only possible after startElog(). + """ + if max_records: + data = self._askRaw(f':ELOG:FETCH? {max_records:d}') + else: + data = self._askRaw(':ELOG:FETCH?') if not isinstance(data, bytes): return False @@ -671,11 +714,11 @@ def fetchElog(self, data = data.split(' ')[1] data = data.split(',') num_ch = len(self.elogChannelList) + num_values = num_ch * len(self.elogCalculations) if self.elogTimestamp in ('REL', 'ABS', 'ELOG'): - num_ch += 1 - #print(len(data)/(1.0*num_ch), data) - num_items = int(len(data)/num_ch) - data = [data[i*num_ch:i*num_ch+num_ch] for i in range(num_items)] + num_values += 1 + num_items = int(len(data)/num_values) + data = [data[i*num_values:i*num_values+num_values] for i in range(num_items)] if not raw_string: for i, row in enumerate(data): data[i] = self._convertElogArray(row) diff --git a/tests/test_connect.py b/tests/test_connect.py index e0b123c..2cda0cc 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -43,6 +43,10 @@ def handle_init_sequence(self, client): assert r == ':ELOG:TIM?\n' client.sendall(':ELOG:TIM ELOG\n'.encode()) + r = client.recv(1024).decode() + assert r == ':ELOG:CALC?\n' + client.sendall(':ELOG:CALC AVG\n'.encode()) + def handle_client(self, client): self.handle_init_sequence(client) self.handle_idn(client)