Feat/streaming tools non live #3848
base: main
Conversation
Implements streaming tools support for non-live mode (run_async/SSE), allowing tools to yield intermediate results as Events.

Changes:
- Added _is_streaming_tool() helper to detect async generator functions (see the sketch below)
- Added handle_function_calls_async_with_streaming() to handle streaming tools
- Added _execute_streaming_tool_async() to execute and yield Events for streaming tools
- Integrated streaming tool detection in _postprocess_handle_function_calls_async()
- Added cancellation support via task tracking in active_streaming_tools
- Added unit tests

Fixes google#3837
Adds a sample agent demonstrating streaming tools in non-live mode
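For orientation, the detection mentioned in the first commit amounts to asking whether the wrapped callable is an async generator function. Below is a minimal sketch of that idea, assuming the tool exposes its underlying callable as a func attribute; the names are illustrative, not necessarily the exact ADK internals.

import inspect
from typing import Any


def _is_streaming_tool(tool: Any) -> bool:
  # A streaming tool wraps an `async def` function that contains `yield`,
  # i.e. an async generator function, which can emit intermediate results.
  func = getattr(tool, 'func', None)
  return func is not None and inspect.isasyncgenfunction(func)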
Summary of Changes
Hello @sarojrout, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a limitation where streaming tools (asynchronous generator functions) were only functional in 'live' execution modes. Previously, using run_async() or SSE endpoints with such tools would block the entire agent response until the tool completed, leading to poor user experience and hindering real-time feedback. The solution extends the streaming tools infrastructure to support progressive result delivery in these non-live modes, allowing agents to receive and react to intermediate outputs as they are generated, significantly improving responsiveness and enabling new use cases like real-time monitoring and progress updates for long-running operations.
Code Review
This pull request introduces a significant feature to enable streaming tools in non-live modes, which is a great enhancement for long-running operations. The implementation is well-structured, with clear separation of concerns for handling streaming versus regular tools, and it's backed by a comprehensive set of new unit tests. My review includes a few suggestions to improve maintainability by reducing code duplication and removing unused code. I've also identified a high-severity issue where the agent transfer logic is missing for the new streaming tool path, which could prevent agents from reacting to streamed results as intended. Please address this to ensure the feature is complete.
- Added agent transfer handling for streaming tools
  * Check transfer_to_agent after each streaming event (illustrated in the sketch below)
  * Transfer agent and exit streaming loop when detected
  * Enables agent reaction to intermediate results
- Refactored duplicated code into _yield_function_response_events helper
  * Both streaming and regular paths use the same helper
  * Improves maintainability
- Moved cleanup logic to a finally block
  * Ensures cleanup always happens (3 places → 1 place)
- Removed unused imports and variables
  * Removed unused LlmAgent import from handle_function_calls_async_with_streaming
  * Removed unused agent variable
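To illustrate the transfer handling described in the commit above: after each streamed event, the handler checks whether the event requests an agent transfer and, if so, switches agents and stops consuming the stream. The following is a rough sketch of that control flow only; the surrounding function and the find_agent lookup are assumptions for illustration, not the actual diff.

async def _drain_streaming_events(event_stream, invocation_context, find_agent):
  # find_agent is a hypothetical lookup: agent name -> agent instance.
  async for event in event_stream:
    yield event
    transfer_target = event.actions.transfer_to_agent if event.actions else None
    if transfer_target:
      # Hand control to the requested agent and exit the streaming loop,
      # so the agent can react to the intermediate result.
      invocation_context.agent = find_agent(transfer_target)
      break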
/gemini review
Code Review
This pull request introduces a valuable feature by enabling streaming tools in non-live modes, which significantly improves the user experience for long-running operations. The implementation is well-structured, and I appreciate the inclusion of comprehensive unit tests and a sample agent, which clearly demonstrate the new functionality. My main feedback is to improve the concurrency of tool execution to ensure that all tools, both streaming and regular, run in parallel as expected in an async environment. I've also included a suggestion to enhance one of the new tests.
/gemini review
Warning: You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!
/gemini review |
Code Review
This pull request introduces a significant feature: support for streaming tools in non-live modes (run_async/SSE). This is a great enhancement for user experience with long-running tools, allowing for progressive results instead of blocking. The implementation is robust, leveraging standard asyncio patterns for concurrency with queues and background tasks. The changes are well-structured into helper functions, and the logic for handling both streaming and regular tools concurrently is sound. Cancellation and error handling have been thoughtfully included. The addition of comprehensive unit tests and a sample agent to demonstrate the new functionality is excellent. I have a couple of minor suggestions to improve robustness and test precision.
tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py
/gemini review
Code Review
This pull request introduces a significant and well-implemented feature to enable streaming tools in non-live modes. The use of concurrent tasks and async queues is robust, and the feature is well-supported by new tests and a sample agent. My review includes a few suggestions to improve exception handling, type hint accuracy, and code style for better long-term maintainability and correctness.
…ecution as per the review comments

Fixed exception handling in handle_function_calls_async_with_streaming to prevent silent failures when tools raise exceptions. Changes are:
- Stored results from asyncio.gather instead of discarding them
- Checked each result for exceptions
- Re-raised the first exception encountered to signal failure

This ensures exceptions from failed tools are properly propagated instead of being silently swallowed.
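The fix described here follows the usual asyncio pattern for surfacing failures from concurrently gathered tasks: collect results with return_exceptions=True, then scan them and re-raise the first exception. A generic, self-contained sketch of that pattern (not the actual ADK code):

import asyncio


async def _gather_and_propagate(*coros):
  # return_exceptions=True keeps one failing task from cancelling the others
  # and lets every outcome be inspected afterwards.
  results = await asyncio.gather(*coros, return_exceptions=True)
  for result in results:
    if isinstance(result, Exception):
      # Re-raise the first failure so it is not silently swallowed.
      raise result
  return results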
… it more idiomatic and cleaner
/gemini review
Code Review
The pull request introduces support for streaming tools in non-live modes (run_async/SSE), which is a valuable enhancement for handling long-running operations and providing real-time feedback. The changes involve refactoring the function call handling in base_llm_flow.py to route streaming tools through a new dedicated handler. New helper functions _is_streaming_tool, _execute_streaming_tool_async, and handle_function_calls_async_with_streaming have been added in functions.py to manage the asynchronous execution, event buffering, error handling, and cancellation of streaming tools. A new sample agent demonstrating this functionality and comprehensive unit tests have also been added. The implementation is robust, correctly handles concurrency, and integrates well with existing patterns. No issues of medium, high, or critical severity were found.
/gemini review
Code Review
This pull request introduces a significant and valuable feature: support for streaming tools in non-live modes (run_async/SSE). The implementation is well-structured, leveraging asyncio queues and tasks to handle concurrent execution of both streaming and regular tools effectively. The addition of a sample agent and comprehensive unit tests is also commendable.
My review focuses on two main points. First, a critical issue regarding feature parity between streaming and regular FunctionTools. The current implementation for streaming tools bypasses important logic for argument preprocessing, context injection, and confirmation handling. Second, a medium-severity suggestion to refactor duplicated code for handling agent transfers to improve maintainability.
Addressing these points will ensure that streaming tools are not only functional but also robust, consistent, and maintainable, providing a seamless experience for developers using them.
…n to streaming tools as per the review comments google#3848
Hi @sarojrout, Thank you for your contribution! We appreciate you taking the time to submit this pull request.
@ryanaiagent, Thank you for the feedback! LongRunningFunctionTool and streaming tools serve different use cases:
/gemini review |
Code Review
This pull request introduces a significant feature: support for streaming tools in non-live modes (run_async and SSE). The implementation is well-structured, introducing new handlers for streaming tools (handle_function_calls_async_with_streaming) and running them concurrently with regular tools. The use of asyncio.Queue to merge event streams is a solid pattern. The changes also include good refactoring, such as extracting _yield_function_response_events and _handle_agent_transfer in base_llm_flow.py to improve clarity and reduce duplication. The addition of comprehensive unit tests covering various scenarios, including error handling and cancellation, is commendable.
My main feedback is regarding a piece of duplicated code that was introduced as part of the refactoring in FunctionTool. Addressing this will improve the long-term maintainability of the code.
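As an aside for readers unfamiliar with the pattern the review praises, merging events from several concurrently running producers into a single consumer stream with asyncio.Queue and background tasks generally looks like the sketch below. This is a generic illustration under assumed names, not the PR's implementation.

import asyncio
from typing import Any, AsyncGenerator

_DONE = object()  # sentinel marking one finished producer


async def _pump(gen: AsyncGenerator[Any, None], queue: asyncio.Queue) -> None:
  # Drain a single async generator into the shared queue.
  try:
    async for item in gen:
      await queue.put(item)
  finally:
    await queue.put(_DONE)


async def merge_streams(*gens: AsyncGenerator[Any, None]) -> AsyncGenerator[Any, None]:
  # Yield items from all generators as they arrive, regardless of which
  # producer finishes first.
  queue: asyncio.Queue = asyncio.Queue()
  tasks = [asyncio.create_task(_pump(g, queue)) for g in gens]
  finished = 0
  try:
    while finished < len(gens):
      item = await queue.get()
      if item is _DONE:
        finished += 1
      else:
        yield item
  finally:
    for task in tasks:
      task.cancel()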
async def _prepare_args_and_check_confirmation(
    self, *, args: dict[str, Any], tool_context: ToolContext
) -> tuple[dict[str, Any], Optional[dict[str, Any]]]:
  """Prepares arguments and checks confirmation for function invocation.

  This method extracts the argument preparation and confirmation logic from
  run_async so it can be reused by streaming tools and other callers.

  Args:
    args: Raw arguments from the LLM tool call.
    tool_context: The tool context.

  Returns:
    A tuple of (prepared_args, error_response). If error_response is not None,
    it indicates that the tool call should not proceed (e.g., missing args or
    confirmation required/rejected). Otherwise, prepared_args contains the
    processed arguments ready for function invocation.
  """
  # Preprocess arguments (includes Pydantic model conversion)
  args_to_call = self._preprocess_args(args)

  signature = inspect.signature(self.func)
  valid_params = {param for param in signature.parameters}
  if 'tool_context' in valid_params:
    args_to_call['tool_context'] = tool_context

  # Filter args_to_call to only include valid parameters for the function
  args_to_call = {k: v for k, v in args_to_call.items() if k in valid_params}

  # Before invoking the function, we check for if the list of args passed in
  # has all the mandatory arguments or not.
  # If the check fails, then we don't invoke the tool and let the Agent know
  # that there was a missing input parameter. This will basically help
  # the underlying model fix the issue and retry.
  mandatory_args = self._get_mandatory_args()
  missing_mandatory_args = [
      arg for arg in mandatory_args if arg not in args_to_call
  ]

  if missing_mandatory_args:
    missing_mandatory_args_str = '\n'.join(missing_mandatory_args)
    error_str = f"""Invoking `{self.name}()` failed as the following mandatory input parameters are not present:
{missing_mandatory_args_str}
You could retry calling this tool, but it is IMPORTANT for you to provide all the mandatory parameters."""
    return (args_to_call, {'error': error_str})

  if isinstance(self._require_confirmation, Callable):
    require_confirmation = await self._invoke_callable(
        self._require_confirmation, args_to_call
    )
  else:
    require_confirmation = bool(self._require_confirmation)

  if require_confirmation:
    if not tool_context.tool_confirmation:
      args_to_show = args_to_call.copy()
      if 'tool_context' in args_to_show:
        args_to_show.pop('tool_context')

      tool_context.request_confirmation(
          hint=(
              f'Please approve or reject the tool call {self.name}() by'
              ' responding with a FunctionResponse with an expected'
              ' ToolConfirmation payload.'
          ),
      )
      tool_context.actions.skip_summarization = True
      return (
          args_to_call,
          {
              'error': (
                  'This tool call requires confirmation, please approve or'
                  ' reject.'
              )
          },
      )
    elif not tool_context.tool_confirmation.confirmed:
      return (args_to_call, {'error': 'This tool call is rejected.'})

  return (args_to_call, None)
This is a good refactoring to extract the argument preparation and confirmation logic into a reusable helper method.
However, the run_async method was not updated to use this new helper, and it still contains the same logic, leading to code duplication. To improve maintainability and prevent future inconsistencies, run_async should be refactored to call _prepare_args_and_check_confirmation.
The run_async method could be simplified to something like this:
@override
async def run_async(
    self, *, args: dict[str, Any], tool_context: ToolContext
) -> Any:
  prepared_args, error_response = await self._prepare_args_and_check_confirmation(
      args=args, tool_context=tool_context
  )
  if error_response is not None:
    return error_response
  return await self._invoke_callable(self.func, prepared_args)
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
If applicable, please follow the issue templates to provide as much detail as possible.
Problem:
Currently, streaming tools (async generator functions) only work in run_live() mode. When using run_async() or SSE endpoints, tools must complete execution before results are shown to users or agents. This causes issues for long-running operations (for example, real-time monitoring and progress updates) where users and agents need intermediate feedback.

Solution:
Extend the streaming tools infrastructure to support progressive result delivery in non-live mode (run_async/SSE), similar to how it works in live mode but adapted for non-live execution.

Key Changes:
- _is_streaming_tool() helper to detect async generator functions
- handle_function_calls_async_with_streaming() to handle streaming tools separately from regular tools
- _execute_streaming_tool_async() to execute streaming tools and yield Events for each intermediate result
- _postprocess_handle_function_calls_async() to automatically route to the streaming handler when needed
- Cancellation support via task tracking in active_streaming_tools

How It Works:
Each yield from the streaming tool generates a separate Event.

Example:
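The code snippet for this example did not survive extraction; the sketch below is an illustrative reconstruction of what such a streaming tool looks like, modeled loosely on the sample agent's monitor_stock_price (the body and values are made up):

import asyncio
import random
from typing import AsyncGenerator


async def monitor_stock_price(symbol: str) -> AsyncGenerator[dict, None]:
  # Each yield below becomes its own Event in run_async/SSE mode.
  for _ in range(3):
    await asyncio.sleep(1)  # simulate waiting for the next quote
    yield {
        'symbol': symbol,
        'price': round(random.uniform(100.0, 110.0), 2),
        'status': 'monitoring',
    }
  yield {'symbol': symbol, 'status': 'done'}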
In run_async() mode, this now yields multiple Events as prices update, allowing the agent and user to see real-time updates.

Testing Plan
Please describe the tests that you ran to verify your changes. This is required for all PRs that are not small documentation or typo fixes.
Unit Tests:
Please include a summary of passed pytest results.

Created comprehensive unit tests in tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py:
- test_is_streaming_tool_detects_async_generator - Verifies detection of async generator functions
- test_streaming_tool_yields_multiple_events - Ensures multiple Events are yielded for streaming tools
- test_streaming_tool_with_string_results - Tests streaming tools with string results
- test_streaming_tool_tracks_task_for_cancellation - Verifies task tracking for cancellation
- test_streaming_tool_handles_errors - Tests error handling in streaming tools
- test_handle_function_calls_async_with_streaming_separates_tools - Ensures streaming and regular tools are handled separately
- test_streaming_tool_with_tool_context - Verifies tool_context is correctly passed

Regression Tests:
All existing related tests pass:
$ pytest tests/unittests/flows/llm_flows/test_functions_simple.py tests/unittests/flows/llm_flows/test_base_llm_flow.py -v
tests/unittests/flows/llm_flows/test_functions_simple.py::test_simple_function PASSED [ 2%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_async_function PASSED [ 4%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_tool PASSED [ 7%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_update_state PASSED [ 9%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_id PASSED [ 11%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_no_function_response_in_last_event PASSED [ 14%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_empty_session_events PASSED [ 16%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_function_response_but_no_matching_call PASSED [ 19%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_function_response_with_matching_call PASSED [ 21%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_multiple_function_responses PASSED [ 23%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_not_modified PASSED [ 26%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_none_handling PASSED [ 28%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_copy_behavior PASSED [ 30%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_deep_copy_behavior PASSED [ 33%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_shallow_vs_deep_copy_demonstration PASSED [ 35%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_parallel_function_execution_timing PASSED [ 38%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_parallel_state_modifications_thread_safety PASSED [ 40%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_sync_function_blocks_async_functions PASSED [ 42%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_async_function_without_yield_blocks_others PASSED [ 45%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_merge_parallel_function_response_events_preserves_invocation_id PASSED [ 47%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_merge_parallel_function_response_events_single_event PASSED [ 50%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_merge_parallel_function_response_events_preserves_other_attributes PASSED [ 52%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_yielding_async_functions_run_concurrently PASSED [ 54%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_mixed_function_types_execution_order PASSED [ 57%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_calls_toolset_process_llm_request PASSED [ 59%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_handles_mixed_tools_and_toolsets PASSED [ 61%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_with_google_search_only PASSED [ 64%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_with_google_search_workaround PASSED [ 66%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_calls_convert_tool_union_to_tools PASSED [ 69%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[no_search_no_grounding] PASSED [ 71%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[with_search_with_grounding] PASSED [ 73%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[no_search_with_grounding] PASSED [ 76%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[with_search_no_grounding] PASSED [ 78%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[no_search_no_grounding] PASSED [ 80%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[with_search_with_grounding] PASSED [ 83%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[no_search_with_grounding] PASSED [ 85%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[with_search_no_grounding] PASSED [ 88%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[no_search_no_grounding] PASSED [ 90%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[with_search_with_grounding] PASSED [ 92%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[no_search_with_grounding] PASSED [ 95%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[with_search_no_grounding] PASSED [ 97%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_caches_canonical_tools PASSED [100%]

================================ warnings summary =================================
tests/unittests/flows/llm_flows/test_functions_simple.py: 11 warnings
  /Users/usharout/projects/adk-python/src/google/adk/runners.py:1381: DeprecationWarning: deprecated
    save_input_blobs_as_artifacts=run_config.save_input_blobs_as_artifacts,
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========================= 42 passed, 11 warnings in 6.76s =========================

Manual End-to-End (E2E) Tests:
Setup:
1. Navigate to the sample agent directory:
   cd contributing/samples
2. Launch ADK Web UI:
   adk web .
3. Select the streaming_tools_non_live_agent from the agent list

Test Cases:
Stock Price Monitoring:
Large Dataset Processing:
System Health Monitoring:
Verification:
function_response Events appear

Checklist
Additional context
This PR addresses the use case described in #3837
This PR extends the streaming tools pattern (which works in live mode) to non-live mode.
Sample Agent
A sample agent demonstrating the feature is included:
contributing/samples/streaming_tools_non_live_agent/ (tools: monitor_stock_price, process_large_dataset, monitor_system_health)
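To show the consumer side, here is a hedged sketch of how a client could observe the progressive function_response events when driving the sample agent through run_async. It assumes root_agent can be imported from the sample package (import path, prompt text, and printing are illustrative, not part of the PR):

import asyncio

from google.adk.runners import InMemoryRunner
from google.genai import types

from streaming_tools_non_live_agent.agent import root_agent  # assumed sample layout


async def main() -> None:
  runner = InMemoryRunner(agent=root_agent, app_name='streaming_demo')
  session = await runner.session_service.create_session(
      app_name='streaming_demo', user_id='user'
  )
  message = types.Content(
      role='user', parts=[types.Part(text='Monitor GOOG stock price')]
  )
  async for event in runner.run_async(
      user_id='user', session_id=session.id, new_message=message
  ):
    # With streaming tools, intermediate function responses arrive as
    # separate events instead of one blocking final result.
    for response in event.get_function_responses():
      print(response.name, response.response)


if __name__ == '__main__':
  asyncio.run(main())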