Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove public fields of BlueskyContext and replace with methods #174

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/blueapi/core/bluesky_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def is_bluesky_plan_generator(func: PlanGenerator) -> bool:

class Plan(BlueapiBaseModel):
"""
A plan that can be run
Metadata about a plan that can be run
"""

name: str = Field(description="Referenceable name of the plan")
Expand Down
135 changes: 117 additions & 18 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
from dataclasses import dataclass, field
from importlib import import_module
from inspect import Parameter, signature
from pathlib import Path
Expand All @@ -8,6 +7,7 @@
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Expand Down Expand Up @@ -37,44 +37,136 @@
LOGGER = logging.getLogger(__name__)


@dataclass
class BlueskyContext:
"""
Context for building a Bluesky application
"""

run_engine: RunEngine = field(
default_factory=lambda: RunEngine(context_managers=[])
)
plans: Dict[str, Plan] = field(default_factory=dict)
devices: Dict[str, Device] = field(default_factory=dict)
plan_functions: Dict[str, PlanGenerator] = field(default_factory=dict)
_run_engine: RunEngine
_plans: Dict[str, Plan]
_devices: Dict[str, Device]
_plan_functions: Dict[str, PlanGenerator]
_reference_cache: Dict[Type, Type]

def __init__(self, run_engine: Optional[RunEngine] = None) -> None:
self._run_engine = run_engine or RunEngine(context_managers=[])
self._devices = {}
self._plans = {}
self._plan_functions = {}
self._reference_cache = {}

@property
def run_engine(self) -> RunEngine:
"""
RunEngine capable of running the plans in the context against the devices
in the context.

Returns:
RunEngine: A Bluesky RunEngine
"""

return self._run_engine

def find_plan_function(self, name: str) -> Optional[PlanGenerator]:
"""
Return a plan function matching the given name

Args:
name: The name of the plan

Returns:
Optional[PlanGenerator]: The plan function, None if there is
no plan matching the name in the context.
"""
return self._plan_functions.get(name)

def find_plan_metadata(self, name: str) -> Optional[Plan]:
"""
Return a metadata object describing a plan in the context

Args:
name: The name of the plan

Returns:
Optional[Plan]: The plan metadata, None if there is
no plan matching the name in the context.
"""

return self._plans.get(name)

def all_devices(self) -> Iterable[Device]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth making these two properties as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or not having any properties... I'm always on the fence between the potential for confusion and the nice read-only-ness

"""
Return iterable of all devices in the context.

Returns:
Iterable[Device]: Iterable of devices
"""

return self._devices.values()

def all_plan_metadata(self) -> Iterable[Plan]:
"""
Return iterable of all plan metadata in the context.

Returns:
Iterable[Device]: Iterable of plan metadata
"""

return self._plans.values()

def has_plan(self, name: str) -> bool:
"""
Check if a plan exists in the context.

_reference_cache: Dict[Type, Type] = field(default_factory=dict)
Args:
name: The name of the plan

Returns:
bool: True if the plan is accessible from this context
"""

return name in self._plans

def find_device(self, addr: Union[str, List[str]]) -> Optional[Device]:
"""
Find a device in this context, allows for recursive search.

Args:
addr (Union[str, List[str]]): Address of the device, examples:
"motors", "motors.x"
addr: Address of the device, examples: "motors", "motors.x"

Returns:
Optional[Device]: _description_
Optional[Device]: A Bluesky compatible device, None if no device
matching addr exists in the context.
"""

if isinstance(addr, str):
list_addr = list(addr.split("."))
return self.find_device(list_addr)
else:
return find_component(self.devices, addr)
return find_component(self._devices, addr)

def with_startup_script(self, path: Union[Path, str]) -> None:
"""
Register all plans and devices in the Python file at this path.

Args:
path: Path to the Python file. Can be a posix file path
or a module path if pointing to a valid Python
module in your environment.
"""

mod = import_module(str(path))
self.with_module(mod)

def with_module(self, module: ModuleType) -> None:
"""
Register all plans and devices in this module

Args:
module: A module, usually retrieved from an
import statement
"""

self.with_plan_module(module)
self.with_device_module(module)

Expand All @@ -95,14 +187,21 @@ def plan_2(...):
__all__ = ["plan_1", "plan_2"]

Args:
module (ModuleType): Module to pass in
module: Module to check for plans
"""

for obj in load_module_all(module):
if is_bluesky_plan_generator(obj):
self.plan(obj)

def with_device_module(self, module: ModuleType) -> None:
"""
Register all devices in the module supplied

Args:
module: Module to check for devices
"""

for obj in load_module_all(module):
if is_bluesky_compatible_device(obj):
self.device(obj)
Expand All @@ -129,15 +228,15 @@ def my_plan(a: int, b: str):
__config__=BlueapiPlanModelConfig,
**self._type_spec_for_function(plan),
)
self.plans[plan.__name__] = Plan(name=plan.__name__, model=model)
self.plan_functions[plan.__name__] = plan
self._plans[plan.__name__] = Plan(name=plan.__name__, model=model)
self._plan_functions[plan.__name__] = plan
return plan

def device(self, device: Device, name: Optional[str] = None) -> None:
"""
Register an device in the context. The device needs to be registered with a
name. If the device is Readable, Movable or Flyable it has a `name`
attribbute which can be used. The attribute can be overrideen with the
attribute which can be used. The attribute can be overridden with the
`name` parameter here. If the device conforms to a different protocol then
the parameter must be used to name it.

Expand All @@ -158,7 +257,7 @@ def device(self, device: Device, name: Optional[str] = None) -> None:
else:
raise KeyError(f"Must supply a name for this device: {device}")

self.devices[name] = device
self._devices[name] = device

def _reference(self, target: Type) -> Type:
"""
Expand Down
4 changes: 2 additions & 2 deletions src/blueapi/service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _on_run_request(self, message_context: MessageContext, task: RunPlan) -> Non
self._template.send(reply_queue, response)

def _get_plans(self, message_context: MessageContext, message: PlanRequest) -> None:
plans = list(map(PlanModel.from_plan, self._ctx.plans.values()))
plans = list(map(PlanModel.from_plan, self._ctx.all_plan_metadata()))
response = PlanResponse(plans=plans)

assert message_context.reply_destination is not None
Expand All @@ -90,7 +90,7 @@ def _get_plans(self, message_context: MessageContext, message: PlanRequest) -> N
def _get_devices(
self, message_context: MessageContext, message: DeviceRequest
) -> None:
devices = list(map(DeviceModel.from_device, self._ctx.devices.values()))
devices = list(map(DeviceModel.from_device, self._ctx.all_devices()))
response = DeviceResponse(devices=devices)

assert message_context.reply_destination is not None
Expand Down
13 changes: 8 additions & 5 deletions src/blueapi/worker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,18 @@ class RunPlan(Task):
def do_task(self, ctx: BlueskyContext) -> None:
LOGGER.info(f"Asked to run plan {self.name} with {self.params}")

plan = ctx.plans[self.name]
func = ctx.plan_functions[self.name]
sanitized_params = _lookup_params(ctx, plan, self.params)
plan_generator = func(**sanitized_params.dict())
metadata = ctx.find_plan_metadata(self.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these two functions are only ever called together and both have to be present, it'd be clearer to have a single find_plan(name) -> Optional[(Plan, PlanFunc)]. Probably suggests plans and plan_functions should be merged into a single field in the context as well. Maybe a dict of name -> (Plan, func)?

func = ctx.find_plan_function(self.name)
if metadata is None or func is None:
raise KeyError(f"No plan named {self.name}")
sanitized_params = _lookup_params(metadata, self.params).dict()
plan_generator = func(**sanitized_params)
ctx.run_engine(plan_generator)


def _lookup_params(
ctx: BlueskyContext, plan: Plan, params: Mapping[str, Any]
plan: Plan,
params: Mapping[str, Any],
) -> BaseModel:
"""
Checks plan parameters against context
Expand Down
8 changes: 4 additions & 4 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def some_configurable() -> SomeConfigurable:
@pytest.mark.parametrize("plan", [has_no_params, has_one_param, has_some_params])
def test_add_plan(empty_context: BlueskyContext, plan: PlanGenerator) -> None:
empty_context.plan(plan)
assert plan.__name__ in empty_context.plans
assert empty_context.has_plan(plan.__name__)


@pytest.mark.parametrize(
Expand All @@ -111,14 +111,14 @@ def test_add_invalid_plan(empty_context: BlueskyContext, plan: PlanGenerator) ->

def test_add_named_device(empty_context: BlueskyContext, sim_motor: SynAxis) -> None:
empty_context.device(sim_motor)
assert empty_context.devices[SIM_MOTOR_NAME] is sim_motor
assert empty_context.find_device(SIM_MOTOR_NAME) is sim_motor


def test_add_nameless_device(
empty_context: BlueskyContext, some_configurable: SomeConfigurable
) -> None:
empty_context.device(some_configurable, "conf")
assert empty_context.devices["conf"] is some_configurable
assert empty_context.find_device("conf") is some_configurable


def test_add_nameless_device_without_override(
Expand All @@ -133,7 +133,7 @@ def test_override_device_name(
empty_context: BlueskyContext, sim_motor: SynAxis
) -> None:
empty_context.device(sim_motor, "foo")
assert empty_context.devices["foo"] is sim_motor
assert empty_context.find_device("foo") is sim_motor


@pytest.mark.parametrize(
Expand Down