
Speculative visit directory service #283

Closed
wants to merge 26 commits

Conversation

@callumforrester (Collaborator) commented Jul 31, 2023

PLEASE DO NOT MERGE!
This PR is intended as a speculative prototyping exercise only.

Design

In order to comply with the DLS visit directory structure, we need to group acquisitions into files. There are many possible solutions; the basics of this one are:

[architecture diagram omitted]

  1. A new service is responsible for determining where data will go for a particular acquisition. This functionality is separated from all other applications for two reasons:
    • So that other data collection applications can also use it, and each one is guaranteed unique data locations.
    • This involves making directories and therefore requires write access to places where data goes. For security reasons, only trusted code should have that level of privilege, and it should be kept small and isolated.
  2. Every time a group of devices is staged, blueapi will call out to this service (or use a dummy version for offline development) and get a unique data directory for this stage. This is achieved via a plan preprocessor that detects groups of stages and makes the call.
  3. Compatible Ophyd detectors will be aware of a mutable state which is updated when this call occurs, so they can write to the "current" visit.
  4. The current group is bundled into the run start metadata, allowing for a group:run mapping of 1:1 or 1:many. The use case for the latter is writing many runs to a single file for performance reasons (see the sketch after this list).
  5. Any aggregating service, such as a nexus file writer, can take the group information from the run starts and use it however it likes. Our initial thinking was to map each group to a nexus file and each run to an entry, but that is TBD.
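
As a minimal sketch of the 1:many case, assuming the group number has already been obtained from the service (the metadata key data_collection_number matches what this PR uses; the plan itself is purely illustrative):

import bluesky.plan_stubs as bps


def two_runs_one_group(detectors, group_number):
    # One stage group; both runs carry the same group number in their
    # start documents, so an aggregator can write them to a single file.
    for det in detectors:
        yield from bps.stage(det)
    for _ in range(2):
        yield from bps.open_run(md={"data_collection_number": group_number})
        yield from bps.trigger_and_read(detectors)
        yield from bps.close_run()
    for det in detectors:
        yield from bps.unstage(det)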

Design Issues

I remain very sceptical about point 2! It introduces a shared mutable state between all detectors just to get around the fact that there is no direct, universal way to pass the current visit down to them, by design! The other alternative I explored is to put all support for configuring individual detectors in one place, in blueapi; if your detector isn't supported, it will just write as it normally does. A simple example:

from typing import Iterable

from ophyd.areadetector.filestore_mixins import FileStoreBase

from blueapi.core import Device
from blueapi.plugins.data_writing import DataCollection  # added in this PR


def configure_data_writing(
    devices: Iterable[Device],
    collection: DataCollection,
) -> None:
    for device in devices:
        if isinstance(device, FileStoreBase):
            path_template = str(collection.raw_data_files_root)

            # Configure the Ophyd device's HDF5 writer to read and write
            # under the collection's raw data root
            device.reg_root = "/"
            device.read_path_template = path_template
            device.write_path_template = path_template

What's Prototyped Here?

The actual contents of this PR:

  • A very simple visit service that can create and inform clients about unique directories for "acquisition groups".
  • A plan preprocessor that calls out to that service (or uses an internal dummy version) to create a new group when a set of devices is staged. It also puts metadata into the run start document to show which group it is a part of. Each start document now contains a key called data_collection_number, which can be passed back to the visit service for more information.
  • A dummy device that has access to the cached result of the service call.
  • A series of tests ensuring that the correct mapping of run -> file is achieved for various use cases, such as multi-run plans.
  • A small amendment to the bluesky context that makes blueapi use this preprocessor on every plan it runs (if the plan does not contain any stages, it will be unaffected).

How to test

Check out this branch. To test with the dummy detector, run

tox -e pytest -- tests/plugins/test_data_writing.py

To start the visit service:

python -m blueapi.plugins.data_writing_server

It should run on port 8089; you can test it with

curl -X POST "http://localhost:8089/collection/mybeamline"

which should create a new group and return a blob of JSON describing it.
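
For orientation, here is a minimal sketch of what such an endpoint might look like. This is an assumption for illustration, not the PR's actual implementation (the real one lives in blueapi.plugins.data_writing_server), though the collectionNumber field matches the DataCollectionIdentifier seen later in the diff:

from typing import Dict

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
_counters: Dict[str, int] = {}


class DataCollectionIdentifier(BaseModel):
    collectionNumber: int


@app.post("/collection/{beamline}")
def create_collection(beamline: str) -> DataCollectionIdentifier:
    # Each POST reserves the next unique collection number for this beamline
    _counters[beamline] = _counters.get(beamline, 0) + 1
    return DataCollectionIdentifier(collectionNumber=_counters[beamline])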

Running a scan with blueapi should now produce a start document that contains an attribute called data_collection_number, which we can use to aggregate runs.
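
A minimal sketch of how a consumer could pick that attribute out of the document stream (RE here is assumed to be an existing bluesky RunEngine; what you do with the group is up to the consumer):

def on_document(name, doc):
    if name == "start":
        group = doc.get("data_collection_number")
        print(f"run {doc['uid']} belongs to group {group}")


RE.subscribe(on_document)  # RE: an existing bluesky RunEngine instance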

Still to do

This PR contains a dummy detector with the correct hooks. It would be good to get this working with an actual Ophyd v2 detector, and ideally a v1 one too. We could then run a full collection via blueapi that uses this functionality. @coretl @callumforrester

More discussion on point 2 of the design would be helpful, I think. @coretl @DiamondJoseph @keithralphs @callumforrester

We should try to get the nexus writer to interpret the group number in the run start document and ensure there are no problems there. @DiamondJoseph

Possibly leverage the data_session metadata in the run start schema. @callumforrester @DiamondJoseph

codecov bot commented Jul 31, 2023

Codecov Report

Merging #283 (60354a8) into main (3f87223) will decrease coverage by 0.73%.
The diff coverage is 76.00%.

❗ Current head 60354a8 differs from pull request most recent head 49dd587. Consider uploading reports for the commit 49dd587 to get more accurate results

@@            Coverage Diff             @@
##             main     #283      +/-   ##
==========================================
- Coverage   89.07%   88.34%   -0.73%     
==========================================
  Files          39       44       +5     
  Lines        1446     1536      +90     
==========================================
+ Hits         1288     1357      +69     
- Misses        158      179      +21     
Files Coverage Δ
src/blueapi/core/__init__.py 100.00% <100.00%> (ø)
src/blueapi/core/bluesky_types.py 89.74% <100.00%> (+1.17%) ⬆️
src/blueapi/core/device_walk.py 100.00% <100.00%> (ø)
src/blueapi/plans/plans.py 100.00% <100.00%> (ø)
src/blueapi/plugins/data_writing.py 83.60% <83.60%> (ø)
src/blueapi/plugins/data_writing_server.py 55.55% <55.55%> (ø)

... and 9 files with indirect coverage changes


src/blueapi/core/device_walk.py (two outdated, resolved review threads)
def stage(self) -> List[object]:
    collection = self._provider.current_data_collection
    if collection is None:
        raise Exception("No active collection")
Contributor

This seems pretty readable to me. It requires explicit knowledge of a DataCollectionProvider when making the class, but that makes it obvious where the collection number is coming from.

Collaborator Author

I agree it's readable but it is also a bodge:

[image omitted]

Do we really want to bake such a workaround into Ophyd v2 when we have a chance to design a nice, shiny new system?

Collaborator Author

From discussion, happy with this because it is theoretically easy to strip out later, as long as:

  • We keep in mind that we want to strip it out later
  • We make an interface or ABC for the singleton in ophyd-async (sketched after this list)
  • We make a DLS-specific implementation in blueapi
  • We maintain a parallel implementation in ophyd-async for offline use, that just writes to a preconfigured directory or similar
  • We inject references to the singleton into each device
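
A sketch of what that interface and the offline implementation could look like, assuming the DirectoryProvider/DirectoryInfo shapes described further down this thread (a directory path plus a filename prefix); all names here are illustrative rather than taken from ophyd-async:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path


@dataclass
class DirectoryInfo:
    directory_path: Path
    filename_prefix: str


class DirectoryProvider(ABC):
    @abstractmethod
    def __call__(self) -> DirectoryInfo:
        """Return where detectors should currently write their data."""


class StaticDirectoryProvider(DirectoryProvider):
    # Offline implementation: always points at one preconfigured directory
    def __init__(self, directory: Path, prefix: str = "") -> None:
        self._info = DirectoryInfo(directory, prefix)

    def __call__(self) -> DirectoryInfo:
        return self._info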

src/blueapi/plugins/data_writing.py (outdated, resolved review thread)
Comment on lines +66 to +74
wrapped_plan = functools.reduce(
    lambda wrapped, next_wrapper: next_wrapper(wrapped),
    self.plan_wrappers,
    plan,
)
Collaborator

Might be obvious to people coming from Python, but making plan a kwarg (initializer=plan) would stop me running off to read the functools docs every time I see it.

Collaborator Author

functools.reduce explicitly disallows keyword arguments.
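
Indeed, functools.reduce is implemented in C and, at least on Python versions current at the time of this PR, accepts positional arguments only:

import functools

functools.reduce(lambda acc, f: f(acc), [], "start")  # OK
# functools.reduce(lambda acc, f: f(acc), [], initial="start")
# -> TypeError: reduce() takes no keyword arguments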

src/blueapi/plugins/data_writing_server.py (outdated, resolved review thread)
DATA_COLLECTION_NUMBER = "data_collection_number"


class DataCollectionProvider(ABC):
Collaborator Author

This should be removed and replaced with

Contributor

I can't see a direct replacement to the DataCollectionProvider in the file you've linked there...

your DataCollectionProvider gives a DataCollection, which has 4 things: collection_number, group, raw_data_files_root and nexus_file_path. Tom's work that you've linked has a DirectoryProvider which gives a DirectoryInfo object, which just contains a directory path and filename prefix.

So it's not a direct mapping. Perhaps I'll chat to you about this tomorrow morning in more detail.

Collaborator Author

It doesn't need to be a direct mapping, you can just change the logic here to match up with Tom's DirectoryProvider.

Contributor

Cool, got it

@rosesyrett (Contributor) commented Oct 4, 2023

I've come across a bit of an annoyance: I want to subclass DirectoryProvider so that it gets the directory from the visit service. However, I want to use aiohttp or some similar asynchronous library, and DirectoryProvider has one synchronous method, __call__.

My solution for now is to make a subclass GDADirectoryProvider (suggestions for a better name?) and give it an async update method. Then the synchronous __call__ can do something like asyncio.wait_for(update).

Collaborator Author

Yep, I think that's what I told @coretl I had in mind at the time.

Contributor

As discussed, the preprocessor should do the async access to the directory service and cache the result in the DirectoryProvider; the __call__ method then just returns the cached value.
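
A minimal sketch of that arrangement; the class and method names are assumptions for illustration, but the split matches the comment above: an awaitable update that the preprocessor drives (e.g. via bps.wait_for), and a synchronous __call__ that only reads the cache:

import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class DirectoryInfo:
    directory_path: Path
    filename_prefix: str


class CachingVisitDirectoryProvider:
    def __init__(self) -> None:
        self._cached: Optional[DirectoryInfo] = None

    async def update(self) -> None:
        # In reality this would query the visit service over HTTP
        # (e.g. with aiohttp); here we just fabricate a value
        await asyncio.sleep(0)
        self._cached = DirectoryInfo(Path("/tmp/visit"), "scan-")

    def __call__(self) -> DirectoryInfo:
        if self._cached is None:
            raise RuntimeError("update() has not been awaited yet")
        return self._cached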

Comment on lines 80 to 106
def data_writing_wrapper(
    plan: MsgGenerator,
    provider: DataCollectionProvider,
) -> MsgGenerator:
    staging = False
    for message in plan:
        if message.command == "stage":
            if not staging:
                yield from bps.wait_for([provider.update])
                staging = True
                if provider.current_data_collection is None:
                    raise Exception("There is no active data collection")
        elif staging:
            staging = False

        if message.command == "open_run":
            if provider.current_data_collection is None:
                yield from bps.wait_for([provider.update])
                if provider.current_data_collection is None:
                    raise Exception("There is no active data collection")
            message.kwargs[
                DATA_COLLECTION_NUMBER
            ] = provider.current_data_collection.collection_number
        yield message


data_writing_decorator = make_decorator(data_writing_wrapper)
Collaborator Author

This is the only part of this file that should actually stick around. I'm not convinced it should be called plugins/data_writing.py either; maybe preprocessors/xyz.py.

Collaborator

If it's going to be a plan preprocessor, should it be moved to dls-bluesky-core?

Collaborator Author

I thought about this. It is specifically a part of blueapi that fudges plans to write data in our structure; maybe it shouldn't be allowed to get outside of blueapi?

Collaborator

I'm fine with it existing only within BlueAPI and being applied on plan import: it prevents any possibility of the preprocessor being applied multiple times (pretty sure, from glancing through the logic, that that would just doubly increment the data collection number each run, but that's disruptive enough) or being missed.

Should anyone want to (a) use vanilla Bluesky rather than BlueAPI and (b) write to the DLS filesystem, they can extract and duplicate the logic. But I don't think that'll happen very often.

Collaborator

But on the original comment: Yes, having a blueapi.preprocessors package seems best.

plans: Dict[str, Plan] = field(default_factory=dict)
devices: Dict[str, Device] = field(default_factory=dict)
plan_functions: Dict[str, PlanGenerator] = field(default_factory=dict)

_reference_cache: Dict[Type, Type] = field(default_factory=dict)

def wrap(self, plan: MsgGenerator) -> MsgGenerator:
Collaborator Author

This should be tested

Comment on lines +32 to +33
wrapped_plan_generator = ctx.wrap(plan_generator)
ctx.run_engine(wrapped_plan_generator)
Collaborator Author

This needs testing too

)


def data_writing_wrapper(
Collaborator Author

There may be a nicer way of doing this; @rosesyrett to investigate and make my flowchart look less ugly.

@keithralphs (Contributor) left a comment

to be discussed further...

return str(self.sources) == str(other.sources)
return (
    (str(self.sources) == str(other.sources))
    and (self.facility == other.facility)
Contributor

left out visit_id here

return DataCollectionIdentifier(collectionNumber=self._count)


class VisitDirectoryProvider(DirectoryProvider):
Collaborator Author

Needs tests

@@ -86,6 +104,18 @@ def with_config(self, config: EnvironmentConfig) -> None:
elif source.kind is SourceKind.DODAL:
    self.with_dodal_module(mod)

call_in_bluesky_event_loop(self.connect_devices(self.sim))

async def connect_devices(self, sim: bool = False) -> None:
Collaborator Author

Should make private and rename to _connect_ophyd_async_devices

rosesyrett pushed a commit that referenced this pull request Oct 25, 2023
…315)

Production version of #283 

This PR allows coordination with a central service for creating unique
groups of data (called collections) and configures ophyd-async detectors
to write their data to the same location for a given collection.

Changes:
- Add a mechanism to preprocess all plans with set bluesky
[preprocessors](https://blueskyproject.io/bluesky/plans.html#plan-preprocessors)
- Create a directory provider that knows how to talk to GDA's visit
directory API and provide a unique collection number to group data files
- Create a dummy directory provider that works in a similar way without
the need for an external server (useful for development)
- Create a preprocessor that uses the directory provider and groups
detectors by staging; it also bundles the data group information into run
start documents on a best-effort basis
- Add tests
- Add tests

---------

Co-authored-by: Rose Yemelyanova <[email protected]>
@stan-dot (Collaborator)

Willing to test with the webcam at ViSR: https://github.com/DiamondLightSource/ViSR

@coretl and @callumforrester, is this a good idea?

@callumforrester (Collaborator Author)

This is very old and out of date; everything in it now exists in blueapi in some form.
