Skip to content

Commit b39cbab

Browse files
committed
feat: Add GraphAgent for directed-graph workflow orchestration
Add GraphAgent for building directed-graph workflows with conditional routing, cyclic execution, state management with reducers, typed events, streaming, callbacks, rewind, resumability, telemetry with OpenTelemetry tracing, evaluation metrics, and CLI graph visualization for GraphAgent topologies. Includes samples and design documentation.
1 parent 223d9a7 commit b39cbab

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+16306
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# GraphAgent Basic Example — Conditional Routing
2+
3+
This example demonstrates a data validation pipeline using **conditional routing** based on runtime
4+
state. The validator checks input quality and branches to either a processor (success path) or an
5+
error handler (failure path), showing how GraphAgent enables state-dependent decision logic that
6+
sequential or parallel agent composition alone cannot achieve.
7+
8+
## When to Use This Pattern
9+
10+
- Any workflow requiring "if X then A, else B" branching on agent output
11+
- Input validation before expensive downstream processing
12+
- Quality-gate patterns where the next step depends on a score or classification
13+
14+
## How to Run
15+
16+
```bash
17+
adk run contributing/samples/graph_agent_basic
18+
```
19+
20+
## Graph Structure
21+
22+
```
23+
validate ──(valid=True)──▶ process
24+
──(valid=False)─▶ error
25+
```
26+
27+
## Key Code Walkthrough
28+
29+
- **`GraphNode(name="validate", agent=validator_agent)`** — wraps an `LlmAgent` as a graph node
30+
- **`add_edge("validate", "process", condition=lambda s: s.data["valid"] == True)`** — conditional
31+
edge that only fires when the validation flag is set
32+
- **Two end nodes** (`process` and `error`) — GraphAgent can have multiple terminal nodes
33+
- **State propagation** — each node's output is written to `state.data[node_name]` and read by
34+
downstream condition functions
35+
- **No cycles** — this is a simple directed acyclic graph; for loops see `graph_agent_dynamic_queue`
36+

contributing/samples/graph_agent_basic/__init__.py

Whitespace-only changes.
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
"""Basic GraphAgent example demonstrating conditional routing.
2+
3+
This example shows how GraphAgent enables conditional workflow routing
4+
based on runtime state, which cannot be achieved with SequentialAgent
5+
or ParallelAgent composition.
6+
7+
Use case: Data validation pipeline with retry logic.
8+
- If validation passes -> process data
9+
- If validation fails -> retry validation
10+
- After max retries -> route to error handler
11+
"""
12+
13+
import asyncio
14+
import os
15+
16+
from google.adk.agents import GraphAgent
17+
from google.adk.agents import LlmAgent
18+
from google.adk.runners import Runner
19+
from google.adk.sessions import InMemorySessionService
20+
from google.genai import types
21+
from pydantic import BaseModel
22+
23+
_MODEL = os.getenv("LLM_MODEL_NAME", "gemini-2.5-flash")
24+
25+
26+
# --- Validation Result Schema ---
27+
class ValidationResult(BaseModel):
28+
"""Validation result structure."""
29+
30+
valid: bool
31+
error: str | None = None
32+
33+
34+
# --- Validator Agent ---
35+
validator = LlmAgent(
36+
name="validator",
37+
model=_MODEL,
38+
instruction="""
39+
You validate input data quality.
40+
Check if the input contains valid JSON.
41+
Return {"valid": true} if valid, {"valid": false, "error": "reason"} if invalid.
42+
""",
43+
output_schema=ValidationResult, # Ensures structured JSON output
44+
# output_key auto-defaults to "validator" (agent name)
45+
)
46+
47+
# --- Processor Agent ---
48+
processor = LlmAgent(
49+
name="processor",
50+
model=_MODEL,
51+
instruction="""
52+
You process validated data.
53+
Transform the input JSON and return processed results.
54+
""",
55+
)
56+
57+
# --- Error Handler Agent ---
58+
error_handler = LlmAgent(
59+
name="error_handler",
60+
model=_MODEL,
61+
instruction="""
62+
You handle validation errors.
63+
Provide helpful error messages and suggestions for fixing invalid data.
64+
""",
65+
)
66+
67+
68+
# --- Edge Condition Functions ---
69+
def is_valid_json(state) -> bool:
70+
"""Check if JSON is valid from structured output."""
71+
# Convention: Access via agent.name (auto-defaulted as output_key)
72+
result = state.data.get("validator", {})
73+
return result.get("valid", False) is True
74+
75+
76+
# --- Create GraphAgent with Conditional Routing ---
77+
def build_validation_graph() -> GraphAgent:
78+
"""Build the validation pipeline graph."""
79+
g = GraphAgent(name="validation_pipeline")
80+
81+
# Add nodes
82+
g.add_node("validate", agent=validator)
83+
g.add_node("process", agent=processor)
84+
g.add_node("error", agent=error_handler)
85+
86+
# Add conditional edges
87+
# If validation passes (state.data["validator"]["valid"] == True) -> process
88+
g.add_edge(
89+
"validate",
90+
"process",
91+
condition=is_valid_json,
92+
)
93+
94+
# If validation fails (state.data["validator"]["valid"] == False) -> error handler
95+
g.add_edge(
96+
"validate",
97+
"error",
98+
condition=lambda state: not is_valid_json(state),
99+
)
100+
101+
# Define workflow
102+
g.set_start("validate")
103+
g.set_end("process") # Success path ends at process
104+
g.set_end("error") # Error path ends at error handler
105+
106+
return g
107+
108+
109+
# --- Run the workflow ---
110+
111+
112+
async def main():
113+
graph = build_validation_graph()
114+
runner = Runner(
115+
app_name="validation_pipeline",
116+
agent=graph,
117+
session_service=InMemorySessionService(),
118+
auto_create_session=True,
119+
)
120+
121+
# Example: Valid input
122+
print("=== Testing with valid JSON ===")
123+
async for event in runner.run_async(
124+
user_id="user_1",
125+
session_id="session_1",
126+
new_message=types.Content(
127+
role="user",
128+
parts=[types.Part(text='{"name": "John", "age": 30}')],
129+
),
130+
):
131+
if event.content and event.content.parts:
132+
print(f"{event.author}: {event.content.parts[0].text}")
133+
134+
# Example: Invalid input
135+
print("\n=== Testing with invalid JSON ===")
136+
async for event in runner.run_async(
137+
user_id="user_1",
138+
session_id="session_2",
139+
new_message=types.Content(
140+
role="user",
141+
parts=[types.Part(text='{"name": "Invalid data')],
142+
),
143+
):
144+
if event.content and event.content.parts:
145+
print(f"{event.author}: {event.content.parts[0].text}")
146+
147+
148+
if __name__ == "__main__":
149+
asyncio.run(main())
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# yaml-language-server: $schema=https://raw.githubusercontent.com/google/adk-python/refs/heads/main/src/google/adk/agents/config_schemas/AgentConfig.json
2+
3+
agent_class: GraphAgent
4+
name: validation_pipeline
5+
description: Data validation pipeline with conditional routing
6+
7+
# Define start and end nodes
8+
start_node: validate
9+
end_nodes:
10+
- process
11+
- error
12+
13+
# Maximum iterations for cyclic graphs (default: 20)
14+
max_iterations: 10
15+
16+
# Node definitions
17+
nodes:
18+
- name: validate
19+
sub_agents:
20+
- code: agents.validator
21+
22+
- name: process
23+
sub_agents:
24+
- code: agents.processor
25+
26+
- name: error
27+
sub_agents:
28+
- code: agents.error_handler
29+
30+
# Edge definitions with conditional routing
31+
edges:
32+
# If validation passes -> process
33+
- source_node: validate
34+
target_node: process
35+
condition: "data.get('valid', False) is True"
36+
priority: 1
37+
38+
# If validation fails -> error handler
39+
- source_node: validate
40+
target_node: error
41+
condition: "data.get('valid', False) is False"
42+
priority: 1
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# GraphAgent Dynamic Task Queue Example
2+
3+
This example demonstrates the **Dynamic Task Queue** pattern for GraphAgent, enabling AI Co-Scientist and similar workflows where tasks are generated and processed dynamically at runtime.
4+
5+
## Pattern Overview
6+
7+
The dynamic task queue pattern uses a function node with runtime agent dispatch:
8+
- **Task Queue**: Maintained in GraphState, grows/shrinks dynamically
9+
- **Agent Dispatch**: Different agents selected based on task type
10+
- **Dynamic Task Generation**: Agents generate new tasks from their outputs
11+
- **State-Based Loop**: Continues until queue is empty
12+
13+
## What This Example Shows
14+
15+
1. **Mock Agents**: Three agents (generation, review, experiment) for demonstration
16+
2. **Task Parsing**: Extract TODO items from agent outputs to create new tasks
17+
3. **Dynamic Dispatch**: Select agent based on task type at runtime
18+
4. **Queue Management**: Process tasks until queue is empty
19+
20+
## Architecture Support
21+
22+
This pattern enables **95%+ architecture support** for:
23+
- AI Co-Scientist (dynamic hypothesis generation and testing)
24+
- Research paper writing (dynamic outline → research → writing loops)
25+
- Multi-agent task orchestration
26+
27+
## Running the Example
28+
29+
```bash
30+
cd /path/to/adk-python
31+
source venv/bin/activate
32+
python contributing/samples/graph_agent_dynamic_queue/agent.py
33+
```
34+
35+
The example will:
36+
1. Start with 2 initial tasks (generate hypothesis 1 and 2)
37+
2. Process each task with appropriate agent
38+
3. Parse agent outputs for new tasks (TODO: review X, TODO: experiment Y)
39+
4. Add new tasks to queue dynamically
40+
5. Continue until queue is empty
41+
42+
## Adapting This Pattern
43+
44+
Replace the mock agents with real agents:
45+
```python
46+
from your_agents import GenerationAgent, ReviewAgent, ExperimentAgent
47+
48+
generation_agent = GenerationAgent(name="generation")
49+
review_agent = ReviewAgent(name="review")
50+
experiment_agent = ExperimentAgent(name="experiment")
51+
```
52+
53+
Customize task parsing logic in `parse_new_tasks_from_result()` to match your agent outputs.

0 commit comments

Comments
 (0)