39 changes: 39 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/README.md
@@ -0,0 +1,39 @@
# Streaming Tools Non-Live Agent

This agent demonstrates streaming tools in non-live mode (run_async/SSE).

## Features

- **monitor_stock_price**: Monitors stock prices with real-time updates
- **process_large_dataset**: Processes datasets with progress updates
- **monitor_system_health**: Monitors system health metrics continuously

## Testing

### With ADK Web UI

```bash
cd contributing/samples
adk web .
```

Then try:
- "Monitor the stock price for AAPL"
- "Process a large dataset at /tmp/data.csv"
- "Monitor system health"

### With ADK CLI

```bash
cd contributing/samples/streaming_tools_non_live_agent
adk run .
```

### With API Server (SSE)

```bash
cd contributing/samples
adk api_server .
```

Then send a POST request to `/run_sse` with `"streaming": true` in the request body to see the intermediate Events emitted by the streaming tools.
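
For example, a request with `curl` could look like the sketch below. It assumes the API server is running on its default port 8000 and uses placeholder IDs (`u_123`, `s_123`); the session-creation route and request fields reflect a typical `adk api_server` setup and may differ between ADK versions.

```bash
# Create a session first (adjust IDs and port to your setup).
curl -X POST \
  http://localhost:8000/apps/streaming_tools_non_live_agent/users/u_123/sessions/s_123

# Call /run_sse with streaming enabled and watch intermediate Events arrive.
curl -N -X POST http://localhost:8000/run_sse \
  -H "Content-Type: application/json" \
  -d '{
    "app_name": "streaming_tools_non_live_agent",
    "user_id": "u_123",
    "session_id": "s_123",
    "new_message": {
      "role": "user",
      "parts": [{"text": "Monitor the stock price for AAPL"}]
    },
    "streaming": true
  }'
```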
15 changes: 15 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/__init__.py
@@ -0,0 +1,15 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import agent
128 changes: 128 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/agent.py
@@ -0,0 +1,128 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example agent demonstrating streaming tools in non-live mode (run_async/SSE).

This agent shows how to use streaming tools that yield intermediate results
in non-live mode. Streaming tools work with both run_async and SSE endpoints.
"""

from __future__ import annotations

import asyncio
from typing import AsyncGenerator

from google.adk.agents import Agent


async def monitor_stock_price(symbol: str) -> AsyncGenerator[dict, None]:
"""Monitor stock price with real-time updates.

This is a streaming tool that yields intermediate results as the stock
price changes. The agent can react to these intermediate results.

Args:
symbol: The stock symbol to monitor (e.g., 'AAPL', 'GOOGL').

Yields:
Dictionary containing stock price updates with status indicators.
"""
# Simulate stock price changes
prices = [100, 105, 110, 108, 112, 115]
for i, price in enumerate(prices):
await asyncio.sleep(1) # Simulate real-time updates
yield {
'symbol': symbol,
'price': price,
'update': i + 1,
'status': 'streaming' if i < len(prices) - 1 else 'complete',
}


async def process_large_dataset(file_path: str) -> AsyncGenerator[dict, None]:
"""Process dataset with progress updates.

This streaming tool demonstrates how to provide progress feedback
for long-running operations.

Args:
file_path: Path to the dataset file to process.

Yields:
Dictionary containing progress information and final result.
"""
total_rows = 100
processed = 0

# Simulate processing in batches
for batch in range(10):
await asyncio.sleep(0.5) # Simulate processing time
processed += 10
yield {
'progress': processed / total_rows,
'processed': processed,
'total': total_rows,
'status': 'streaming',
'message': f'Processed {processed}/{total_rows} rows',
}

# Final result
yield {
'result': 'Processing complete',
'status': 'complete',
'file_path': file_path,
'total_processed': total_rows,
}


async def monitor_system_health() -> AsyncGenerator[dict, None]:
"""Monitor system health metrics with continuous updates.

This streaming tool demonstrates continuous monitoring that can be
stopped by the agent when thresholds are reached.

Yields:
Dictionary containing system health metrics.
"""
metrics = [
{'cpu': 45, 'memory': 60, 'disk': 70},
{'cpu': 50, 'memory': 65, 'disk': 72},
{'cpu': 55, 'memory': 70, 'disk': 75},
{'cpu': 60, 'memory': 75, 'disk': 78},
]

for i, metric in enumerate(metrics):
await asyncio.sleep(2) # Check every 2 seconds
yield {
'metrics': metric,
'timestamp': i + 1,
'status': 'streaming' if i < len(metrics) - 1 else 'complete',
'alert': 'high' if metric['cpu'] > 55 else 'normal',
}


root_agent = Agent(
name='streaming_tools_agent',
model='gemini-2.5-flash-lite',
instruction=(
'You are a helpful assistant that can monitor stock prices, process'
' datasets, and monitor system health using streaming tools. When'
' using streaming tools, you will receive intermediate results that'
' you can react to. For example, if monitoring stock prices, you can'
' alert the user when prices change significantly. If processing a'
' dataset, you can provide progress updates. If monitoring system'
' health, you can alert when metrics exceed thresholds.'
),
tools=[monitor_stock_price, process_large_dataset, monitor_system_health],
)
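
To see how the intermediate tool yields surface programmatically (outside the Web UI and CLI), a minimal `run_async` driver sketch is shown below. It is not part of this PR: it assumes the public `Runner`/`InMemorySessionService` APIs, placeholder `u_123`/`s_123` IDs, and that it is run from inside the sample directory so `agent.py` is importable; adjust details to your ADK version.

```python
# minimal_run_async_driver.py -- illustrative sketch only, not part of this PR.
import asyncio

from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from agent import root_agent  # assumes this runs from the sample directory

APP_NAME = 'streaming_tools_non_live_agent'
USER_ID = 'u_123'
SESSION_ID = 's_123'


async def main() -> None:
  session_service = InMemorySessionService()
  runner = Runner(
      agent=root_agent, app_name=APP_NAME, session_service=session_service
  )
  await session_service.create_session(
      app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
  )

  message = types.Content(
      role='user', parts=[types.Part(text='Monitor the stock price for AAPL')]
  )
  async for event in runner.run_async(
      user_id=USER_ID, session_id=SESSION_ID, new_message=message
  ):
    # Each intermediate yield from a streaming tool arrives as a function
    # response event before the model's final text.
    for function_response in event.get_function_responses():
      print('tool update:', function_response.response)
    if event.content and event.content.parts and event.content.parts[0].text:
      print(f'[{event.author}]: {event.content.parts[0].text}')


if __name__ == '__main__':
  asyncio.run(main())
```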
131 changes: 98 additions & 33 deletions src/google/adk/flows/llm_flows/base_llm_flow.py
@@ -672,49 +672,114 @@ async def _postprocess_run_processors_async(
async for event in agen:
yield event

-  async def _postprocess_handle_function_calls_async(
-      self,
-      invocation_context: InvocationContext,
-      function_call_event: Event,
-      llm_request: LlmRequest,
-  ) -> AsyncGenerator[Event, None]:
-    if function_response_event := await functions.handle_function_calls_async(
-        invocation_context, function_call_event, llm_request.tools_dict
-    ):
-      auth_event = functions.generate_auth_event(
-          invocation_context, function_response_event
-      )
-      if auth_event:
-        yield auth_event
-
-      tool_confirmation_event = functions.generate_request_confirmation_event(
-          invocation_context, function_call_event, function_response_event
-      )
-      if tool_confirmation_event:
-        yield tool_confirmation_event
-
-      # Always yield the function response event first
-      yield function_response_event
-
-      # Check if this is a set_model_response function response
-      if json_response := _output_schema_processor.get_structured_model_response(
-          function_response_event
-      ):
-        # Create and yield a final model response event
-        final_event = (
-            _output_schema_processor.create_final_model_response_event(
-                invocation_context, json_response
-            )
-        )
-        yield final_event
-      transfer_to_agent = function_response_event.actions.transfer_to_agent
-      if transfer_to_agent:
-        agent_to_run = self._get_agent_to_run(
-            invocation_context, transfer_to_agent
-        )
-        async with Aclosing(agent_to_run.run_async(invocation_context)) as agen:
-          async for event in agen:
-            yield event
+  async def _yield_function_response_events(
+      self,
+      invocation_context: InvocationContext,
+      function_call_event: Event,
+      function_response_event: Event,
+  ) -> AsyncGenerator[Event, None]:
+    """Yields auth, confirmation, and set_model_response events for a function response.
+
+    Args:
+      invocation_context: The invocation context.
+      function_call_event: The original function call event.
+      function_response_event: The function response event.
+
+    Yields:
+      Auth events, confirmation events, the function response event, and
+      set_model_response events if applicable.
+    """
+    auth_event = functions.generate_auth_event(
+        invocation_context, function_response_event
+    )
+    if auth_event:
+      yield auth_event
+
+    tool_confirmation_event = functions.generate_request_confirmation_event(
+        invocation_context, function_call_event, function_response_event
+    )
+    if tool_confirmation_event:
+      yield tool_confirmation_event
+
+    yield function_response_event
+
+    # Check if this is a set_model_response function response
+    if json_response := (
+        _output_schema_processor.get_structured_model_response(
+            function_response_event
+        )
+    ):
+      final_event = _output_schema_processor.create_final_model_response_event(
+          invocation_context, json_response
+      )
+      yield final_event
+
+  async def _postprocess_handle_function_calls_async(
+      self,
+      invocation_context: InvocationContext,
+      function_call_event: Event,
+      llm_request: LlmRequest,
+  ) -> AsyncGenerator[Event, None]:
+    function_calls = function_call_event.get_function_calls()
+    if not function_calls:
+      return
+
+    # Check if any tools are streaming tools
+    has_streaming_tools = any(
+        functions._is_streaming_tool(tool)
+        for call in function_calls
+        if (tool := llm_request.tools_dict.get(call.name))
+    )
+
+    if has_streaming_tools:
+      # Use streaming handler
+      tool_confirmation_dict = getattr(
+          invocation_context, 'tool_confirmation_dict', None
+      )
+      async for event in functions.handle_function_calls_async_with_streaming(
+          invocation_context,
+          function_calls,
+          llm_request.tools_dict,
+          tool_confirmation_dict,
+      ):
+        async for secondary_event in self._yield_function_response_events(
+            invocation_context, function_call_event, event
+        ):
+          yield secondary_event
+
+        # Check for agent transfer after each streaming event
+        transfer_to_agent = event.actions.transfer_to_agent
+        if transfer_to_agent:
+          agent_to_run = self._get_agent_to_run(
+              invocation_context, transfer_to_agent
+          )
+          async with Aclosing(
+              agent_to_run.run_async(invocation_context)
+          ) as agen:
+            async for transfer_event in agen:
+              yield transfer_event
+          # Agent transfer handled, exit the streaming loop
+          return
+    else:
+      # Use regular handler
+      if function_response_event := await functions.handle_function_calls_async(
+          invocation_context, function_call_event, llm_request.tools_dict
+      ):
+        async for secondary_event in self._yield_function_response_events(
+            invocation_context, function_call_event, function_response_event
+        ):
+          yield secondary_event
+
+        transfer_to_agent = function_response_event.actions.transfer_to_agent
+        if transfer_to_agent:
+          agent_to_run = self._get_agent_to_run(
+              invocation_context, transfer_to_agent
+          )
+          async with Aclosing(
+              agent_to_run.run_async(invocation_context)
+          ) as agen:
+            async for event in agen:
+              yield event

def _get_agent_to_run(
self, invocation_context: InvocationContext, agent_name: str