-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #17 from Relm-Arrowny/issue5-create-ophy-device-fo…
…r-andor2-detctor Issue5 create ophy device for andor2 detctor
- Loading branch information
Showing
10 changed files
with
1,053 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from collections.abc import Sequence | ||
from pathlib import Path | ||
|
||
from bluesky.protocols import Hints | ||
from ophyd_async.core import DirectoryProvider, SignalR, StandardDetector | ||
from ophyd_async.core._providers import DirectoryInfo | ||
from ophyd_async.epics.areadetector.drivers import ADBaseShapeProvider | ||
from ophyd_async.epics.areadetector.writers import HDFWriter, NDFileHDF | ||
|
||
from p99Bluesky.devices.epics.andor2_controller import Andor2Controller | ||
from p99Bluesky.devices.epics.drivers.andor2_driver import Andor2Driver | ||
|
||
|
||
class StaticDirectoryProviderPlus: | ||
def __init__( | ||
self, | ||
directory_path: Path, | ||
filename_prefix: str = "", | ||
resource_dir: Path | None = None, | ||
): | ||
self.counter = 0 | ||
if resource_dir is None: | ||
resource_dir = Path(".") | ||
self._directory_info = DirectoryInfo( | ||
root=directory_path, | ||
resource_dir=resource_dir, | ||
prefix=filename_prefix, | ||
suffix="", | ||
) | ||
|
||
def __call__(self) -> DirectoryInfo: | ||
self._directory_info.suffix = f"{self.counter}" | ||
self.counter += 1 | ||
return self._directory_info | ||
|
||
|
||
class Andor2Ad(StandardDetector): | ||
_controller: Andor2Controller | ||
_writer: HDFWriter | ||
|
||
def __init__( | ||
self, | ||
prefix: str, | ||
directory_provider: DirectoryProvider, | ||
name: str, | ||
config_sigs: Sequence[SignalR] = (), | ||
**scalar_sigs: str, | ||
): | ||
self.drv = Andor2Driver(prefix + "CAM:") | ||
self.hdf = NDFileHDF(prefix + "HDF5:") | ||
self.counter = 0 | ||
|
||
super().__init__( | ||
Andor2Controller(self.drv), | ||
HDFWriter( | ||
self.hdf, | ||
directory_provider, | ||
lambda: self.name, | ||
ADBaseShapeProvider(self.drv), | ||
sum="StatsTotal", | ||
**scalar_sigs, | ||
), | ||
config_sigs=config_sigs, | ||
name=name, | ||
) | ||
|
||
@property | ||
def hints(self) -> Hints: | ||
return self._writer.hints |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import asyncio | ||
|
||
from ophyd_async.core import AsyncStatus, DetectorControl, DetectorTrigger | ||
from ophyd_async.epics.areadetector.drivers.ad_base import ( | ||
DEFAULT_GOOD_STATES, | ||
DetectorState, | ||
start_acquiring_driver_and_ensure_status, | ||
) | ||
from ophyd_async.epics.areadetector.utils import ImageMode, stop_busy_record | ||
|
||
from p99Bluesky.devices.epics.drivers.andor2_driver import Andor2Driver, TriggerMode | ||
|
||
TRIGGER_MODE = { | ||
DetectorTrigger.internal: TriggerMode.internal, | ||
DetectorTrigger.constant_gate: TriggerMode.ext_trigger, | ||
DetectorTrigger.variable_gate: TriggerMode.ext_trigger, | ||
} | ||
|
||
|
||
class Andor2Controller(DetectorControl): | ||
def __init__( | ||
self, | ||
driver: Andor2Driver, | ||
good_states: set[DetectorState] | None = None, | ||
) -> None: | ||
if good_states is None: | ||
good_states = set(DEFAULT_GOOD_STATES) | ||
self.driver = driver | ||
self.good_states = good_states | ||
|
||
def get_deadtime(self, exposure: float) -> float: | ||
# dt = await asyncio.gather(self.driver.accumulate_period.get_value()) | ||
return exposure + 0.2 | ||
|
||
async def arm( | ||
self, | ||
num: int = 1, | ||
trigger: DetectorTrigger = DetectorTrigger.internal, | ||
exposure: float | None = None, | ||
) -> AsyncStatus: | ||
funcs = [ | ||
self.driver.num_images.set(999_999 if num == 0 else num), | ||
self.driver.image_mode.set(ImageMode.multiple), | ||
self.driver.trigger_mode.set(TRIGGER_MODE[trigger]), | ||
] | ||
if exposure is not None: | ||
funcs.append(self.driver.acquire_time.set(exposure)) | ||
|
||
await asyncio.gather(*funcs) | ||
return await start_acquiring_driver_and_ensure_status( | ||
self.driver, good_states=self.good_states | ||
) | ||
|
||
async def disarm(self): | ||
await stop_busy_record(self.driver.acquire, False, timeout=1) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from enum import Enum | ||
|
||
from ophyd_async.epics.areadetector.drivers.ad_base import ADBase | ||
from ophyd_async.epics.areadetector.utils import ad_r, ad_rw | ||
|
||
|
||
class TriggerMode(str, Enum): | ||
internal = "Internal" | ||
ext_trigger = "External" | ||
ext_start = "External Start" | ||
ext_exposure = "External Exposure" | ||
ext_FVP = "External FVP" | ||
soft = "Software" | ||
|
||
|
||
class Andor2Driver(ADBase): | ||
def __init__(self, prefix: str) -> None: | ||
self.trigger_mode = ad_rw(TriggerMode, prefix + "TriggerMode") | ||
self.accumulate_period = ad_r(float, prefix + "AndorAccumulatePeriod") | ||
super().__init__(prefix) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
from bluesky import plan_stubs as bps | ||
from bluesky import preprocessors as bpp | ||
from bluesky.utils import Msg, short_uid | ||
from ophyd_async.core import DetectorTrigger, TriggerInfo | ||
|
||
from p99Bluesky.devices.andor2Ad import Andor2Ad | ||
|
||
""" | ||
AdPlan store the state of an area detector and its associated functions. | ||
""" | ||
|
||
|
||
class AdPlan: | ||
def __init__(self, det: Andor2Ad) -> None: | ||
self.exposure: float = 0.002 | ||
self.trigger: DetectorTrigger = DetectorTrigger.internal | ||
self.n_img: int = 1 | ||
self.det: Andor2Ad = det | ||
self.deadtime: float = self.det.controller.get_deadtime(self.exposure) | ||
|
||
""" | ||
Bare min to take an image using prepare plan with full detector control | ||
""" | ||
|
||
def takeImg( | ||
self, | ||
exposure: float | None = None, | ||
n_img: int | None = None, | ||
det_trig: DetectorTrigger | None = None, | ||
): | ||
self._updateDetInfo(exposure, n_img, det_trig) | ||
grp = short_uid("prepare") | ||
|
||
@bpp.stage_decorator([self.det]) | ||
@bpp.run_decorator() | ||
def innerTakeImg(): | ||
yield from bps.declare_stream(self.det, name="primary") | ||
yield from bps.prepare( | ||
self.det, self._getTriggerInfo(), group=grp, wait=True | ||
) | ||
yield from bps.kickoff(self.det, group=grp, wait=True) | ||
|
||
yield from bps.wait(group=grp) | ||
yield from bps.complete(self.det, group=grp, wait=True) | ||
|
||
return (yield from innerTakeImg()) | ||
|
||
def _updateDetInfo( | ||
self, | ||
exposure: float | None = None, | ||
n_img: int | None = None, | ||
det_trig: DetectorTrigger | None = None, | ||
) -> None: | ||
if exposure is not None: | ||
self.exposure = exposure | ||
self.deadtime = self.det.controller.get_deadtime(self.exposure) | ||
if n_img is not None: | ||
self.n_img = n_img | ||
if det_trig is not None: | ||
self.det_trig = det_trig | ||
|
||
def _getTriggerInfo(self) -> TriggerInfo: | ||
return TriggerInfo(self.n_img, self.trigger, self.deadtime, self.exposure) | ||
|
||
""" | ||
Static function for trigger with changeable count time | ||
""" | ||
|
||
def tiggerImg(self, dets: Andor2Ad, value: int): | ||
yield Msg("set", dets.drv.acquire_time, value) | ||
|
||
@bpp.stage_decorator([dets]) | ||
@bpp.run_decorator() | ||
def innerTiggerImg(): | ||
return (yield from bps.trigger_and_read([dets])) | ||
|
||
return (yield from innerTiggerImg()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
from ophyd_async.core import ( | ||
DetectorTrigger, | ||
DeviceCollector, | ||
set_sim_value, | ||
) | ||
from ophyd_async.epics.areadetector.controllers import ( | ||
ADSimController, | ||
) | ||
from ophyd_async.epics.areadetector.drivers import ADBase | ||
from ophyd_async.epics.areadetector.utils import ImageMode | ||
|
||
from p99Bluesky.devices.epics.andor2_controller import Andor2Controller | ||
from p99Bluesky.devices.epics.drivers.andor2_driver import Andor2Driver, TriggerMode | ||
|
||
|
||
@pytest.fixture | ||
async def Andor(RE) -> Andor2Controller: | ||
async with DeviceCollector(sim=True): | ||
drv = Andor2Driver("DRIVER:") | ||
controller = Andor2Controller(drv) | ||
|
||
return controller | ||
|
||
|
||
@pytest.fixture | ||
async def ad(RE) -> ADSimController: | ||
async with DeviceCollector(sim=True): | ||
drv = ADBase("DRIVER:") | ||
controller = ADSimController(drv) | ||
|
||
return controller | ||
|
||
|
||
async def test_Andor_controller(RE, Andor: Andor2Controller): | ||
with patch("ophyd_async.core.signal.wait_for_value", return_value=None): | ||
await Andor.arm(num=1, exposure=0.002, trigger=DetectorTrigger.internal) | ||
|
||
driver = Andor.driver | ||
|
||
set_sim_value(driver.accumulate_period, 1) | ||
assert await driver.num_images.get_value() == 1 | ||
assert await driver.image_mode.get_value() == ImageMode.multiple | ||
assert await driver.trigger_mode.get_value() == TriggerMode.internal | ||
assert await driver.acquire.get_value() is True | ||
assert await driver.acquire_time.get_value() == 0.002 | ||
# assert await Andor.get_deadtime(2) == 1 | ||
|
||
with patch( | ||
"ophyd_async.epics.areadetector.utils.wait_for_value", return_value=None | ||
): | ||
await Andor.disarm() | ||
|
||
assert await driver.acquire.get_value() is False |
Oops, something went wrong.