Skip to content

Commit

Permalink
refactor(api): create micro-operations for motion and pipetting (#16857)
Browse files Browse the repository at this point in the history
Make some intermediates for `move_to_well` and the pipetting operations
that can handle and format exceptions and pass defined errors upstream.
In addition, make some state update changes that will pave the way for
having multiple micro-operations that can fail in each command, and get
rid of maybes.

Specifically,
- You can now chain `StateUpdate` setter calls
- New classmethod `StateUpdate.reduce()` that does a reduce over a group
of state updates
- New "micro operations" in `pipetting_common` and new `movement_common`
that basically just do exception handling for things like
`pipetting.aspirate_in_place()` and `movement.move_to_wells()` so that
the commands can treat defined errors entirely as data
- Give up on the `Maybe` experiment because it's really awful to chain
async calls in python

## testing
- No behavior should have changed. Some
`state_update_if_false_positive`s are now full overrides instead of just
extensions. Tests should cover this.

Works toward EXEC-830
  • Loading branch information
sfoster1 authored Nov 18, 2024
1 parent 25add49 commit 1e8ac74
Show file tree
Hide file tree
Showing 32 changed files with 876 additions and 805 deletions.
113 changes: 54 additions & 59 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
"""Aspirate command request, result, and implementation models."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from opentrons_shared_data.errors.exceptions import PipetteOverpressureError
from typing_extensions import Literal

from .pipetting_common import (
OverpressureError,
PipetteIdMixin,
AspirateVolumeMixin,
FlowRateMixin,
LiquidHandlingWellLocationMixin,
BaseLiquidHandlingResult,
aspirate_in_place,
)
from .movement_common import (
LiquidHandlingWellLocationMixin,
DestinationPositionResult,
move_to_well,
)
from .command import (
AbstractCommandImpl,
Expand All @@ -20,7 +24,6 @@
DefinedErrorData,
SuccessData,
)
from ..errors.error_occurrence import ErrorOccurrence

from opentrons.hardware_control import HardwareControlAPI

Expand All @@ -29,9 +32,6 @@
WellLocation,
WellOrigin,
CurrentWell,
DeckPoint,
AspiratedFluid,
FluidKind,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -99,7 +99,6 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
)

current_well = None
state_update = StateUpdate()

if not ready_to_aspirate:
await self._movement.move_to_well(
Expand All @@ -119,74 +118,70 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
well_name=well_name,
)

position = await self._movement.move_to_well(
move_result = await move_to_well(
movement=self._movement,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=params.wellLocation,
current_well=current_well,
operation_volume=-params.volume,
)
deck_point = DeckPoint.construct(x=position.x, y=position.y, z=position.z)
state_update.set_pipette_location(

aspirate_result = await aspirate_in_place(
pipette_id=pipette_id,
new_labware_id=labware_id,
new_well_name=well_name,
new_deck_point=deck_point,
volume=params.volume,
flow_rate=params.flowRate,
location_if_error={
"retryLocation": (
move_result.public.position.x,
move_result.public.position.y,
move_result.public.position.z,
)
},
command_note_adder=self._command_note_adder,
pipetting=self._pipetting,
model_utils=self._model_utils,
)

try:
volume_aspirated = await self._pipetting.aspirate_in_place(
pipette_id=pipette_id,
volume=params.volume,
flow_rate=params.flowRate,
command_note_adder=self._command_note_adder,
)
except PipetteOverpressureError as e:
state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
if isinstance(aspirate_result, DefinedErrorData):
return DefinedErrorData(
public=OverpressureError(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
wrappedErrors=[
ErrorOccurrence.from_failed(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
error=e,
)
],
errorInfo={"retryLocation": (position.x, position.y, position.z)},
public=aspirate_result.public,
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update=state_update,
)
else:
state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
state_update_if_false_positive=StateUpdate.reduce(
move_result.state_update,
aspirate_result.state_update_if_false_positive,
),
volume_added=-volume_aspirated
* self._state_view.geometry.get_nozzles_per_well(
labware_id, well_name, pipette_id
),
)
state_update.set_fluid_aspirated(
pipette_id=params.pipetteId,
fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=volume_aspirated),
)
else:
return SuccessData(
public=AspirateResult(
volume=volume_aspirated,
position=deck_point,
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
),
state_update=state_update,
)


Expand Down
120 changes: 53 additions & 67 deletions api/src/opentrons/protocol_engine/commands/aspirate_in_place.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from typing import TYPE_CHECKING, Optional, Type, Union
from typing_extensions import Literal

from opentrons_shared_data.errors.exceptions import PipetteOverpressureError

from opentrons.hardware_control import HardwareControlAPI

from .pipetting_common import (
Expand All @@ -14,6 +12,7 @@
FlowRateMixin,
BaseLiquidHandlingResult,
OverpressureError,
aspirate_in_place,
)
from .command import (
AbstractCommandImpl,
Expand All @@ -22,10 +21,9 @@
SuccessData,
DefinedErrorData,
)
from ..errors.error_occurrence import ErrorOccurrence
from ..errors.exceptions import PipetteNotReadyToAspirateError
from ..state.update_types import StateUpdate, CLEAR
from ..types import CurrentWell, AspiratedFluid, FluidKind
from ..state.update_types import CLEAR
from ..types import CurrentWell

if TYPE_CHECKING:
from ..execution import PipettingHandler, GantryMover
Expand Down Expand Up @@ -94,83 +92,71 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn:
" so the plunger can be reset in a known safe position."
)

state_update = StateUpdate()
current_position = await self._gantry_mover.get_position(params.pipetteId)
current_location = self._state_view.pipettes.get_current_location()

try:
current_position = await self._gantry_mover.get_position(params.pipetteId)
volume = await self._pipetting.aspirate_in_place(
pipette_id=params.pipetteId,
volume=params.volume,
flow_rate=params.flowRate,
command_note_adder=self._command_note_adder,
)
except PipetteOverpressureError as e:
result = await aspirate_in_place(
pipette_id=params.pipetteId,
volume=params.volume,
flow_rate=params.flowRate,
location_if_error={
"retryLocation": (
current_position.x,
current_position.y,
current_position.z,
)
},
command_note_adder=self._command_note_adder,
pipetting=self._pipetting,
model_utils=self._model_utils,
)
if isinstance(result, DefinedErrorData):
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
return DefinedErrorData(
public=result.public,
state_update=result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
volume_added=CLEAR,
state_update_if_false_positive=result.state_update_if_false_positive,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
return DefinedErrorData(
public=OverpressureError(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
wrappedErrors=[
ErrorOccurrence.from_failed(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
error=e,
)
],
errorInfo=(
{
"retryLocation": (
current_position.x,
current_position.y,
current_position.z,
)
}
),
),
state_update=state_update,
)
else:
return result
else:
state_update.set_fluid_aspirated(
pipette_id=params.pipetteId,
fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=volume),
)
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=-volume
* self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
return SuccessData(
public=AspirateInPlaceResult(volume=result.public.volume),
state_update=result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=-result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
),
)

return SuccessData(
public=AspirateInPlaceResult(volume=volume),
state_update=state_update,
)
else:
return SuccessData(
public=AspirateInPlaceResult(volume=result.public.volume),
state_update=result.state_update,
)


class AspirateInPlace(
Expand Down
Loading

0 comments on commit 1e8ac74

Please sign in to comment.