Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Pimte area detector devices #429

Closed
wants to merge 11 commits into from
Empty file.
Empty file.
35 changes: 35 additions & 0 deletions src/dodal/devices/areadetector/epics/drivers/pimte1_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from enum import Enum

from ophyd_async.epics.areadetector.drivers.ad_base import ADBase
from ophyd_async.epics.areadetector.utils import ad_r, ad_rw
from ophyd_async.epics.signal import epics_signal_rw

"""
Driver for pi-mite 3 CCD

"""


class Pimte1Driver(ADBase):
def __init__(self, prefix: str) -> None:
self.trigger_mode = ad_rw(TriggerMode, prefix + "TriggerMode")
self.initialize = ad_rw(int, prefix + "Initialize")
self.set_temperture = epics_signal_rw(float, prefix + "SetTemperature")
self.read_backtemperture = ad_r(float, prefix + "MeasuredTemperature")
self.speed = ad_rw(SpeedMode, prefix + "SpeedSelection")
super().__init__(prefix)


class SpeedMode(str, Enum):
adc_50Khz = "0: 50 KHz - 20000 ns"
adc_100Khz = "1: 100 kHz - 10000 ns"
adc_200Khz = "2: 200 kHz - 5000 ns"
adc_500Khz = "3: 500 kHz - 2000 ns"
adc_1Mhz = "4: 1 MHz - 1000 ns"
adc_2Mhz = "5: 2 MHz - 500 ns"


class TriggerMode(str, Enum):
internal = "Free Run"
ext_trigger = "Ext Trigger"
bulb_mode = "Bulb Mode"
69 changes: 69 additions & 0 deletions src/dodal/devices/areadetector/epics/pimte_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio
from typing import Optional, Set # , Set

from ophyd_async.core import AsyncStatus, DetectorControl, DetectorTrigger
from ophyd_async.epics.areadetector.drivers.ad_base import (
DEFAULT_GOOD_STATES,
DetectorState,
start_acquiring_driver_and_ensure_status,
)
from ophyd_async.epics.areadetector.utils import ImageMode, stop_busy_record

from dodal.devices.areadetector.epics.drivers.pimte1_driver import (
Pimte1Driver,
SpeedMode,
TriggerMode,
)

TRIGGER_MODE = {
DetectorTrigger.internal: TriggerMode.internal,
DetectorTrigger.constant_gate: TriggerMode.ext_trigger,
DetectorTrigger.variable_gate: TriggerMode.ext_trigger,
}


class PimteController(DetectorControl):
def __init__(
self,
driver: Pimte1Driver,
good_states: Set[DetectorState] = set(DEFAULT_GOOD_STATES),
) -> None:
self.driver = driver
self.good_states = good_states

def get_deadtime(self, exposure: float) -> float:
return 2.4e-5

async def _process_setting(self) -> None:
await self.driver.initialize.set(1)

async def set_temperature(self, temperature: float) -> None:
await self.driver.set_temperture.set(temperature)
await self._process_setting()

async def set_speed(self, speed: SpeedMode) -> None:
await self.driver.speed.set(speed)
await self._process_setting()

async def arm(
self,
num: int = 1,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
funcs = [
self.driver.num_images.set(999_999 if num == 0 else num),
self.driver.image_mode.set(ImageMode.multiple),
self.driver.trigger_mode.set(TRIGGER_MODE[trigger]),
]
if exposure is not None:
funcs.append(self.driver.acquire_time.set(exposure))

await asyncio.gather(*funcs)
await self._process_setting()
return await start_acquiring_driver_and_ensure_status(
self.driver, good_states=self.good_states
)

async def disarm(self):
await stop_busy_record(self.driver.acquire, False, timeout=1)
42 changes: 42 additions & 0 deletions src/dodal/devices/areadetector/pimteAD.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Sequence

from bluesky.protocols import Hints
from ophyd_async.core import DirectoryProvider, SignalR, StandardDetector
from ophyd_async.epics.areadetector.drivers import ADBaseShapeProvider
from ophyd_async.epics.areadetector.writers import HDFWriter, NDFileHDF

from dodal.devices.areadetector.epics.drivers.pimte1_driver import Pimte1Driver
from dodal.devices.areadetector.epics.pimte_controller import PimteController


class HDFStatsPimte(StandardDetector):
_controller: PimteController
_writer: HDFWriter

def __init__(
self,
prefix: str,
directory_provider: DirectoryProvider,
name: str,
config_sigs: Sequence[SignalR] = (),
**scalar_sigs: str,
):
self.drv = Pimte1Driver(prefix + "CAM:")
self.hdf = NDFileHDF(prefix + "HDF5:")

super().__init__(
PimteController(self.drv),
HDFWriter(
self.hdf,
directory_provider,
lambda: self.name,
ADBaseShapeProvider(self.drv),
**scalar_sigs,
),
config_sigs=config_sigs,
name=name,
)

@property
def hints(self) -> Hints:
return self._writer.hints
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pytest
from bluesky.run_engine import RunEngine
from ophyd.sim import make_fake_device
from ophyd_async.core import StaticDirectoryProvider

from dodal.beamlines import beamline_utils, i03
from dodal.devices.focusing_mirror import VFMMirrorVoltages
Expand All @@ -37,6 +38,11 @@ def mock_beamline_module_filepaths(bl_name, bl_module):
[bl_module.__setattr__(attr[0], attr[1]) for attr in mock_attributes]


@pytest.fixture
def tmp_directory_provider(tmp_path: Path) -> StaticDirectoryProvider:
return StaticDirectoryProvider(tmp_path)


@pytest.fixture(scope="function")
def module_and_devices_for_beamline(request):
beamline = request.param
Expand Down
68 changes: 68 additions & 0 deletions tests/devices/unit_tests/test_pimte.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import bluesky.plan_stubs as bps
import pytest
from bluesky.run_engine import RunEngine
from ophyd_async.core import DeviceCollector, StaticDirectoryProvider, set_sim_value

from dodal.devices.areadetector.pimteAD import HDFStatsPimte


def count_sim(det: HDFStatsPimte, times: int = 1):
"""Test plan to do the equivalent of bp.count for a sim detector."""

yield from bps.stage_all(det)
yield from bps.open_run()
yield from bps.declare_stream(det, name="primary", collect=False)
for _ in range(times):
read_value = yield from bps.rd(det._writer.hdf.num_captured)
yield from bps.trigger(det, wait=False, group="wait_for_trigger")

yield from bps.sleep(0.001)
set_sim_value(det._writer.hdf.num_captured, read_value + 1)

yield from bps.wait(group="wait_for_trigger")
yield from bps.create()
yield from bps.read(det)
yield from bps.save()

yield from bps.close_run()
yield from bps.unstage_all(det)
Comment on lines +9 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should: be somewhere common, since testing our StandardDetector impls create the right documents is going to be a repeated pattern

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what to do, I stolen it from #332 may be I should move it to somewhere common on this PR? or wait until the #332 merge and use that instead?



@pytest.fixture
async def single_detector(
RE: RunEngine, tmp_directory_provider: StaticDirectoryProvider
) -> HDFStatsPimte:
tempD = tmp_directory_provider
async with DeviceCollector(sim=True):
detector = HDFStatsPimte("prefix", tempD, "pimte")

set_sim_value(detector._controller.driver.array_size_x, 10)
set_sim_value(detector._controller.driver.array_size_y, 20)
set_sim_value(detector.hdf.file_path_exists, True)
set_sim_value(detector.hdf.full_file_name, str(tempD().root.absolute()))
set_sim_value(detector._writer.hdf.num_captured, 0)
return detector


async def test_pimte(RE: RunEngine, single_detector: HDFStatsPimte):
names = []
docs = []
RE.subscribe(lambda name, _: names.append(name))
RE.subscribe(lambda _, doc: docs.append(doc))

RE(count_sim(single_detector))
writer = single_detector._writer

assert (
await writer.hdf.file_path.get_value()
== writer._directory_provider().root.as_posix()
)

assert names == [
"start",
"descriptor",
"stream_resource",
"stream_datum",
"event",
"stop",
]
20 changes: 20 additions & 0 deletions tests/devices/unit_tests/test_pimte1Driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest
from ophyd_async.core import DeviceCollector

from dodal.devices.areadetector.epics.drivers.pimte1_driver import Pimte1Driver

# Long enough for multiple asyncio event loop cycles to run so
# all the tasks have a chance to run
A_BIT = 0.001


@pytest.fixture
async def sim_pimte_driver():
async with DeviceCollector(sim=True):
sim_pimte_driver = Pimte1Driver("BLxxI-A-DET-03:CAM")
# Signals connected here
yield sim_pimte_driver


async def test_sim_pimte_driver(sim_pimte_driver: Pimte1Driver) -> None:
assert sim_pimte_driver.name == "sim_pimte_driver"
59 changes: 59 additions & 0 deletions tests/devices/unit_tests/test_pimteController.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from unittest.mock import patch

import pytest
from ophyd_async.core import DetectorTrigger, DeviceCollector
from ophyd_async.epics.areadetector.controllers import (
ADSimController,
)
from ophyd_async.epics.areadetector.drivers import ADBase
from ophyd_async.epics.areadetector.utils import ImageMode

from dodal.devices.areadetector.epics.drivers.pimte1_driver import (
Pimte1Driver,
SpeedMode,
TriggerMode,
)
from dodal.devices.areadetector.epics.pimte_controller import PimteController


@pytest.fixture
async def pimte(RE) -> PimteController:
async with DeviceCollector(sim=True):
drv = Pimte1Driver("DRIVER:")
controller = PimteController(drv)

return controller


@pytest.fixture
async def ad(RE) -> ADSimController:
async with DeviceCollector(sim=True):
drv = ADBase("DRIVER:")
controller = ADSimController(drv)

return controller


async def test_pimte_controller(RE, pimte: PimteController):
with patch("ophyd_async.core.signal.wait_for_value", return_value=None):
await pimte.arm(num=1, exposure=0.002, trigger=DetectorTrigger.internal)

driver = pimte.driver

assert await driver.num_images.get_value() == 1
assert await driver.image_mode.get_value() == ImageMode.multiple
assert await driver.trigger_mode.get_value() == TriggerMode.internal
assert await driver.acquire.get_value() is True
assert await driver.acquire_time.get_value() == 0.002
assert pimte.get_deadtime(2) == 2.4e-5

with patch(
"ophyd_async.epics.areadetector.utils.wait_for_value", return_value=None
):
await pimte.disarm()
await pimte.set_temperature(20)
await pimte.set_speed(SpeedMode.adc_200Khz)
assert await driver.set_temperture.get_value() == 20
assert await driver.speed.get_value() == SpeedMode.adc_200Khz

assert await driver.acquire.get_value() is False
Loading