Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speculative visit directory service #283

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
25772e9
Speculative visit directory service
callumforrester Jul 31, 2023
20f2ac0
Clean up
callumforrester Jul 31, 2023
ebff3fd
More cleaning up
callumforrester Jul 31, 2023
f0c287d
Add tests
callumforrester Jul 31, 2023
e0d9905
Simplify tests for initial version
callumforrester Jul 31, 2023
a56a4b9
Write plan preprocessors
callumforrester Aug 1, 2023
c45b0ba
Ensure correct collection numbers are propagated to detectors
callumforrester Aug 1, 2023
92fdbb5
Simplify preprocessor
callumforrester Aug 1, 2023
7daaf01
Fix mypy errors
callumforrester Aug 1, 2023
d4176ae
Add kwarg to explain reduce
callumforrester Aug 16, 2023
44d9c13
init
Oct 3, 2023
7765855
temp
Oct 10, 2023
dabed95
link directory provider
Oct 10, 2023
de1a2ad
Add capability for context to connect ophyd-async devices
Oct 19, 2023
1108ee6
Post-rebase changes
Oct 19, 2023
54d882f
Import inject from dls-bluesky-core instead
Oct 23, 2023
ece0e84
Use dodal singleton
callumforrester Oct 23, 2023
06daaf3
Setup visit directory service connection
callumforrester Oct 23, 2023
2cf7ff5
Begin fixing attach metadata tests
callumforrester Oct 23, 2023
16593a3
Fix test cases
callumforrester Oct 24, 2023
a258e2c
Allow for dummy visit service handler
callumforrester Oct 24, 2023
c8edb91
Move visit directory service from dodal into blueapi
callumforrester Oct 24, 2023
44fe64a
Fix tests, remove kwargs from context
callumforrester Oct 24, 2023
5a830c0
Fix formatting
callumforrester Oct 24, 2023
c572c02
Make use of dodal directory provider singleton optional
callumforrester Oct 24, 2023
49dd587
Remove cloud vscode settings
callumforrester Oct 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
},
"esbonio.server.enabled": true,
"esbonio.sphinx.confDir": "",
}
}
3 changes: 2 additions & 1 deletion docs/developer/explanations/lifecycle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ of being written, loaded and run. Take the following plan.
from typing import Any, List, Mapping, Optional, Union

import bluesky.plans as bp
from blueapi.core import MsgGenerator, inject
from blueapi.core import MsgGenerator
from dls_bluesky_core.core import inject
from bluesky.protocols import Readable


Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ classifiers = [
]
description = "Lightweight Bluesky-as-a-service wrapper application. Also usable as a library."
dependencies = [
"bluesky<1.11",
"bluesky",
"ophyd",
"nslsii",
"pyepics",
Expand All @@ -24,7 +24,7 @@ dependencies = [
"fastapi[all]<0.100",
"uvicorn",
"requests",
"dls_bluesky_core",
"dls_bluesky_core @ git+https://github.com/DiamondLightSource/dls-bluesky-core.git@main",
"dls-dodal",
"typing_extensions<4.6",
]
Expand Down
12 changes: 7 additions & 5 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class ScratchConfig(BlueapiBaseModel):
auto_make_directory: bool = Field(default=False)


class DataWritingConfig(BlueapiBaseModel):
visit_service_url: Optional[str] = None # e.g. "http://localhost:8088/api"
visit_directory: Path = Path("/tmp/0-0")
group_name: str = "example"


class EnvironmentConfig(BlueapiBaseModel):
"""
Config for the RunEngine environment
Expand All @@ -63,11 +69,7 @@ class EnvironmentConfig(BlueapiBaseModel):
Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.stubs"),
]
scratch: Optional[ScratchConfig] = Field(default=None)

def __eq__(self, other: object) -> bool:
if isinstance(other, EnvironmentConfig):
return str(self.sources) == str(other.sources)
return False
data_writing: DataWritingConfig = Field(default_factory=DataWritingConfig)


class LoggingConfig(BlueapiBaseModel):
Expand Down
2 changes: 0 additions & 2 deletions src/blueapi/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
is_bluesky_plan_generator,
)
from .context import BlueskyContext
from .device_lookup import inject
from .event import EventPublisher, EventStream

__all__ = [
Expand All @@ -26,7 +25,6 @@
"EventStream",
"DataEvent",
"WatchableStatus",
"inject",
"is_bluesky_compatible_device",
"is_bluesky_plan_generator",
"is_bluesky_compatible_device_type",
Expand Down
4 changes: 4 additions & 0 deletions src/blueapi/core/bluesky_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
WritesExternalAssets,
)
from dls_bluesky_core.core import MsgGenerator, PlanGenerator
from ophyd_async.core import Device as AsyncDevice
from pydantic import BaseModel, Field

from blueapi.utils import BlueapiBaseModel
Expand All @@ -28,6 +29,8 @@
except ImportError:
from typing_extensions import Protocol, runtime_checkable # type: ignore

PlanWrapper = Callable[[MsgGenerator], MsgGenerator]

#: An object that encapsulates the device to do useful things to produce
# data (e.g. move and read)
Device = Union[
Expand All @@ -45,6 +48,7 @@
WritesExternalAssets,
Configurable,
Triggerable,
AsyncDevice,
]

#: Protocols defining interface to hardware
Expand Down
36 changes: 33 additions & 3 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import logging
from dataclasses import dataclass, field
from importlib import import_module
Expand All @@ -10,6 +11,7 @@
Generic,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Expand All @@ -19,19 +21,24 @@
get_type_hints,
)

from bluesky import RunEngine
from bluesky.run_engine import RunEngine, call_in_bluesky_event_loop
from ophyd_async.core import Device as AsyncDevice
from ophyd_async.core import wait_for_connection
from pydantic import create_model
from pydantic.fields import FieldInfo, ModelField

from blueapi.config import EnvironmentConfig, SourceKind
from blueapi.data_management.gda_directory_provider import VisitDirectoryProvider
from blueapi.utils import BlueapiPlanModelConfig, load_module_all

from .bluesky_types import (
BLUESKY_PROTOCOLS,
Device,
HasName,
MsgGenerator,
Plan,
PlanGenerator,
PlanWrapper,
is_bluesky_compatible_device,
is_bluesky_plan_generator,
)
Expand All @@ -51,12 +58,23 @@ class BlueskyContext:
run_engine: RunEngine = field(
default_factory=lambda: RunEngine(context_managers=[])
)
plan_wrappers: Sequence[PlanWrapper] = field(default_factory=list)
plans: Dict[str, Plan] = field(default_factory=dict)
devices: Dict[str, Device] = field(default_factory=dict)
plan_functions: Dict[str, PlanGenerator] = field(default_factory=dict)
directory_provider: Optional[VisitDirectoryProvider] = field(default=None)
sim: bool = field(default=False)

_reference_cache: Dict[Type, Type] = field(default_factory=dict)

def wrap(self, plan: MsgGenerator) -> MsgGenerator:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be tested

wrapped_plan = functools.reduce(
lambda wrapped, next_wrapper: next_wrapper(wrapped),
self.plan_wrappers,
plan,
Comment on lines +71 to +74
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be obvious to people coming from python, but making plan be a kwarg initializer=plan would stop me running off to read the docs for functools every time I see it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduce explicitly disallows keyword arguments

)
yield from wrapped_plan

def find_device(self, addr: Union[str, List[str]]) -> Optional[Device]:
"""
Find a device in this context, allows for recursive search.
Expand Down Expand Up @@ -86,6 +104,18 @@ def with_config(self, config: EnvironmentConfig) -> None:
elif source.kind is SourceKind.DODAL:
self.with_dodal_module(mod)

call_in_bluesky_event_loop(self.connect_devices(self.sim))

async def connect_devices(self, sim: bool = False) -> None:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should make private and rename to _connect_ophyd_async_devices

coros = {}
for device_name, device in self.devices.items():
if isinstance(device, AsyncDevice):
device.set_name(device_name)
coros[device_name] = device.connect(sim)

if len(coros) > 0:
await wait_for_connection(**coros)

def with_plan_module(self, module: ModuleType) -> None:
"""
Register all functions in the module supplied as plans.
Expand Down Expand Up @@ -113,10 +143,10 @@ def plan_2(...) -> MsgGenerator:
def with_device_module(self, module: ModuleType) -> None:
self.with_dodal_module(module)

def with_dodal_module(self, module: ModuleType, **kwargs) -> None:
def with_dodal_module(self, module: ModuleType) -> None:
from dodal.utils import make_all_devices

for device in make_all_devices(module, **kwargs).values():
for device in make_all_devices(module).values():
self.device(device)

def plan(self, plan: PlanGenerator) -> PlanGenerator:
Expand Down
18 changes: 0 additions & 18 deletions src/blueapi/core/device_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,3 @@ def find_component(obj: Any, addr: List[str]) -> Optional[D]:
f"Found {component} in {obj} while searching for {addr} "
"but it is not a device"
)


def inject(name: str):
"""
Function to mark a default argument of a plan method as a reference to a device
that is stored in the Blueapi context.
Bypasses mypy linting, returning x as Any and therefore valid as a default
argument.

Args:
name (str): Name of a device to be fetched from the Blueapi context

Returns:
Any: name but without typing checking, valid as any default type

"""

return name
Empty file.
146 changes: 146 additions & 0 deletions src/blueapi/data_management/gda_directory_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import logging
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional

from aiohttp import ClientSession
from ophyd_async.core import DirectoryInfo, DirectoryProvider, StaticDirectoryProvider
from pydantic import BaseModel


class DataCollectionIdentifier(BaseModel):
collectionNumber: int


class VisitServiceClientBase(ABC):
"""
Object responsible for I/O in determining collection number
"""

@abstractmethod
async def create_new_collection(self) -> DataCollectionIdentifier:
...

@abstractmethod
async def get_current_collection(self) -> DataCollectionIdentifier:
...


class VisitServiceClient(VisitServiceClientBase):
_url: str

def __init__(self, url: str) -> None:
self._url = url

async def create_new_collection(self) -> DataCollectionIdentifier:
async with ClientSession() as session:
async with session.post(f"{self._url}/numtracker") as response:
if response.status == 200:
json = await response.json()
return DataCollectionIdentifier.parse_obj(json)
else:
raise Exception(response.status)

async def get_current_collection(self) -> DataCollectionIdentifier:
async with ClientSession() as session:
async with session.get(f"{self._url}/numtracker") as response:
if response.status == 200:
json = await response.json()
return DataCollectionIdentifier.parse_obj(json)
else:
raise Exception(response.status)


class LocalVisitServiceClient(VisitServiceClientBase):
_count: int

def __init__(self) -> None:
self._count = 0

async def create_new_collection(self) -> DataCollectionIdentifier:
count = self._count
self._count += 1
return DataCollectionIdentifier(collectionNumber=count)

async def get_current_collection(self) -> DataCollectionIdentifier:
return DataCollectionIdentifier(collectionNumber=self._count)


class VisitDirectoryProvider(DirectoryProvider):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs tests

"""
Gets information from a remote service to construct the path that detectors
should write to, and determine how their files should be named.
"""

_data_group_name: str
_data_directory: Path

_client: VisitServiceClientBase
_current_collection: Optional[DirectoryInfo]
_session: Optional[ClientSession]

def __init__(
self,
data_group_name: str,
data_directory: Path,
client: VisitServiceClientBase,
):
self._data_group_name = data_group_name
self._data_directory = data_directory
self._client = client

self._current_collection = None
self._session = None

async def update(self) -> None:
"""
Calls the visit service to create a new data collection in the current visit.
"""
# TODO: After visit service is more feature complete:
# TODO: Allow selecting visit as part of the request to BlueAPI
# TODO: Consume visit information from BlueAPI and pass down to this class
# TODO: Query visit service to get information about visit and data collection
# TODO: Use AuthN information as part of verification with visit service

try:
collection_id_info = await self._client.create_new_collection()
self._current_collection = self._generate_directory_info(collection_id_info)
except Exception as ex:
# TODO: The catch all is needed because the RunEngine will not
# currently handle it, see
# https://github.com/bluesky/bluesky/pull/1623
self._current_collection = None
logging.exception(ex)

def _generate_directory_info(
self,
collection_id_info: DataCollectionIdentifier,
) -> DirectoryInfo:
collection_id = collection_id_info.collectionNumber
file_prefix = f"{self._data_group_name}-{collection_id}"
return DirectoryInfo(str(self._data_directory), file_prefix)

def __call__(self) -> DirectoryInfo:
if self._current_collection is not None:
return self._current_collection
else:
raise ValueError(
"No current collection, update() needs to be called at least once"
)


_SINGLETON: Optional[VisitDirectoryProvider] = None


def set_directory_provider_singleton(provider: VisitDirectoryProvider):
global _SINGLETON

_SINGLETON = provider


def get_directory_provider() -> DirectoryProvider:
if _SINGLETON is not None:
return _SINGLETON
else:
return StaticDirectoryProvider(tempfile.TemporaryFile(), "")
Loading
Loading