Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a run_program to the pmac #661

Merged
merged 20 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions src/dodal/devices/i24/pmac.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from enum import Enum
from enum import Enum, IntEnum

from bluesky.protocols import Triggerable
from ophyd_async.core import AsyncStatus, StandardReadable
from ophyd_async.core.signal import SignalRW
from ophyd_async.core import AsyncStatus, StandardReadable, observe_value
from ophyd_async.core.signal import SignalR, SignalRW
from ophyd_async.core.signal_backend import SignalBackend
from ophyd_async.core.soft_signal_backend import SoftSignalBackend
from ophyd_async.core.utils import DEFAULT_TIMEOUT
Expand All @@ -13,6 +13,11 @@
ZERO_STR = "!x0y0z0" # Command to blend any ongoing move into new position


class ScanState(IntEnum):
RUNNING = 1
DONE = 0


class LaserSettings(str, Enum):
"""PMAC strings to switch laser on and off.
Note. On the PMAC, M-variables usually have to do with position compare
Expand Down Expand Up @@ -78,7 +83,7 @@ async def set(self, laser_setting: LaserSettings):


class PMACStringEncReset(SignalRW):
""""""
"""Set a pmac_string to control the encoder channels in the controller."""

def __init__(
self,
Expand All @@ -95,6 +100,34 @@ async def set(self, enc_string: EncReset):
await self.signal.set(enc_string.value, wait=True)


class ProgramRunner(SignalRW):
"""Trigger the collection by setting the program number on the PMAC string.

Once the program number has been set, wait for the collection to be complete.
This will only be true when the status becomes 0.
"""

def __init__(
self,
pmac_str_sig: SignalRW,
status_sig: SignalR,
backend: SignalBackend,
timeout: float | None = DEFAULT_TIMEOUT,
name: str = "",
) -> None:
self.signal = pmac_str_sig
self.status = status_sig
super().__init__(backend, timeout, name)

@AsyncStatus.wrap
async def set(self, prog_num: int):
prog_str = f"&2b{prog_num}r"
await self.signal.set(prog_str, wait=True)
async for reading in observe_value(self.status):
if reading == ScanState.DONE:
return

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could: Can you not just use wait_for_value here?


class PMAC(StandardReadable):
"""Device to control the chip stage on I24."""

Expand All @@ -121,4 +154,8 @@ def __init__(self, prefix: str, name: str = "") -> None:
self.scanstatus = epics_signal_r(float, "BL24I-MO-STEP-14:signal:P2401")
self.counter = epics_signal_r(float, "BL24I-MO-STEP-14:signal:P2402")

self.run_program = ProgramRunner(
self.pmac_string, self.scanstatus, backend=SoftSignalBackend(str)
)

super().__init__(name)
9 changes: 9 additions & 0 deletions tests/devices/unit_tests/i24/test_pmac.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import bluesky.plan_stubs as bps
import pytest
from bluesky.run_engine import RunEngine
from ophyd_async.core import set_mock_value

from dodal.devices.i24.pmac import HOME_STR, PMAC, EncReset, LaserSettings

Expand Down Expand Up @@ -61,3 +62,11 @@ async def test_set_pmac_string_for_enc_reset(fake_pmac: PMAC, RE):
RE(bps.abs_set(fake_pmac.enc_reset, EncReset.ENC7, wait=True))

assert await fake_pmac.pmac_string.get_value() == "m708=100 m709=150"


async def test_run_proogram(fake_pmac: PMAC, RE):
set_mock_value(fake_pmac.scanstatus, 0)
prog_num = 10
RE(bps.abs_set(fake_pmac.run_program, prog_num, wait=True))

assert await fake_pmac.pmac_string.get_value() == f"&2b{prog_num}r"
Loading