Skip to content

Commit 7302880

Browse files
committed
feat: Add graph pattern nodes for dynamic dispatch and composition
Add DynamicNode (runtime agent selection), NestedGraphNode (hierarchical workflow composition), and DynamicParallelGroup (variable-count concurrent execution). Extends CLI visualization with pattern-aware rendering (diamond, parallelogram, sub-cluster shapes). Includes pattern samples, node type reference, and design documentation.
1 parent b64a3dd commit 7302880

File tree

21 files changed

+4630
-2
lines changed

21 files changed

+4630
-2
lines changed

contributing/docs/advanced_graph_patterns.md

Lines changed: 623 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
# GraphAgent Node Types
2+
3+
A GraphAgent is made of **nodes** connected by **edges**. Each node
4+
represents one unit of work. This document explains every node type, when
5+
to use each one, and why the pattern-API nodes exist alongside the older
6+
function-node approach.
7+
8+
---
9+
10+
## GraphNode — the fundamental building block
11+
12+
`GraphNode` is the only node type you strictly need. Everything else is
13+
built on top of it.
14+
15+
A `GraphNode` wraps either an **agent** (any ADK `BaseAgent` subclass) or a
16+
**function** (any `async def` that takes `state` and `ctx`).
17+
18+
```python
19+
# Pattern 1 (Explicit): Wrapping an agent
20+
graph.add_node(GraphNode(name="summarise", agent=summariser_agent))
21+
22+
# Pattern 2 (Convenience): Wrapping an agent
23+
graph.add_node("summarise", agent=summariser_agent)
24+
25+
# Wrapping a function
26+
async def clean(state, ctx):
27+
return state.data.get("raw", "").strip()
28+
29+
# Pattern 1 (Explicit)
30+
graph.add_node(GraphNode(name="clean", function=clean))
31+
32+
# Pattern 2 (Convenience)
33+
graph.add_node("clean", function=clean)
34+
```
35+
36+
**What it handles for you**
37+
- Feeds the right part of state to the agent as input (`input_mapper`)
38+
- Stores the agent's output back into state under the node name (`output_mapper`)
39+
- Supports four merge strategies: overwrite, append, sum, or custom
40+
41+
**When to use it**: always — for individual agents or short custom logic.
42+
43+
---
44+
45+
## ParallelNodeGroup — static concurrency
46+
47+
`ParallelNodeGroup` runs a **fixed set of nodes at the same time**. The
48+
nodes to run and their names are declared when you build the graph.
49+
50+
```python
51+
from google.adk.agents.graph import ParallelNodeGroup, JoinStrategy
52+
53+
graph.add_group(ParallelNodeGroup(
54+
name="fan_out",
55+
nodes=["search_web", "search_db", "search_docs"],
56+
join_strategy=JoinStrategy.WAIT_ALL,
57+
))
58+
```
59+
60+
**Three join strategies**
61+
62+
| Strategy | Meaning |
63+
|---|---|
64+
| `WAIT_ALL` | Wait for every node to finish before continuing |
65+
| `WAIT_ANY` | Continue as soon as the first node finishes |
66+
| `WAIT_N` | Continue after N nodes finish (you pick N) |
67+
68+
**When to use it**: you know the exact set of agents to run in parallel at
69+
the time you write the code, and that set never changes at runtime.
70+
71+
---
72+
73+
## DynamicNode — choose the agent at runtime
74+
75+
`DynamicNode` is a `GraphNode` that decides **which agent to call** by
76+
running a selector function against the current graph state.
77+
78+
```python
79+
def pick_agent(state):
80+
if state.data.get("task_type") == "complex":
81+
return thorough_agent
82+
return quick_agent
83+
84+
# Pattern 1 (Explicit)
85+
graph.add_node(DynamicNode(
86+
name="respond",
87+
agent_selector=pick_agent,
88+
fallback_agent=quick_agent, # used if selector returns None
89+
))
90+
91+
# Pattern 2 (Convenience)
92+
graph.add_node("respond", agent_selector=pick_agent, fallback_agent=quick_agent)
93+
```
94+
95+
**Do you need it?** Not strictly. You can achieve the same result with a
96+
plain `GraphNode(function=...)`:
97+
98+
```python
99+
async def dispatch(state, ctx):
100+
agent = thorough_agent if state.data.get("task_type") == "complex" else quick_agent
101+
node_ctx = ctx.model_copy(update={"user_content": ...})
102+
output = ""
103+
async for event in agent.run_async(node_ctx):
104+
if event.content and event.content.parts:
105+
output = event.content.parts[0].text or ""
106+
return output
107+
```
108+
109+
**Why DynamicNode exists**
110+
111+
1. It removes ten lines of boilerplate that you'd copy-paste every time you
112+
add a new routing node.
113+
2. It encapsulates agent selection logic with built-in fallback support.
114+
3. It has a built-in `fallback_agent` parameter — with a function node you
115+
must remember to add that guard yourself.
116+
117+
**When to use it**: any time you want to route to different agents based on
118+
what's in the state, especially if you have more than one such node or want
119+
the selection to show up in traces.
120+
121+
---
122+
123+
## NestedGraphNode — run a whole sub-graph as one step
124+
125+
`NestedGraphNode` runs a complete `GraphAgent` as a single step inside a
126+
parent graph. The parent sees only the sub-graph's final answer.
127+
128+
```python
129+
# Pattern 1 (Explicit)
130+
graph.add_node(NestedGraphNode(
131+
name="research",
132+
graph_agent=research_pipeline, # a full GraphAgent with its own nodes
133+
inherit_session=True, # True = share state; False = clean slate
134+
))
135+
136+
# Pattern 2 (Convenience)
137+
graph.add_node("research", graph_agent=research_pipeline, inherit_session=True)
138+
```
139+
140+
**Do you need it?** No. A function node can call a sub-graph directly:
141+
142+
```python
143+
async def run_research(state, ctx):
144+
sub_ctx = ctx.model_copy(update={"user_content": ...})
145+
result = ""
146+
async for event in research_graph._run_async_impl(sub_ctx):
147+
if event.content and event.content.parts:
148+
text = event.content.parts[0].text or ""
149+
if text and not text.startswith("[GraphMetadata]"): # must filter manually
150+
result = text
151+
return result
152+
```
153+
154+
The function-node version **works**, and it worked before `NestedGraphNode`
155+
was added. `NestedGraphNode` is purely a convenience that:
156+
157+
1. Does the event-filtering for you (the inner graph emits empty sentinel
158+
events at the end that would otherwise overwrite your result — a subtle
159+
bug that's easy to introduce yourself).
160+
2. Gives you `inherit_session` as a first-class parameter instead of you
161+
having to remember whether to copy the context or create a new session.
162+
3. Records the inner graph's iteration count and execution path in metadata
163+
automatically.
164+
165+
**When to use it**: when you want to embed a reusable multi-step sub-workflow
166+
as a single node and don't want to write the filtering and context-management
167+
boilerplate by hand. If you've already written a working function-node
168+
version, there is no reason to switch.
169+
170+
---
171+
172+
## DynamicParallelGroup — variable-count concurrency
173+
174+
`DynamicParallelGroup` is like `ParallelNodeGroup` except the **number of
175+
agents is decided at runtime** by reading the graph state.
176+
177+
```python
178+
def create_agents(state):
179+
n = int(state.data.get("num_branches", "3"))
180+
return [ThoughtAgent(f"branch_{i}") for i in range(n)]
181+
182+
def combine(results, state):
183+
return "\n---\n".join(results)
184+
185+
# Pattern 1 (Explicit)
186+
graph.add_node(DynamicParallelGroup(
187+
name="explore",
188+
agent_generator=create_agents,
189+
aggregator=combine,
190+
max_parallelism=5, # never run more than 5 at once
191+
))
192+
193+
# Pattern 2 (Convenience)
194+
graph.add_node("explore", agent_generator=create_agents, aggregator=combine, max_parallelism=5)
195+
```
196+
197+
**How it differs from ParallelNodeGroup**
198+
199+
| | `ParallelNodeGroup` | `DynamicParallelGroup` |
200+
|---|---|---|
201+
| Number of agents | Fixed at build time | Computed from state at runtime |
202+
| Custom aggregation | Not built-in | Required (`aggregator` param) |
203+
| Concurrency cap | None | `max_parallelism` |
204+
205+
**Do you need it?** If `N` is always fixed (say, always 3 branches), use
206+
`ParallelNodeGroup`. If `N` changes per request — e.g. "run as many
207+
reviewers as there are items in the queue" — `DynamicParallelGroup` is the
208+
right tool and there is no equally clean alternative with existing APIs.
209+
210+
**When to use it**: Tree of Thoughts, self-consistency sampling, fan-out
211+
over a variable-length list of items, or any other case where the number of
212+
parallel workers is not known when you write the code.
213+
214+
---
215+
216+
## Summary
217+
218+
| Node type | Fixed at build time? | Needs agent upfront? | Custom aggregation? | Use when… |
219+
|---|---|---|---|---|
220+
| `GraphNode` || yes or function || Single agent or custom logic |
221+
| `ParallelNodeGroup` | yes | yes | no | Known fixed set of parallel agents |
222+
| `DynamicNode` || no (picked at runtime) || Different agents for different inputs |
223+
| `NestedGraphNode` || no (a full sub-graph) || Reusable sub-workflow as one step |
224+
| `DynamicParallelGroup` || no (generated at runtime) | yes | Variable number of parallel agents |
225+
226+
---
227+
228+
## Coverage (feature branch)
229+
230+
Measured with `pytest --cov` across 1197 tests:
231+
232+
| Module | Coverage |
233+
|---|---|
234+
| `graph/patterns.py` | 100% |
235+
| `graph/__init__.py` | 100% |
236+
| `graph/callbacks.py` | 100% |
237+
| `graph/graph_agent_config.py` | 100% |
238+
| `graph/graph_state.py` | 100% |
239+
| `graph/graph_tracing.py` | 100% |
240+
| `graph/graph_node.py` | 97% |
241+
| `graph/interrupt_service.py` | 97% |
242+
| `graph/parallel.py` | 97% |
243+
| `graph/interrupt.py` | 95% |
244+
| `checkpoints/checkpoint_service.py` | 98% |
245+
| `graph/graph_agent.py` | 71% |
246+
| **Branch total** | **89%** |
247+
248+
`graph_agent.py` pulls the average down; the uncovered lines are primarily
249+
in the OTel baggage propagation paths and some telemetry-sampling branches
250+
that require a live tracing exporter to exercise.

0 commit comments

Comments
 (0)