-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for Alicat BASIS devices #121
Open
avichalk
wants to merge
3
commits into
numat:master
Choose a base branch
from
avichalk:basis-support
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+361
−1
Open
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,347 @@ | ||
"""Python driver for Alicat BASIS mass flow devices, using serial communication. | ||
|
||
Distributed under the GNU General Public License v2 | ||
Copyright (C) 2023 NuMat Technologies | ||
""" | ||
from __future__ import annotations | ||
|
||
import asyncio, nest_asyncio | ||
nest_asyncio.apply() | ||
from typing import Any, ClassVar | ||
|
||
from .util import Client, SerialClient, TcpClient, _is_float | ||
import time | ||
|
||
|
||
class FlowMeter: | ||
"""Python driver for BASIS Flow Meters. | ||
|
||
[Reference](https://www.alicat.com/wp-content/documents/manuals/DOC-MANUAL-BASIS2.pdf). | ||
|
||
This communicates with the flow meter over a USB or RS-232/RS-485 | ||
connection using pyserial. | ||
""" | ||
|
||
# A dictionary that maps port names to a tuple of connection | ||
# objects and the refcounts | ||
open_ports: ClassVar[dict[int, tuple]] = {} | ||
gases: ClassVar[list] = ['Air', 'Ar', 'CO2', 'N2', 'O2', 'N2O', 'H2', 'He', 'CH4'] | ||
|
||
def __init__(self, address: str = '/dev/ttyUSB0', unit: str = 'A', baudrate: int = 38400, **kwargs: Any) -> None: | ||
"""Connect this driver with the appropriate USB / serial port. | ||
|
||
Args: | ||
address: The serial port or TCP address:port. Default '/dev/ttyUSB0'. | ||
unit: The Alicat-specified unit ID, A-Z. Default 'A'. | ||
baudrate: The baud rate of the device. Default 38400. | ||
""" | ||
self.hw: Client = SerialClient(address=address, baudrate=baudrate, **kwargs) | ||
|
||
self.unit = unit | ||
self.keys = ['temperature', 'flow', 'totalizer', 'setpoint', | ||
'valve drive', 'gas'] | ||
self.open = True | ||
self.firmware: str | None = None | ||
|
||
## check if device is basis. if not, raise error | ||
async def _is_basis() -> None: | ||
self.control_point = await self._check_basis() | ||
|
||
loop = asyncio.get_event_loop() | ||
tasks = asyncio.gather(_is_basis()) | ||
loop.run_until_complete(tasks) | ||
|
||
async def _check_basis(self) -> None: | ||
""" Checks if connected device is a BASIS device. If not, raises error""" | ||
check = await self._write_and_read(f'{self.unit} SN') | ||
if check.split(" ")[-1][0] != 'B': | ||
raise OSError('This is not a BASIS device. Please use the regular serial driver.') | ||
|
||
async def __aenter__(self, *args: Any) -> FlowMeter: | ||
"""Provide async enter to context manager.""" | ||
return self | ||
|
||
async def __aexit__(self, *args: Any) -> None: | ||
"""Provide async exit to context manager.""" | ||
await self.close() | ||
return | ||
|
||
@classmethod | ||
async def is_connected(cls, port: str, unit: str = 'A') -> bool: | ||
"""Return True if the specified port is connected to this device. | ||
|
||
This class can be used to automatically identify ports with connected | ||
Alicats. Iterate through all connected interfaces, and use this to | ||
test. Ports that come back True should be valid addresses. | ||
|
||
Note that this distinguishes between `FlowController` and `FlowMeter`. | ||
""" | ||
is_device = False | ||
try: | ||
device = cls(port, unit) | ||
|
||
try: | ||
c = await device.get() | ||
if cls.__name__ == 'FlowMeter': | ||
assert c | ||
assert 'setpoint' not in device.keys | ||
elif cls.__name__ == 'FlowController': | ||
assert c | ||
assert 'setpoint' in device.keys | ||
else: | ||
raise NotImplementedError('Must be meter or controller.') | ||
is_device = True | ||
finally: | ||
await device.close() | ||
except Exception: | ||
pass | ||
return is_device | ||
|
||
def _test_controller_open(self) -> None: | ||
"""Raise an IOError if the FlowMeter has been closed. | ||
|
||
Does nothing if the meter is open and good for read/write | ||
otherwise raises an IOError. This only checks if the meter | ||
has been closed by the FlowMeter.close method. | ||
""" | ||
pass | ||
if not self.open: | ||
raise OSError(f"The FlowMeter with unit ID {self.unit} and \ | ||
port {self.hw.address} is not open") | ||
|
||
async def _write_and_read(self, command: str) -> str | None: | ||
"""Wrap the communicator request, to call _test_controller_open() before any request.""" | ||
self._test_controller_open() | ||
return await self.hw._write_and_read(command) | ||
|
||
async def get(self) -> dict: | ||
"""Get the current state of the flow controller. | ||
|
||
From the Alicat mass flow controller documentation, this data is: | ||
* Pressure (normally in psia) | ||
* Temperature (normally in C) | ||
* Mass flow (in units specified at time of order) | ||
* Total flow | ||
* Currently selected gas | ||
|
||
Args: | ||
retries: Number of times to re-attempt reading. Default 2. | ||
Returns: | ||
The state of the flow controller, as a dictionary. | ||
|
||
""" | ||
command = f'{self.unit}' | ||
line = await self._write_and_read(command) | ||
if not line: | ||
raise OSError("Could not read values") | ||
spl = line.split() | ||
unit, values = spl[0], spl[1:] | ||
|
||
# Over range errors for mass, volume, pressure, and temperature | ||
# Explicitly silenced because I find it redundant. | ||
while values[-1].upper() in ['MOV', 'VOV', 'POV', 'TOV']: | ||
del values[-1] | ||
if unit != self.unit: | ||
raise ValueError("Flow controller unit ID mismatch.") | ||
if len(values) == 5 and len(self.keys) == 6: | ||
del self.keys[-3] | ||
return {k: (float(v) if _is_float(v) else v) | ||
for k, v in zip(self.keys, values)} | ||
|
||
async def set_gas(self, gas: str | int) -> None: | ||
"""Set the gas type. | ||
|
||
Args: | ||
gas: The gas type, as a string or integer. Supported strings are: | ||
'Air', 'Ar', 'CO2', 'N2', 'O2', 'N2O', 'H2', 'He', 'CH4' | ||
""" | ||
if isinstance(gas, str): | ||
if gas not in self.gases: | ||
raise ValueError(f"{gas} not supported!") | ||
gas_number = self.gases.index(gas) | ||
else: | ||
gas_number = gas | ||
command = f'{self.unit}GS {gas_number}' | ||
res = await self._write_and_read(command) | ||
|
||
if gas != res.split(" ")[-3]: | ||
raise OSError("Cannot set gas.") | ||
|
||
async def tare(self) -> None: | ||
"""Tare flow.""" | ||
command = f'{self.unit} V' | ||
line = await self._write_and_read(command) | ||
|
||
if line == '?': | ||
raise OSError("Unable to tare flow.") | ||
|
||
async def reset_totalizer(self) -> None: | ||
"""Reset the totalizer.""" | ||
command = f'{self.unit}T' | ||
await self._write_and_read(command) | ||
|
||
async def get_firmware(self) -> str: | ||
"""Get the device firmware version.""" | ||
if self.firmware is None: | ||
command = f'{self.unit}VE' | ||
self.firmware = await self._write_and_read(command) | ||
if not self.firmware: | ||
raise OSError("Unable to get firmware.") | ||
return self.firmware | ||
|
||
async def flush(self) -> None: | ||
"""Read all available information. Use to clear queue.""" | ||
self._test_controller_open() | ||
await self.hw._clear() | ||
|
||
async def close(self) -> None: | ||
"""Close the flow meter. Call this on program termination. | ||
|
||
Also closes the serial port if no other FlowMeter object has | ||
a reference to the port. | ||
""" | ||
if not self.open: | ||
return | ||
await self.hw.close() | ||
self.open = False | ||
|
||
|
||
class FlowController(FlowMeter): | ||
"""Python driver for Alicat Flow Controllers. | ||
|
||
[Reference](http://www.alicat.com/products/mass-flow-meters-and- | ||
controllers/mass-flow-controllers/). | ||
|
||
This communicates with the flow controller over a USB or RS-232/RS-485 | ||
connection using pyserial. | ||
|
||
To set up your Alicat flow controller, power on the device and make sure | ||
that the "Input" option is set to "Serial". | ||
""" | ||
|
||
def __init__(self, address: str='/dev/ttyUSB0', unit: str='A', baudrate: int = 38400, **kwargs: Any) -> None: | ||
"""Connect this driver with the appropriate USB / serial port. | ||
|
||
Args: | ||
address: The serial port. Default '/dev/ttyUSB0'. | ||
unit: The Alicat-specified unit ID, A-Z. Default 'A'. | ||
baudrate: The baud rate of the device. Default 38400. | ||
""" | ||
FlowMeter.__init__(self, address, unit, baudrate, **kwargs) | ||
|
||
async def __aenter__(self, *args: Any) -> FlowController: | ||
"""Provide async enter to context manager.""" | ||
return self | ||
|
||
async def _write_and_read(self, command: str) -> str | None: | ||
"""Wrap the communicator request. | ||
|
||
(1) Ensure _init_task is called once before the first request | ||
(2) Call _test_controller_open() before any request | ||
""" | ||
self._test_controller_open() | ||
return await self.hw._write_and_read(command) | ||
|
||
async def get(self) -> dict: | ||
"""Get the current state of the flow controller. | ||
|
||
From the Alicat mass flow controller documentation, this data is: | ||
* Pressure (normally in psia) | ||
* Temperature (normally in C) | ||
* Volumetric flow (in units specified at time of order) | ||
* Mass flow (in units specified at time of order) | ||
* Flow setpoint (in units of control point) | ||
* Flow control point (either 'flow' or 'pressure') | ||
* Total flow (only on models with the optional totalizer function) | ||
* Currently selected gas | ||
|
||
Returns: | ||
The state of the flow controller, as a dictionary. | ||
""" | ||
state = await super().get() | ||
if state is None: | ||
return None | ||
return state | ||
|
||
async def get_totalizer_batch(self) -> str: | ||
"""Get the totalizer batch volume. | ||
|
||
Returns: | ||
line: Remaining batch volume | ||
""" | ||
remaining = await self._write_and_read(f'{self.unit}DV 64') | ||
current = await self._write_and_read(f'{self.unit}TB') | ||
if current == '?' or remaining == '?': | ||
raise OSError("Unable to read totalizer batch volume.") | ||
return f'Totalizer currently set to {current}. Remaining volume {remaining.split(" ")[-1]}' | ||
|
||
async def set_totalizer_batch(self, batch_volume: float, batch: int = 1, units: str = 'default') -> None: | ||
"""Set the totalizer batch volume. | ||
|
||
Args: | ||
batch_volume: Target batch volume, in same units as units | ||
on device | ||
""" | ||
command = f'{self.unit}TB {batch_volume}' | ||
line = await self._write_and_read(command) | ||
|
||
if line == '?': | ||
raise OSError("Unable to set totalizer batch volume. Check if volume is out of range for device.") | ||
|
||
async def hold(self, percentage) -> None: | ||
"""Override command to issue a valve hold at a certain percentage of full drive. | ||
|
||
Args: | ||
percentage : float | ||
Percentage of full valve drive | ||
""" | ||
command = f'{self.unit}HPUR {percentage}' ## need the space otherwise won't work | ||
await self._write_and_read(command) | ||
|
||
async def cancel_hold(self) -> None: | ||
"""Cancel valve hold.""" | ||
command = f'{self.unit}C' | ||
await self._write_and_read(command) | ||
|
||
async def get_pid(self) -> dict: | ||
"""Read the current PID values on the controller. | ||
|
||
Values include the P value and I value. | ||
Values returned as a dictionary. | ||
""" | ||
self.pid_keys = ['P', 'I'] | ||
|
||
command = f'{self.unit}LCG' | ||
line = await self._write_and_read(command) | ||
if not line: | ||
raise OSError("Could not get PID values.") | ||
spl = line.split() | ||
return {k: v for k, v in zip(self.pid_keys, spl[1:])} | ||
|
||
async def set_pid(self, p: int | None=None, | ||
i: int | None=None,) -> None: | ||
"""Set specified PID parameters. | ||
|
||
Args: | ||
p: Proportional gain | ||
i: Integral gain. Only used in PD2I loop type. | ||
""" | ||
current = await self.get_pid() | ||
p = [p if p is not None else current['P']][0] | ||
i = [i if i is not None else current['I']][0] | ||
|
||
command = f'{self.unit}LCG {p} {i}' | ||
await self._write_and_read(command) | ||
|
||
async def set_setpoint(self, setpoint: float) -> None: | ||
"""Set the target setpoint.""" | ||
command = f'{self.unit}S {setpoint:.2f}' | ||
line = await self._write_and_read(command) | ||
if not line: | ||
raise OSError("Could not set setpoint.") | ||
try: | ||
current = float(line.split()[4]) | ||
except IndexError: | ||
current = None | ||
if current is not None and abs(current - setpoint) > 0.01: | ||
raise OSError("Could not set setpoint.") |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this is necessary. It might make sense to put detection of Basis responses in the main driver and throw errors there.
Anyway, if it is necessary it can be done with
asyncio.ensure_future
to get around the constructor not being able to useawait
. I'm pretty sure that will finish before e.g. any reads are made. If not, it could then beawait
ed byis_connected
sinceawait
ing an already-completed task basically does nothing.Is this making sense, or should I give an example or different explanation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Alex, apologies for the late response. It has been a very busy week...
That's true (and I assume if anyone is importing the BASIS part of this library then they already have a BASIS device). I'll go ahead and mark that for deletion in the code. Let me know if there's anything else you think could be improved (I'm sure there is!) and I'll put everything into one pull request. Thank you again for being so patient!