Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions src/agents/_run_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ class SingleStepResult:
"""Items generated before the current step."""

new_step_items: list[RunItem]
"""Items generated during this current step."""
"""Items generated during this current step. May be filtered during handoffs to avoid
duplication in model input."""

next_step: NextStepHandoff | NextStepFinalOutput | NextStepRunAgain
"""The next step to take."""
Expand All @@ -255,11 +256,18 @@ class SingleStepResult:
tool_output_guardrail_results: list[ToolOutputGuardrailResult]
"""Tool output guardrail results from this step."""

session_step_items: list[RunItem] | None = None
"""Full unfiltered items for session history. When set, these are used instead of
new_step_items for session saving and generated_items property."""

@property
def generated_items(self) -> list[RunItem]:
"""Items generated during the agent run (i.e. everything generated after
`original_input`)."""
return self.pre_step_items + self.new_step_items
`original_input`). Uses session_step_items when available for full observability."""
items = (
self.session_step_items if self.session_step_items is not None else self.new_step_items
)
return self.pre_step_items + items


def get_model_tracing_impl(
Expand Down Expand Up @@ -1286,6 +1294,12 @@ async def execute_handoffs(
)
pre_step_items = list(filtered.pre_handoff_items)
new_step_items = list(filtered.new_items)
# For custom input filters, use input_items if available, otherwise new_items
if filtered.input_items is not None:
session_step_items = list(filtered.new_items)
new_step_items = list(filtered.input_items)
else:
session_step_items = None
elif should_nest_history and handoff_input_data is not None:
nested = nest_handoff_history(
handoff_input_data,
Expand All @@ -1297,7 +1311,16 @@ async def execute_handoffs(
else list(nested.input_history)
)
pre_step_items = list(nested.pre_handoff_items)
new_step_items = list(nested.new_items)
# Keep full new_items for session history.
session_step_items = list(nested.new_items)
# Use input_items (filtered) for model input if available.
if nested.input_items is not None:
new_step_items = list(nested.input_items)
else:
new_step_items = session_step_items
else:
# No filtering or nesting - session_step_items not needed
session_step_items = None

return SingleStepResult(
original_input=original_input,
Expand All @@ -1307,6 +1330,7 @@ async def execute_handoffs(
next_step=NextStepHandoff(new_agent),
tool_input_guardrail_results=[],
tool_output_guardrail_results=[],
session_step_items=session_step_items,
)

@classmethod
Expand Down
7 changes: 7 additions & 0 deletions src/agents/handoffs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ class HandoffInputData:
later on, it is optional for backwards compatibility.
"""

input_items: tuple[RunItem, ...] | None = None
"""
Items to include in the next agent's input. When set, these items are used instead of
new_items for building the input to the next agent. This allows filtering duplicates
from agent input while preserving all items in new_items for session history.
"""

def clone(self, **kwargs: Any) -> HandoffInputData:
"""
Make a copy of the handoff input data, with the given arguments changed. For example, you
Expand Down
58 changes: 45 additions & 13 deletions src/agents/handoffs/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
_conversation_history_start = _DEFAULT_CONVERSATION_HISTORY_START
_conversation_history_end = _DEFAULT_CONVERSATION_HISTORY_END

# Item types that are summarized in the conversation history.
# They should not be forwarded verbatim to the next agent to avoid duplication.
# _should_forward_pre_item and _should_forward_new_item compare an item's "type"
# value against this set when deciding whether to drop the item.
_SUMMARY_ONLY_INPUT_TYPES = {
"function_call",
"function_call_output",
}


def set_conversation_history_wrappers(
*,
Expand Down Expand Up @@ -67,23 +74,34 @@ def nest_handoff_history(

normalized_history = _normalize_input_history(handoff_input_data.input_history)
flattened_history = _flatten_nested_history_messages(normalized_history)
pre_items_as_inputs = [
_run_item_to_plain_input(item) for item in handoff_input_data.pre_handoff_items
]
new_items_as_inputs = [_run_item_to_plain_input(item) for item in handoff_input_data.new_items]

# Convert items to plain inputs for the transcript summary.
pre_items_as_inputs: list[TResponseInputItem] = []
filtered_pre_items: list[RunItem] = []
for run_item in handoff_input_data.pre_handoff_items:
plain_input = _run_item_to_plain_input(run_item)
pre_items_as_inputs.append(plain_input)
if _should_forward_pre_item(plain_input):
filtered_pre_items.append(run_item)

new_items_as_inputs: list[TResponseInputItem] = []
filtered_input_items: list[RunItem] = []
for run_item in handoff_input_data.new_items:
plain_input = _run_item_to_plain_input(run_item)
new_items_as_inputs.append(plain_input)
if _should_forward_new_item(plain_input):
filtered_input_items.append(run_item)

transcript = flattened_history + pre_items_as_inputs + new_items_as_inputs

mapper = history_mapper or default_handoff_history_mapper
history_items = mapper(transcript)
filtered_pre_items = tuple(
item
for item in handoff_input_data.pre_handoff_items
if _get_run_item_role(item) != "assistant"
)

return handoff_input_data.clone(
input_history=tuple(deepcopy(item) for item in history_items),
pre_handoff_items=filtered_pre_items,
pre_handoff_items=tuple(filtered_pre_items),
# new_items stays unchanged for session history.
input_items=tuple(filtered_input_items),
)


Expand Down Expand Up @@ -231,6 +249,20 @@ def _split_role_and_name(role_text: str) -> tuple[str, str | None]:
return (role_text or "developer", None)


def _get_run_item_role(run_item: RunItem) -> str | None:
    """Return the ``role`` of the run item's input form, or ``None`` if it is not a string."""
    input_payload = run_item.to_input_item()
    role_value = input_payload.get("role")
    if isinstance(role_value, str):
        return role_value
    return None
def _should_forward_pre_item(input_item: TResponseInputItem) -> bool:
    """Decide whether a pre-handoff item is forwarded to the next agent.

    Items with the ``"assistant"`` role and items whose ``"type"`` is listed in
    ``_SUMMARY_ONLY_INPUT_TYPES`` are rejected; every other item is forwarded.
    """
    item_role = input_item.get("role")
    is_assistant = isinstance(item_role, str) and item_role == "assistant"
    if is_assistant:
        return False

    item_type = input_item.get("type")
    # Non-string types are never looked up in the set, so they are forwarded.
    if not isinstance(item_type, str):
        return True
    return item_type not in _SUMMARY_ONLY_INPUT_TYPES


def _should_forward_new_item(input_item: TResponseInputItem) -> bool:
    """Decide whether a newly generated item is forwarded to the next agent.

    Any item carrying a non-empty string ``"role"`` is forwarded. Role-less items are
    rejected only when their ``"type"`` is listed in ``_SUMMARY_ONLY_INPUT_TYPES``.
    """
    item_role = input_item.get("role")
    has_role = isinstance(item_role, str) and bool(item_role)
    if has_role:
        return True

    item_type = input_item.get("type")
    # Non-string types are never looked up in the set, so they are forwarded.
    if isinstance(item_type, str):
        return item_type not in _SUMMARY_ONLY_INPUT_TYPES
    return True
72 changes: 58 additions & 14 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ async def run(
):
current_turn = 0
original_input: str | list[TResponseInputItem] = _copy_str_or_list(prepared_input)
generated_items: list[RunItem] = []
generated_items: list[RunItem] = [] # For model input (may be filtered on handoffs)
session_items: list[RunItem] = [] # For observability (always unfiltered)
model_responses: list[ModelResponse] = []

context_wrapper: RunContextWrapper[TContext] = RunContextWrapper(
Expand Down Expand Up @@ -705,7 +706,15 @@ async def run(

model_responses.append(turn_result.model_response)
original_input = turn_result.original_input
generated_items = turn_result.generated_items
# For model input, use new_step_items (filtered on handoffs)
generated_items = turn_result.pre_step_items + turn_result.new_step_items
# Accumulate unfiltered items for observability
session_items_for_turn = (
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items
)
session_items.extend(session_items_for_turn)

if server_conversation_tracker is not None:
server_conversation_tracker.track_server_items(turn_result.model_response)
Expand All @@ -725,7 +734,7 @@ async def run(
)
result = RunResult(
input=original_input,
new_items=generated_items,
new_items=session_items, # Use unfiltered items for observability
raw_responses=model_responses,
final_output=turn_result.next_step.output,
_last_agent=current_agent,
Expand All @@ -740,7 +749,11 @@ async def run(
for guardrail_result in input_guardrail_results
):
await self._save_result_to_session(
session, [], turn_result.new_step_items
session,
[],
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
)

return result
Expand All @@ -752,7 +765,11 @@ async def run(
for guardrail_result in input_guardrail_results
):
await self._save_result_to_session(
session, [], turn_result.new_step_items
session,
[],
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
)
current_agent = cast(Agent[TContext], turn_result.next_step.new_agent)
current_span.finish(reset_current=True)
Expand All @@ -764,7 +781,11 @@ async def run(
for guardrail_result in input_guardrail_results
):
await self._save_result_to_session(
session, [], turn_result.new_step_items
session,
[],
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
)
else:
raise AgentsException(
Expand All @@ -780,7 +801,7 @@ async def run(
except AgentsException as exc:
exc.run_data = RunErrorDetails(
input=original_input,
new_items=generated_items,
new_items=session_items, # Use unfiltered items for observability
raw_responses=model_responses,
last_agent=current_agent,
context_wrapper=context_wrapper,
Expand Down Expand Up @@ -1218,7 +1239,13 @@ async def _start_streaming(
turn_result.model_response
]
streamed_result.input = turn_result.original_input
streamed_result.new_items = turn_result.generated_items
# Accumulate unfiltered items for observability
session_items_for_turn = (
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items
)
streamed_result.new_items.extend(session_items_for_turn)

if server_conversation_tracker is not None:
server_conversation_tracker.track_server_items(turn_result.model_response)
Expand All @@ -1234,7 +1261,11 @@ async def _start_streaming(
)
if should_skip_session_save is False:
await AgentRunner._save_result_to_session(
session, [], turn_result.new_step_items
session,
[],
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
)

current_agent = turn_result.next_step.new_agent
Expand Down Expand Up @@ -1280,7 +1311,11 @@ async def _start_streaming(
)
if should_skip_session_save is False:
await AgentRunner._save_result_to_session(
session, [], turn_result.new_step_items
session,
[],
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
)

streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
Expand All @@ -1293,7 +1328,11 @@ async def _start_streaming(
)
if should_skip_session_save is False:
await AgentRunner._save_result_to_session(
session, [], turn_result.new_step_items
session,
[],
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
)

# Check for soft cancel after turn completion
Expand Down Expand Up @@ -1745,10 +1784,15 @@ async def _get_single_step_result_from_streamed_response(
context_wrapper=context_wrapper,
run_config=run_config,
)
# Use session_step_items (unfiltered) if available for streaming observability,
# otherwise fall back to new_step_items.
streaming_items = (
single_step_result.session_step_items
if single_step_result.session_step_items is not None
else single_step_result.new_step_items
)
new_step_items = [
item
for item in single_step_result.new_step_items
if item not in new_items_processed_response
item for item in streaming_items if item not in new_items_processed_response
]
RunImpl.stream_step_items_to_queue(new_step_items, event_queue)

Expand Down
3 changes: 2 additions & 1 deletion tests/test_agent_runner_streamed.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,8 @@ async def test_streaming_events():
"tool_call_output": 2,
"message": 2, # get_text_message("a_message") + get_final_output_message(...)
"handoff": 1, # get_handoff_tool_call(agent_1)
"handoff_output": 1, # handoff_output_item
# handoff_output is summarized in conversation history, not duplicated as raw item
"handoff_output": 0,
}

total_expected_item_count = sum(expected_item_type_map.values())
Expand Down
Loading