Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

432 create sample stage device for p99 #458

Merged
merged 11 commits into from
Aug 14, 2024
62 changes: 62 additions & 0 deletions src/dodal/beamlines/p99.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from dodal.beamlines.beamline_utils import device_instantiation
from dodal.beamlines.beamline_utils import set_beamline as set_utils_beamline
from dodal.devices.p99.sample_stage import FilterMotor, SampleAngleStage
from dodal.devices.stages import ThreeAxisStage
from dodal.log import set_beamline as set_log_beamline
from dodal.utils import get_beamline_name

BL = get_beamline_name("BL99P")
set_log_beamline(BL)
set_utils_beamline(BL)


def sample_angle_stage(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> SampleAngleStage:
"""Sample stage for p99"""

return device_instantiation(
SampleAngleStage,
prefix="-MO-STAGE-01:",
name="sample_angle_stage",
wait=wait_for_connection,
fake=fake_with_ophyd_sim,
)


def sample_stage_filer(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> FilterMotor:
"""Sample stage for p99"""

return device_instantiation(
FilterMotor,
prefix="-MO-STAGE-02:MP:SELECT",
name="sample_stage_filer",
wait=wait_for_connection,
fake=fake_with_ophyd_sim,
)


def sample_xyz_stage(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> ThreeAxisStage:
return device_instantiation(
FilterMotor,
prefix="-MO-STAGE-02:",
name="sample_xyz_stage",
wait=wait_for_connection,
fake=fake_with_ophyd_sim,
)


def sample_lab_xyz_stage(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> ThreeAxisStage:
return device_instantiation(
FilterMotor,
prefix="-MO-STAGE-02:LAB:",
Relm-Arrowny marked this conversation as resolved.
Show resolved Hide resolved
name="sample_lab_xyz_stage",
wait=wait_for_connection,
fake=fake_with_ophyd_sim,
)
Empty file.
83 changes: 83 additions & 0 deletions src/dodal/devices/epics/setReadOnlyMotor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import asyncio
import time
from collections.abc import Callable

from bluesky.protocols import Movable
from ophyd_async.core import AsyncStatus, StandardReadable
from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw


class SetReadOnlyMotor(StandardReadable, Movable):
"""Device that moves a motor with only setpoint and readback.
Parameters
----------
prefix:
EPICS PV (None common part up to and including :).
name:
Name for the motor.
suffix:
The last part of any EPICS PV, default is the [".VAL", ".RBV"].
Notes
-----
Example usage::
stage = NoConfigMotor("prefix", "name", "suffix")
"""

def __init__(self, prefix: str, name="", suffix: list[str] | None = None) -> None:
# Define some signals
if suffix is None:
suffix = [".VAL", ".RBV", ".EGU"]
self.user_setpoint = epics_signal_rw(float, prefix + suffix[0])
self.user_readback = epics_signal_r(float, prefix + suffix[1])
self.motor_egu = epics_signal_r(str, prefix + suffix[2])
# Whether set() should complete successfully or not
self._set_success = True
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.user_readback],
config=[self.motor_egu],
)
super().__init__(name=name)

def set_name(self, name: str):
super().set_name(name)
# Readback should be named the same as its parent in read()
self.user_readback.set_name(name)

async def _move(self, new_position: float, watchers: list[Callable] | None = None):
if watchers is None:
watchers = []
start = time.monotonic()
old_position, units = await asyncio.gather(
self.user_setpoint.get_value(), self.motor_egu.get_value()
)

def update_watchers(current_position: float):
for watcher in watchers:
watcher(
name=self.name,
current=current_position,
initial=old_position,
target=new_position,
unit=units,
time_elapsed=time.monotonic() - start,
)

self.user_readback.subscribe_value(update_watchers)
try:
await self.user_setpoint.set(new_position)
finally:
self.user_readback.clear_sub(update_watchers)

def move(self, new_position: float, timeout: float | None = None):
"""Commandline only synchronous move of a Motor"""
from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop

if in_bluesky_event_loop():
raise RuntimeError("Will deadlock run engine if run in a plan")
call_in_bluesky_event_loop(self._move(new_position), timeout) # type: ignore

def set(self, value: float, timeout: float | None = None) -> AsyncStatus:
watchers: list[Callable] = []
coro = asyncio.wait_for(self._move(value, watchers), timeout=timeout)
return AsyncStatus(coro, watchers)
Empty file.
45 changes: 45 additions & 0 deletions src/dodal/devices/p99/sample_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from enum import Enum

from ophyd_async.core import Device
from ophyd_async.epics.signal import epics_signal_rw

from dodal.devices.epics.setReadOnlyMotor import SetReadOnlyMotor


class SampleAngleStage(Device):
def __init__(self, prefix: str, name: str):
self.theta = SetReadOnlyMotor(
prefix, name, suffix=["WRITETHETA", "WRITETHETA:RBV", "WRITETHETA.EGU"]
)
self.roll = SetReadOnlyMotor(
prefix, name, suffix=["WRITETHETA", "WRITETHETA:RBV", "WRITETHETA.EGU"]
)
self.pitch = SetReadOnlyMotor(
prefix, name, suffix=["WRITETHETA", "WRITETHETA:RBV", "WRITETHETA.EGU"]
)
Relm-Arrowny marked this conversation as resolved.
Show resolved Hide resolved
super().__init__(name=name)


class p99StageSelections(str, Enum):
Empty = "Empty"
Mn5um = "Mn 5um"
Fe = "Fe (empty)"
Co5um = "Co 5um"
Ni5um = "Ni 5um"
Cu5um = "Cu 5um"
Zn5um = "Zn 5um"
Zr = "Zr (empty)"
Mo = "Mo (empty)"
Rh = "Rh (empty)"
Pd = "Pd (empty)"
Ag = "Ag (empty)"
Cd25um = "Cd 25um"
W = "W (empty)"
Pt = "Pt (empty)"
User = "User"


class FilterMotor(Device):
def __init__(self, prefix: str, name: str):
self.user_setpoint = epics_signal_rw(p99StageSelections, prefix)
super().__init__(name=name)
36 changes: 36 additions & 0 deletions src/dodal/devices/stages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from ophyd_async.core import Device
from ophyd_async.epics.motion.motor import Motor


class ThreeAxisStage(Device):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should: Can we combine this with XYZPositioner

"""

Standard ophyd_async xyz motor stage, by combining 3 Motors.

Parameters
----------
prefix:
EPICS PV (None common part up to and including :).
name:
name for the stage.
infix:
EPICS PV, default is the ["X", "Y", "Z"].
Notes
-----
Example usage::
async with DeviceCollector():
xyz_stage = ThreeAxisStage("BLXX-MO-STAGE-XX:")
Or::
with DeviceCollector():
xyz_stage = ThreeAxisStage("BLXX-MO-STAGE-XX:", suffix = [".any",
".there", ".motorPv"])

"""

def __init__(self, prefix: str, name: str, infix: list[str] | None = None):
if infix is None:
infix = ["X", "Y", "Z"]
self.x = Motor(prefix + infix[0])
self.y = Motor(prefix + infix[1])
self.z = Motor(prefix + infix[2])
super().__init__(name=name)
43 changes: 43 additions & 0 deletions tests/devices/unit_tests/p99/test_p99_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest
from ophyd_async.core import DeviceCollector
from ophyd_async.core.signal import set_sim_value

from dodal.devices.p99.sample_stage import (
FilterMotor,
SampleAngleStage,
p99StageSelections,
)

# Long enough for multiple asyncio event loop cycles to run so
# all the tasks have a chance to run
A_BIT = 0.001


@pytest.fixture
async def sim_sampleAngleStage():
async with DeviceCollector(sim=True):
sim_sampleAngleStage = SampleAngleStage(
"p99-MO-TABLE-01:", name="sim_sampleAngleStage"
)
# Signals connected here
yield sim_sampleAngleStage


@pytest.fixture
async def sim_filter_wheel():
async with DeviceCollector(sim=True):
sim_filter_wheel = FilterMotor("p99-MO-TABLE-01:", name="sim_filter_wheel")
yield sim_filter_wheel


async def test_sampleAngleStage(sim_sampleAngleStage: SampleAngleStage) -> None:
assert sim_sampleAngleStage.name == "sim_sampleAngleStage"
assert sim_sampleAngleStage.theta.name == "sim_sampleAngleStage-theta"
assert sim_sampleAngleStage.roll.name == "sim_sampleAngleStage-roll"
assert sim_sampleAngleStage.pitch.name == "sim_sampleAngleStage-pitch"


async def test_filter_wheel(sim_filter_wheel: FilterMotor) -> None:
assert sim_filter_wheel.name == "sim_filter_wheel"
set_sim_value(sim_filter_wheel.user_setpoint, p99StageSelections.Cd25um)
assert await sim_filter_wheel.user_setpoint.get_value() == p99StageSelections.Cd25um
72 changes: 72 additions & 0 deletions tests/devices/unit_tests/test_setReadOnlyMotor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import asyncio
from unittest.mock import ANY, Mock, call

import pytest
from ophyd_async.core import DeviceCollector
from ophyd_async.core.signal import set_sim_put_proceeds, set_sim_value

from dodal.devices.epics.setReadOnlyMotor import SetReadOnlyMotor

# Long enough for multiple asyncio event loop cycles to run so
# all the tasks have a chance to run
A_BIT = 0.001


@pytest.fixture
async def sim_sr_motor():
async with DeviceCollector(sim=True):
sim_sr_motor = SetReadOnlyMotor("BLxx-MO-xx-01:")
# Signals connected here

yield sim_sr_motor


async def test_setReadOnlyMotor_moves(sim_sr_motor: SetReadOnlyMotor) -> None:
assert sim_sr_motor.name == "sim_sr_motor"
set_sim_value(sim_sr_motor.motor_egu, "mm")

set_sim_put_proceeds(sim_sr_motor.user_setpoint, False)
s = sim_sr_motor.set(0.55)
watcher = Mock()
s.watch(watcher)
done = Mock()
s.add_callback(done)
await asyncio.sleep(A_BIT)
assert watcher.call_count == 1
assert watcher.call_args == call(
name="sim_sr_motor",
current=0.0,
initial=0.0,
target=0.55,
unit="mm",
time_elapsed=ANY,
)
watcher.reset_mock()
assert 0.55 == await sim_sr_motor.user_setpoint.get_value()
assert not s.done
await asyncio.sleep(0.1)
set_sim_value(sim_sr_motor.user_readback, 0.1)
assert watcher.call_count == 1
assert watcher.call_args == call(
name="sim_sr_motor",
current=0.1,
initial=0.0,
target=0.55,
unit="mm",
time_elapsed=ANY,
)
sim_sr_motor._set_success = False # make it fail
set_sim_put_proceeds(sim_sr_motor.user_setpoint, True)
await asyncio.sleep(A_BIT)
assert s.done
done.assert_called_once_with(s)


def test_motor_in_re(sim_sr_motor: SetReadOnlyMotor, RE) -> None:
sim_sr_motor.move(0)

def my_plan():
yield sim_sr_motor.move(1)

with pytest.raises(RuntimeError, match="Will deadlock run engine if run in a plan"):
RE(my_plan())
20 changes: 20 additions & 0 deletions tests/devices/unit_tests/test_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest
from ophyd_async.core import DeviceCollector

from dodal.devices.stages import ThreeAxisStage


@pytest.fixture
async def sim_three_axis_motor():
async with DeviceCollector(sim=True):
sim_three_axis_motor = ThreeAxisStage("BLxx-MO-xx-01:", "sim_three_axis_motor")
# Signals connected here

yield sim_three_axis_motor


async def test_there_axis_motor(sim_three_axis_motor: ThreeAxisStage) -> None:
assert sim_three_axis_motor.name == "sim_three_axis_motor"
assert sim_three_axis_motor.x.name == "sim_three_axis_motor-x"
assert sim_three_axis_motor.y.name == "sim_three_axis_motor-y"
assert sim_three_axis_motor.z.name == "sim_three_axis_motor-z"
Loading