Skip to content

Commit

Permalink
Merge pull request #4 from alexrudd2/ramp
Browse files Browse the repository at this point in the history
Add option to configure ramp rates
  • Loading branch information
alexrudd2 authored Jul 16, 2024
2 parents 71e9b23 + a30c83d commit 0520206
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 9 deletions.
60 changes: 56 additions & 4 deletions alicat/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ async def set_pressure(self, pressure: float) -> None:
await self._set_setpoint(pressure)

async def get_totalizer_batch(self, batch: int = 1) -> str:
"""Get the totalizer batch volume.
"""Get the totalizer batch volume (firmware 10v00).
Args:
batch: Which of the two totalizer batches to query.
Expand All @@ -397,7 +397,7 @@ async def get_totalizer_batch(self, batch: int = 1) -> str:
return f'{values[2]} {values[4]}' # returns 'batch vol' 'units'

async def set_totalizer_batch(self, batch_volume: float, batch: int = 1, units: str = 'default') -> None:
"""Set the totalizer batch volume.
"""Set the totalizer batch volume (firmware 10v00).
Args:
batch: Which of the two totalizer batches to set.
Expand All @@ -423,7 +423,7 @@ async def set_totalizer_batch(self, batch_volume: float, batch: int = 1, units:
raise OSError("Unable to set totalizer batch volume. Check if volume is out of range for device.")

async def hold(self) -> None:
"""Override command to issue a valve hold.
"""Override command to issue a valve hold (firmware 5v07).
For a single valve controller, hold the valve at the present value.
For a dual valve flow controller, hold the valve at the present value.
Expand Down Expand Up @@ -510,7 +510,17 @@ async def _set_setpoint(self, setpoint: float) -> None:
except IndexError:
current = None
if current is not None and abs(current - setpoint) > 0.01:
raise OSError("Could not set setpoint.")
# possibly the setpoint is being ramped
command = f'{self.unit}LS'
line = await self._write_and_read(command)
if not line:
raise OSError("Could not set setpoint.")
try:
commanded = float(line.split()[2])
except IndexError:
raise OSError("Could not set setpoint.") from None
if commanded is not None and abs(commanded - setpoint) > 0.01:
raise OSError("Could not set setpoint.")

async def _get_control_point(self) -> str:
"""Get the control point, and save to internal variable."""
Expand Down Expand Up @@ -543,3 +553,45 @@ async def _set_control_point(self, point: str) -> None:
if value != reg:
raise OSError("Could not set control point.")
self.control_point = point

async def set_ramp_config(self, config: dict[str, bool]) -> None:
"""Configure the setpoint ramp settings (firmware 10v05).
`up`: whether the controller ramps when increasing the setpoint,
`down`: whether the controller ramps when decreasing the setpoint,
(this includes setpoints below 0 on bidirectional devices),
`zero`: whether the controller ramps when establishing a zero setpoint,
`power`: whether the controller ramps when using a power-up setpoint
"""
command = (f"{self.unit}LSRC"
f" {1 if config['up'] else 0}"
f" {1 if config['down'] else 0}"
f" {1 if config['zero'] else 0}"
f" {1 if config['power'] else 0}")
line = await self._write_and_read(command)
if not line or self.unit not in line:
raise OSError("Could not set ramp config.")


async def get_ramp_config(self) -> dict[str, bool]:
"""Get the setpoint ramp settings (firmware 10v05).
`up`: whether the controller ramps when increasing the setpoint,
`down`: whether the controller ramps when decreasing the setpoint,
(this includes setpoints below 0 on bidirectional devices),
`zero`: whether the controller ramps when establishing a zero setpoint,
`power`: whether the controller ramps when using a power-up setpoint
"""
command = f"{self.unit}LSRC"
line = await self._write_and_read(command)
if not line or self.unit not in line:
raise OSError("Could not read ramp config.")
values = line[2:].split(' ')
if len(values) != 4:
raise OSError("Could not read ramp config.")
return {
'up': bool(values[0]),
'down': bool(values[1]),
'zero': bool(values[2]),
'power': bool(values[3]),
}
18 changes: 14 additions & 4 deletions alicat/mock.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Mock for offline testing of `FlowController`s."""
from __future__ import annotations

from random import choice, random
from time import sleep
from typing import Any, Dict, Union
from typing import Any
from unittest.mock import MagicMock

from .driver import FlowController as RealFlowController
Expand All @@ -24,7 +25,7 @@ def __init__(self, address: str, unit: str = 'A', *args: Any, **kwargs: Any) ->
self.hw = AsyncClientMock()
self.open = True
self.control_point: str = choice(['flow', 'pressure'])
self.state: Dict[str, Union[str, float]] = {
self.state: dict[str, str | float] = {
'setpoint': 10,
'gas': 'N2',
'mass_flow': 10 * (0.95 + 0.1 * random()),
Expand All @@ -34,13 +35,14 @@ def __init__(self, address: str, unit: str = 'A', *args: Any, **kwargs: Any) ->
'unit': unit,
'volumetric_flow': 0.0,
}
self.ramp_config = { 'up': False, 'down': False, 'zero': False, 'power': False }
self.unit: str = unit
self.button_lock: bool = False
self.keys = ['pressure', 'temperature', 'volumetric_flow', 'mass_flow',
'setpoint', 'gas']
self.firmware = '6v21.0-R22 Nov 30 2016,16:04:20'

async def get(self) -> Dict[str, Union[str, float]]:
async def get(self) -> dict[str, str | float]:
"""Return the full state."""
sleep(random() * 0.25)
return self.state
Expand All @@ -62,7 +64,7 @@ async def set_flow_rate(self, flowrate: float) -> None:
await self._set_control_point('flow')
await self._set_setpoint(flowrate)

async def set_gas(self, gas: Union[int, str]) -> None:
async def set_gas(self, gas: int | str) -> None:
"""Set the gas type."""
if isinstance(gas, int):
gas = self.gases[gas]
Expand All @@ -80,3 +82,11 @@ async def lock(self) -> None:
async def unlock(self) -> None:
"""Unlock the buttons."""
self.button_lock = False

async def get_ramp_config(self) -> dict[str, bool]:
"""Get ramp config."""
return self.ramp_config

async def set_ramp_config(self, config: dict[str, bool]) -> None:
"""Set ramp config."""
self.ramp_config = config
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
description="Python driver for Alicat mass flow controllers.",
long_description=long_description,
long_description_content_type='text/markdown',
url="https://github.com/numat/alicat/",
url="https://github.com/alexrudd2/alicat/",
author="Patrick Fuller",
author_email="[email protected]",
maintainer="Alex Ruddick",
Expand Down
13 changes: 13 additions & 0 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,16 @@ async def test_get_firmware():
async with FlowController(ADDRESS) as device:
result = await device.get_firmware()
assert 'v' in result or 'GP' in result


@pytest.mark.parametrize('config', [
{'up': True, 'down': False, 'zero': True, 'power': False},
{'up': True, 'down': True, 'zero': False, 'power': True},
{'up': False, 'down': False, 'zero': False, 'power': False},
])
async def test_ramp_config(config):
"""Confirm changing the ramping configuration works."""
async with FlowController(ADDRESS) as device:
await device.set_ramp_config(config)
result = await device.get_ramp_config()
assert config == result

0 comments on commit 0520206

Please sign in to comment.