Skip to content

Commit 8224d46

Browse files
committed
feat: Add graph pattern nodes for dynamic dispatch and composition
Add DynamicNode (runtime agent selection), NestedGraphNode (hierarchical workflow composition), and DynamicParallelGroup (variable-count concurrent execution). Extends CLI visualization with pattern-aware rendering (diamond, parallelogram, sub-cluster shapes). Includes pattern samples, node type reference, and design documentation.
1 parent 1414a38 commit 8224d46

File tree

17 files changed

+1978
-32
lines changed

17 files changed

+1978
-32
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# GraphAgent Pattern — DynamicNode (Mixture of Experts)
2+
3+
This example implements **runtime agent selection** using `DynamicNode`. A classifier labels the
4+
incoming task as SIMPLE or COMPLEX, then `DynamicNode` routes to a cheap fast model or a thorough
5+
capable model accordingly — a sparse mixture-of-experts dispatch optimizing cost vs. quality.
6+
7+
## When to Use This Pattern
8+
9+
- Cost optimisation: route easy tasks to cheaper models, hard tasks to capable models
10+
- Capability dispatch: pick a specialist agent based on detected task domain
11+
- Fallback chains: try a fast agent first, escalate to a powerful agent on failure
12+
13+
## How to Run
14+
15+
```bash
16+
adk run contributing/samples/graph_agent_pattern_dynamic_node
17+
```
18+
19+
## Graph Structure
20+
21+
```
22+
classify ──▶ respond (DynamicNode)
23+
├── simple_agent (when classify output contains "SIMPLE")
24+
└── detailed_agent (otherwise)
25+
```
26+
27+
## Key Code Walkthrough
28+
29+
- **`DynamicNode(name="respond", agent_selector=select_responder)`** — the selector callable
30+
receives `GraphState` and returns the `BaseAgent` to invoke
31+
- **`select_responder(state)`** — reads `state.data["classify"]` and returns the matching agent
32+
- **`fallback_agent`** parameter — used when the selector returns `None`
33+
- **Transparent to the graph** — downstream edges see `respond`'s output regardless of which
34+
agent was chosen
35+
- **No graph-level changes needed** — swap agents by changing `select_responder`, not the graph
36+
topology
37+

contributing/samples/graph_agent_pattern_dynamic_node/__init__.py

Whitespace-only changes.
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
#!/usr/bin/env python3
2+
"""DynamicNode Pattern: Runtime Agent Selection
3+
4+
Motivation (Mixture-of-Experts)
5+
--------------------------------
6+
Shazeer et al. (2017) "Outrageously Large Neural Networks: The Sparsely-Gated
7+
Mixture-of-Experts Layer" showed that routing inputs to specialised experts
8+
beats a single monolithic model while keeping per-token compute fixed.
9+
10+
The same principle applies to agentic workflows: a *router* classifies the
11+
complexity/type of each task, then a *gating function* selects the cheapest
12+
adequate specialist—a fast flash-model for simple tasks, a slower pro-model
13+
for hard tasks.
14+
15+
Pattern: DynamicNode
16+
--------------------
17+
DynamicNode is the first-class API for this pattern. The `agent_selector`
18+
callable runs at runtime, reads the current GraphState, and returns the
19+
appropriate BaseAgent.
20+
21+
Compare to the function-node alternative
22+
-----------------------------------------
23+
Without DynamicNode you need a function node that manually dispatches:
24+
25+
async def dispatch(state, ctx):
26+
agent = complex_agent if "hard" in state.data else simple_agent
27+
node_ctx = ctx.model_copy(update={...})
28+
output = ""
29+
async for event in agent.run_async(node_ctx):
30+
if event.content and event.content.parts:
31+
output = event.content.parts[0].text or ""
32+
return output
33+
34+
DynamicNode gives you:
35+
✅ Metadata auto-tracking: which agent was selected (observability)
36+
✅ Built-in fallback_agent when selector returns None
37+
✅ Selection logic decoupled from execution boilerplate
38+
39+
Architecture
40+
------------
41+
classify ──► route (DynamicNode) ──► end
42+
43+
├─ selector returns simple_agent (flash, cheap)
44+
└─ selector returns detailed_agent (pro, thorough)
45+
"""
46+
47+
import asyncio
48+
import os
49+
50+
from google.adk.agents import LlmAgent
51+
from google.adk.agents.graph import DynamicNode
52+
from google.adk.agents.graph import GraphAgent
53+
from google.adk.agents.graph import GraphState
54+
from google.adk.runners import Runner
55+
from google.adk.sessions import InMemorySessionService
56+
from google.genai import types
57+
58+
_MODEL = os.getenv("LLM_MODEL_NAME", "gemini-2.5-flash")
59+
60+
# ---------------------------------------------------------------------------
61+
# Step 1: Classifier — assigns complexity label from the user's request
62+
# ---------------------------------------------------------------------------
63+
classifier = LlmAgent(
64+
name="classifier",
65+
model=_MODEL,
66+
instruction="""
67+
You are a task complexity classifier.
68+
69+
Read the user's request and reply with EXACTLY one word:
70+
SIMPLE – if the task is a quick factual lookup or short question
71+
COMPLEX – if the task requires multi-step reasoning, analysis, or code
72+
73+
Reply with only the word, nothing else.
74+
""",
75+
)
76+
77+
# ---------------------------------------------------------------------------
78+
# Step 2: Specialists — cheap flash model vs thorough pro model
79+
# ---------------------------------------------------------------------------
80+
simple_agent = LlmAgent(
81+
name="simple_responder",
82+
model=_MODEL,
83+
instruction="""
84+
You are a concise assistant. Answer the user's question briefly (1-3 sentences).
85+
""",
86+
)
87+
88+
detailed_agent = LlmAgent(
89+
name="detailed_responder",
90+
model=_MODEL,
91+
instruction="""
92+
You are a thorough analyst. Work through the problem step by step, show your
93+
reasoning, and provide a complete, well-structured answer.
94+
""",
95+
)
96+
97+
98+
# ---------------------------------------------------------------------------
99+
# Step 3: Agent selector — called at runtime with current GraphState
100+
# ---------------------------------------------------------------------------
101+
def select_responder(state: GraphState) -> LlmAgent:
102+
"""Route to simple_agent for SIMPLE tasks, detailed_agent otherwise.
103+
104+
The classifier stored its output in state.data["classify"] via the
105+
default output_mapper (OVERWRITE reducer, key = node name).
106+
"""
107+
classification = state.data.get("classify", "").upper()
108+
if "SIMPLE" in classification:
109+
return simple_agent
110+
return detailed_agent
111+
112+
113+
# ---------------------------------------------------------------------------
114+
# Build the graph
115+
# ---------------------------------------------------------------------------
116+
def build_graph() -> GraphAgent:
117+
graph = GraphAgent(
118+
name="dynamic_routing",
119+
description="Routes each query to the cheapest adequate specialist",
120+
)
121+
122+
# Node 1: classify complexity
123+
graph.add_node("classify", agent=classifier)
124+
125+
# Node 2: DynamicNode selects the right specialist at runtime
126+
graph.add_node(
127+
DynamicNode(
128+
name="respond",
129+
agent_selector=select_responder,
130+
fallback_agent=simple_agent, # safety net if selector returns None
131+
)
132+
)
133+
134+
graph.add_edge("classify", "respond")
135+
graph.set_start("classify")
136+
graph.set_end("respond")
137+
return graph
138+
139+
140+
# ---------------------------------------------------------------------------
141+
# Runner helper
142+
# ---------------------------------------------------------------------------
143+
_graph = build_graph()
144+
145+
146+
async def run(question: str) -> str:
147+
graph = _graph
148+
svc = InMemorySessionService()
149+
runner = Runner(
150+
app_name="dynamic_node_example", agent=graph, session_service=svc
151+
)
152+
await svc.create_session(
153+
app_name="dynamic_node_example", user_id="user", session_id="s1"
154+
)
155+
final = ""
156+
async for event in runner.run_async(
157+
user_id="user",
158+
session_id="s1",
159+
new_message=types.Content(role="user", parts=[types.Part(text=question)]),
160+
):
161+
if event.content and event.content.parts:
162+
text = event.content.parts[0].text or ""
163+
if text and not text.startswith("[GraphMetadata]"):
164+
final = text
165+
return final
166+
167+
168+
# ---------------------------------------------------------------------------
169+
# Demo
170+
# ---------------------------------------------------------------------------
171+
172+
173+
async def main():
174+
questions = [
175+
"What is the capital of France?", # SIMPLE → flash model
176+
( # COMPLEX → pro model
177+
"Explain how transformer attention scales with sequence length "
178+
"and what architectural changes help address this."
179+
),
180+
]
181+
for q in questions:
182+
print(f"\nQ: {q}")
183+
answer = await run(q)
184+
print(f"A: {answer}")
185+
186+
187+
if __name__ == "__main__":
188+
asyncio.run(main())
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# GraphAgent Pattern — NestedGraphNode (Hierarchical Composition)
2+
3+
This example demonstrates **hierarchical workflow decomposition** using `NestedGraphNode`. A
4+
coordinator planner produces a focused query, a three-step research sub-graph (search → extract →
5+
summarise) handles it as a single reusable unit, and a synthesiser produces the final answer.
6+
The sub-graph is entirely encapsulated and independently testable.
7+
8+
## When to Use This Pattern
9+
10+
- Large workflows that benefit from breaking into independently developed sub-pipelines
11+
- Reusable sub-workflows (the same research graph could be called from multiple parent graphs)
12+
- Team boundaries: different teams own the outer orchestration and inner sub-workflows
13+
- Recursive depth: sub-graphs can themselves contain `NestedGraphNode`s
14+
15+
## How to Run
16+
17+
```bash
18+
adk run contributing/samples/graph_agent_pattern_nested_graph
19+
```
20+
21+
## Graph Structure
22+
23+
```
24+
Outer: plan ──▶ research (NestedGraphNode) ──▶ synthesise
25+
26+
Inner (research sub-graph):
27+
search ──▶ extract ──▶ summarise
28+
```
29+
30+
## Key Code Walkthrough
31+
32+
- **`NestedGraphNode(name="research", graph_agent=build_research_subgraph())`** — wraps an entire
33+
`GraphAgent` as a single node in the parent graph
34+
- **`inherit_session=True`** — the sub-graph shares the parent session's state, so outputs
35+
written inside are visible to the parent's synthesiser
36+
- **`build_research_subgraph()`** — factory function that constructs and returns the inner
37+
`GraphAgent`; call it multiple times for independent instances
38+
- **State bridging** — the sub-graph's final state is merged back; use `output_mapper` on the
39+
`NestedGraphNode` to control which keys are exposed to the outer graph
40+
- **Telemetry and checkpointing** — propagate automatically into the sub-graph when enabled on
41+
the parent
42+

contributing/samples/graph_agent_pattern_nested_graph/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)