-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Speculative visit directory service #283
Changes from all commits
25772e9
20f2ac0
ebff3fd
f0c287d
e0d9905
a56a4b9
c45b0ba
92fdbb5
7daaf01
d4176ae
44d9c13
7765855
dabed95
de1a2ad
1108ee6
54d882f
ece0e84
06daaf3
2cf7ff5
16593a3
a258e2c
c8edb91
44fe64a
5a830c0
c572c02
49dd587
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,4 +14,4 @@ | |
}, | ||
"esbonio.server.enabled": true, | ||
"esbonio.sphinx.confDir": "", | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import functools | ||
import logging | ||
from dataclasses import dataclass, field | ||
from importlib import import_module | ||
|
@@ -10,6 +11,7 @@ | |
Generic, | ||
List, | ||
Optional, | ||
Sequence, | ||
Tuple, | ||
Type, | ||
TypeVar, | ||
|
@@ -19,19 +21,24 @@ | |
get_type_hints, | ||
) | ||
|
||
from bluesky import RunEngine | ||
from bluesky.run_engine import RunEngine, call_in_bluesky_event_loop | ||
from ophyd_async.core import Device as AsyncDevice | ||
from ophyd_async.core import wait_for_connection | ||
from pydantic import create_model | ||
from pydantic.fields import FieldInfo, ModelField | ||
|
||
from blueapi.config import EnvironmentConfig, SourceKind | ||
from blueapi.data_management.gda_directory_provider import VisitDirectoryProvider | ||
from blueapi.utils import BlueapiPlanModelConfig, load_module_all | ||
|
||
from .bluesky_types import ( | ||
BLUESKY_PROTOCOLS, | ||
Device, | ||
HasName, | ||
MsgGenerator, | ||
Plan, | ||
PlanGenerator, | ||
PlanWrapper, | ||
is_bluesky_compatible_device, | ||
is_bluesky_plan_generator, | ||
) | ||
|
@@ -51,12 +58,23 @@ class BlueskyContext: | |
run_engine: RunEngine = field( | ||
default_factory=lambda: RunEngine(context_managers=[]) | ||
) | ||
plan_wrappers: Sequence[PlanWrapper] = field(default_factory=list) | ||
plans: Dict[str, Plan] = field(default_factory=dict) | ||
devices: Dict[str, Device] = field(default_factory=dict) | ||
plan_functions: Dict[str, PlanGenerator] = field(default_factory=dict) | ||
directory_provider: Optional[VisitDirectoryProvider] = field(default=None) | ||
sim: bool = field(default=False) | ||
|
||
_reference_cache: Dict[Type, Type] = field(default_factory=dict) | ||
|
||
def wrap(self, plan: MsgGenerator) -> MsgGenerator: | ||
wrapped_plan = functools.reduce( | ||
lambda wrapped, next_wrapper: next_wrapper(wrapped), | ||
self.plan_wrappers, | ||
plan, | ||
Comment on lines
+71
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be obvious to people coming from python, but making plan be a kwarg There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reduce explicitly disallows keyword arguments |
||
) | ||
yield from wrapped_plan | ||
|
||
def find_device(self, addr: Union[str, List[str]]) -> Optional[Device]: | ||
""" | ||
Find a device in this context, allows for recursive search. | ||
|
@@ -86,6 +104,18 @@ def with_config(self, config: EnvironmentConfig) -> None: | |
elif source.kind is SourceKind.DODAL: | ||
self.with_dodal_module(mod) | ||
|
||
call_in_bluesky_event_loop(self.connect_devices(self.sim)) | ||
|
||
async def connect_devices(self, sim: bool = False) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should make private and rename to |
||
coros = {} | ||
for device_name, device in self.devices.items(): | ||
if isinstance(device, AsyncDevice): | ||
device.set_name(device_name) | ||
coros[device_name] = device.connect(sim) | ||
|
||
if len(coros) > 0: | ||
await wait_for_connection(**coros) | ||
|
||
def with_plan_module(self, module: ModuleType) -> None: | ||
""" | ||
Register all functions in the module supplied as plans. | ||
|
@@ -113,10 +143,10 @@ def plan_2(...) -> MsgGenerator: | |
def with_device_module(self, module: ModuleType) -> None: | ||
self.with_dodal_module(module) | ||
|
||
def with_dodal_module(self, module: ModuleType, **kwargs) -> None: | ||
def with_dodal_module(self, module: ModuleType) -> None: | ||
from dodal.utils import make_all_devices | ||
|
||
for device in make_all_devices(module, **kwargs).values(): | ||
for device in make_all_devices(module).values(): | ||
self.device(device) | ||
|
||
def plan(self, plan: PlanGenerator) -> PlanGenerator: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import logging | ||
import tempfile | ||
from abc import ABC, abstractmethod | ||
from pathlib import Path | ||
from typing import Optional | ||
|
||
from aiohttp import ClientSession | ||
from ophyd_async.core import DirectoryInfo, DirectoryProvider, StaticDirectoryProvider | ||
from pydantic import BaseModel | ||
|
||
|
||
class DataCollectionIdentifier(BaseModel): | ||
collectionNumber: int | ||
|
||
|
||
class VisitServiceClientBase(ABC): | ||
""" | ||
Object responsible for I/O in determining collection number | ||
""" | ||
|
||
@abstractmethod | ||
async def create_new_collection(self) -> DataCollectionIdentifier: | ||
... | ||
|
||
@abstractmethod | ||
async def get_current_collection(self) -> DataCollectionIdentifier: | ||
... | ||
|
||
|
||
class VisitServiceClient(VisitServiceClientBase): | ||
_url: str | ||
|
||
def __init__(self, url: str) -> None: | ||
self._url = url | ||
|
||
async def create_new_collection(self) -> DataCollectionIdentifier: | ||
async with ClientSession() as session: | ||
async with session.post(f"{self._url}/numtracker") as response: | ||
if response.status == 200: | ||
json = await response.json() | ||
return DataCollectionIdentifier.parse_obj(json) | ||
else: | ||
raise Exception(response.status) | ||
|
||
async def get_current_collection(self) -> DataCollectionIdentifier: | ||
async with ClientSession() as session: | ||
async with session.get(f"{self._url}/numtracker") as response: | ||
if response.status == 200: | ||
json = await response.json() | ||
return DataCollectionIdentifier.parse_obj(json) | ||
else: | ||
raise Exception(response.status) | ||
|
||
|
||
class LocalVisitServiceClient(VisitServiceClientBase): | ||
_count: int | ||
|
||
def __init__(self) -> None: | ||
self._count = 0 | ||
|
||
async def create_new_collection(self) -> DataCollectionIdentifier: | ||
count = self._count | ||
self._count += 1 | ||
return DataCollectionIdentifier(collectionNumber=count) | ||
|
||
async def get_current_collection(self) -> DataCollectionIdentifier: | ||
return DataCollectionIdentifier(collectionNumber=self._count) | ||
|
||
|
||
class VisitDirectoryProvider(DirectoryProvider): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs tests |
||
""" | ||
Gets information from a remote service to construct the path that detectors | ||
should write to, and determine how their files should be named. | ||
""" | ||
|
||
_data_group_name: str | ||
_data_directory: Path | ||
|
||
_client: VisitServiceClientBase | ||
_current_collection: Optional[DirectoryInfo] | ||
_session: Optional[ClientSession] | ||
|
||
def __init__( | ||
self, | ||
data_group_name: str, | ||
data_directory: Path, | ||
client: VisitServiceClientBase, | ||
): | ||
self._data_group_name = data_group_name | ||
self._data_directory = data_directory | ||
self._client = client | ||
|
||
self._current_collection = None | ||
self._session = None | ||
|
||
async def update(self) -> None: | ||
""" | ||
Calls the visit service to create a new data collection in the current visit. | ||
""" | ||
# TODO: After visit service is more feature complete: | ||
# TODO: Allow selecting visit as part of the request to BlueAPI | ||
# TODO: Consume visit information from BlueAPI and pass down to this class | ||
# TODO: Query visit service to get information about visit and data collection | ||
# TODO: Use AuthN information as part of verification with visit service | ||
|
||
try: | ||
collection_id_info = await self._client.create_new_collection() | ||
self._current_collection = self._generate_directory_info(collection_id_info) | ||
except Exception as ex: | ||
# TODO: The catch all is needed because the RunEngine will not | ||
# currently handle it, see | ||
# https://github.com/bluesky/bluesky/pull/1623 | ||
self._current_collection = None | ||
logging.exception(ex) | ||
|
||
def _generate_directory_info( | ||
self, | ||
collection_id_info: DataCollectionIdentifier, | ||
) -> DirectoryInfo: | ||
collection_id = collection_id_info.collectionNumber | ||
file_prefix = f"{self._data_group_name}-{collection_id}" | ||
return DirectoryInfo(str(self._data_directory), file_prefix) | ||
|
||
def __call__(self) -> DirectoryInfo: | ||
if self._current_collection is not None: | ||
return self._current_collection | ||
else: | ||
raise ValueError( | ||
"No current collection, update() needs to be called at least once" | ||
) | ||
|
||
|
||
_SINGLETON: Optional[VisitDirectoryProvider] = None | ||
|
||
|
||
def set_directory_provider_singleton(provider: VisitDirectoryProvider): | ||
global _SINGLETON | ||
|
||
_SINGLETON = provider | ||
|
||
|
||
def get_directory_provider() -> DirectoryProvider: | ||
if _SINGLETON is not None: | ||
return _SINGLETON | ||
else: | ||
return StaticDirectoryProvider(tempfile.TemporaryFile(), "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be tested