-
Notifications
You must be signed in to change notification settings - Fork 14
fix(sdk): make map/parallel child exec order invariant #108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -222,17 +222,23 @@ def set_logger(self, new_logger: LoggerInterface): | |
| info=self._log_info, | ||
| ) | ||
|
|
||
| def _create_step_id_for_logical_step(self, step: int) -> str: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. step maybe better described as
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. _create_step_id_with_preset_counter |
||
| """ | ||
| Generate a step_id based on the given logical step. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. on the given counter. "logical step" doesn't really have a well defined meaning. |
||
| This allows us to recover operation ids or even look | ||
| forward without changing the internal state of this context. | ||
| """ | ||
| step_id = f"{self._parent_id}-{step}" if self._parent_id else str(step) | ||
| return hashlib.blake2b(step_id.encode()).hexdigest()[:64] | ||
|
|
||
| def _create_step_id(self) -> str: | ||
| """Generate a thread-safe step id, incrementing in order of invocation. | ||
|
|
||
| This method is an internal implementation detail. Do not rely on the exact format of | ||
| the id generated by this method. It is subject to change without notice. | ||
| """ | ||
| new_counter: int = self._step_counter.increment() | ||
| step_id = ( | ||
| f"{self._parent_id}-{new_counter}" if self._parent_id else str(new_counter) | ||
| ) | ||
| return hashlib.blake2b(step_id.encode()).hexdigest()[:64] | ||
| return self._create_step_id_for_logical_step(new_counter) | ||
|
|
||
| # region Operations | ||
|
|
||
|
|
@@ -311,13 +317,17 @@ def map( | |
| """Execute a callable for each item in parallel.""" | ||
| map_name: str | None = self._resolve_step_name(name, func) | ||
|
|
||
| def map_in_child_context(child_context) -> BatchResult[R]: | ||
| def map_in_child_context(map_context) -> BatchResult[R]: | ||
| # map_context is a child_context of the context upon which `.map` | ||
| # was called. We are calling it `map_context` to make it explicit | ||
| # that any operations happening from hereon are done on the context | ||
| # that owns the branches | ||
| return map_handler( | ||
| items=inputs, | ||
| func=func, | ||
| config=config, | ||
| execution_state=self.state, | ||
| run_in_child_context=child_context.run_in_child_context, | ||
| map_context=map_context, | ||
| ) | ||
|
|
||
| return self.run_in_child_context( | ||
|
|
@@ -337,12 +347,16 @@ def parallel( | |
| ) -> BatchResult[T]: | ||
| """Execute multiple callables in parallel.""" | ||
|
|
||
| def parallel_in_child_context(child_context) -> BatchResult[T]: | ||
| def parallel_in_child_context(parallel_context) -> BatchResult[T]: | ||
| # parallel_context is a child_context of the context upon which `.parallel` | ||
| # was called. We are calling it `parallel_context` to make it explicit | ||
| # that any operations happening from hereon are done on the context | ||
| # that owns the branches | ||
| return parallel_handler( | ||
| callables=functions, | ||
| config=config, | ||
| execution_state=self.state, | ||
| run_in_child_context=child_context.run_in_child_context, | ||
| parallel_context=parallel_context, | ||
| ) | ||
|
|
||
| return self.run_in_child_context( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
phrasing.
"This avoids the hidden mutation of the context's internal counter.
We can do this because we explicitly control the generation of step_id and do it
using executable.index."