Is it possible to support human in the middle from multiple concurrent subgraphs? #3398
Unanswered · mattcoulter7 asked this question in Q&A
Replies: 1 comment
-
This was my implementation of a synchronous human-in-the-middle flow, combined with executing the subgraph. Strangely enough, this approach doesn't even execute the child graph the second time; it just skips over it... Am I missing something?

```python
import operator
from typing import Annotated, List, Optional

from pydantic import BaseModel, Field
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END

# --- PARENT GRAPH ---
class ParentState(BaseModel):
    prompts: List[str] = Field(
        ...,
        description="What is going to be asked to the user?"
    )
    human_inputs: Annotated[List[str], operator.add] = Field(
        default_factory=list,
        description="All of my messages"
    )
    prompt: Optional[str] = Field(
        None,
        description="The current prompt that is being worked on"
    )
    prompt_index: int = Field(
        0,
        description="The index of the current prompt we are working on"
    )

def router(state: ParentState):
    return str(bool(state.prompt_index < len(state.prompts)))

def placeholder(state: ParentState):
    return dict()

def set_current_prompt(state: ParentState):
    return dict(
        prompt=state.prompts[state.prompt_index],
    )

def increment_prompt_index(state: ParentState):
    return dict(
        prompt_index=state.prompt_index + 1,
    )

parent_graph_builder = StateGraph(ParentState)
parent_graph_builder.add_node("placeholder", placeholder)
parent_graph_builder.add_node("set_current_prompt", set_current_prompt)
# `child_graph` is the compiled child graph (definition not shown in this snippet)
parent_graph_builder.add_node("child_graph", child_graph)
parent_graph_builder.add_node("increment_prompt_index", increment_prompt_index)

parent_graph_builder.add_edge(START, "placeholder")
parent_graph_builder.add_conditional_edges(
    "placeholder",
    router,
    {
        str(True): "set_current_prompt",
        str(False): END,
    }
)
parent_graph_builder.add_edge("set_current_prompt", "child_graph")
parent_graph_builder.add_edge("child_graph", "increment_prompt_index")
parent_graph_builder.add_edge("increment_prompt_index", "placeholder")  # would route back to START, but that is illegal lol

parent_graph = parent_graph_builder.compile(checkpointer=MemorySaver())
```

Output:

```
{'placeholder': None}
{'set_current_prompt': {'prompt': 'a'}}
{'__interrupt__': (Interrupt(value='a', resumable=True, ns=['child_graph', 'get_human_input:b2898a55-4798-14b5-3273-ddbae1e75171'], when='during'),)}
Input for prompt "a": 1
{'child_graph': {'prompt': 'a', 'human_inputs': ['1']}}
{'increment_prompt_index': {'prompt_index': 1}}
{'placeholder': None}
{'set_current_prompt': {'prompt': 'b'}}   # <-- `prompt` in state updated to the next in the sequence
{'child_graph': {'prompt': 'a', 'human_inputs': ['1']}}   # <-- but this still references prompt `a`
{'increment_prompt_index': {'prompt_index': 2}}
{'placeholder': None}
```
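For comparison, the per-prompt loop that the parent graph encodes can be sketched in plain Python without LangGraph, showing the behavior each iteration is expected to have (each prompt should receive its own answer). The `answer` callable is a stand-in for the interrupt/resume step:

```python
from typing import Callable, List

def run_prompts(prompts: List[str], answer: Callable[[str], str]) -> List[str]:
    """Plain-Python emulation of the parent graph's sequential loop.

    `answer` stands in for the human-in-the-loop step (interrupt + resume).
    """
    human_inputs: List[str] = []
    prompt_index = 0
    while prompt_index < len(prompts):       # router: more prompts remain?
        prompt = prompts[prompt_index]       # set_current_prompt
        human_inputs.append(answer(prompt))  # child_graph: ask human, collect input
        prompt_index += 1                    # increment_prompt_index
    return human_inputs

print(run_prompts(["a", "b"], lambda p: {"a": "1", "b": "2"}[p]))
# ['1', '2']  <- each prompt gets its own input, unlike the graph run above
```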
-
Hey guys,
I am trying to combine human-in-the-loop with parallel execution of a subgraph. My use case is that within each subgraph there is a human review stage.
I originally expected that I would be able to call interrupt from within a subgraph. However, it seems that an interrupt call from one worker interrupts all of the other workers. Then, upon resume, all of the workers pick up the same user input.
See the minimal example below (sorry, I could not get the child graph to render inside the full graph image... it just appears as a single node...)
Parent Graph:
Child Graph:
Minimal Implementation
So I expect to see something like this:
However, the actual output is this:
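Based on the interrupt namespace seen in the reply's output (`get_human_input` inside `child_graph`), the child graph's human-input node presumably has roughly the shape below. A dummy `interrupt` is defined here so the sketch runs standalone; in the real graph this would be `interrupt` from `langgraph.types`, which pauses the run and returns the value supplied on resume:

```python
# Stand-in for langgraph.types.interrupt so this sketch runs without LangGraph.
# In a real graph, interrupt() pauses execution, surfaces `prompt` to the caller,
# and returns the value passed back when the run is resumed.
def interrupt(prompt: str) -> str:
    return f"resumed:{prompt}"

def get_human_input(state: dict) -> dict:
    """Hypothetical child-graph node: ask the human, record the answer."""
    value = interrupt(state["prompt"])
    return {"human_inputs": [value]}

print(get_human_input({"prompt": "a"}))
# {'human_inputs': ['resumed:a']}
```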
Current workaround
In the meantime, I can work around the issue by not using concurrency, and instead using a synchronous workflow to capture input for each element. But that obviously comes with performance limitations.
Furthermore, since my application is just a CLI, I can call
input(...)
or
typer.prompt(...)
directly in the node, but that is only temporary, as I don't want to couple the application logic to how it is interfaced.
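One way to keep the node logic decoupled from the CLI in the meantime is to inject the input-capturing callable rather than hardcoding `input()` or `typer.prompt()` inside the node. This is a hypothetical sketch (the factory name and state shape are illustrative, not from the original code):

```python
from typing import Callable

def make_get_human_input(ask: Callable[[str], str]):
    """Build a human-input node with the input mechanism injected.

    `ask` could be input, typer.prompt, or later a real interrupt-based
    callable, without the node's logic changing.
    """
    def get_human_input(state: dict) -> dict:
        return {"human_inputs": [ask(state["prompt"])]}
    return get_human_input

# CLI wiring would be: node = make_get_human_input(input)
# For tests, any callable works:
node = make_get_human_input(lambda p: f"answer-to-{p}")
print(node({"prompt": "a"}))
# {'human_inputs': ['answer-to-a']}
```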