Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)

from ..exceptions import AgentExecutionException
from ._checkpoint import CheckpointStorage
from ._events import (
AgentRunUpdateEvent,
RequestInfoEvent,
Expand Down Expand Up @@ -117,17 +118,25 @@ async def run(
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
checkpoint_id: str | None = None,
checkpoint_storage: CheckpointStorage | None = None,
**kwargs: Any,
) -> AgentRunResponse:
"""Get a response from the workflow agent (non-streaming).

This method collects all streaming updates and merges them into a single response.

Args:
messages: The message(s) to send to the workflow.
messages: The message(s) to send to the workflow. Required for new runs,
should be None when resuming from checkpoint.

Keyword Args:
thread: The conversation thread. If None, a new thread will be created.
checkpoint_id: ID of checkpoint to restore from. If provided, the workflow
resumes from this checkpoint instead of starting fresh.
checkpoint_storage: Runtime checkpoint storage. When provided with checkpoint_id,
used to load and restore the checkpoint. When provided without checkpoint_id,
enables checkpointing for this run.
**kwargs: Additional keyword arguments.

Returns:
Expand All @@ -139,7 +148,9 @@ async def run(
thread = thread or self.get_new_thread()
response_id = str(uuid.uuid4())

async for update in self._run_stream_impl(input_messages, response_id):
async for update in self._run_stream_impl(
input_messages, response_id, thread, checkpoint_id, checkpoint_storage
):
response_updates.append(update)

# Convert updates to final response.
Expand All @@ -155,15 +166,23 @@ async def run_stream(
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
checkpoint_id: str | None = None,
checkpoint_storage: CheckpointStorage | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
"""Stream response updates from the workflow agent.

Args:
messages: The message(s) to send to the workflow.
messages: The message(s) to send to the workflow. Required for new runs,
should be None when resuming from checkpoint.

Keyword Args:
thread: The conversation thread. If None, a new thread will be created.
checkpoint_id: ID of checkpoint to restore from. If provided, the workflow
resumes from this checkpoint instead of starting fresh.
checkpoint_storage: Runtime checkpoint storage. When provided with checkpoint_id,
used to load and restore the checkpoint. When provided without checkpoint_id,
enables checkpointing for this run.
**kwargs: Additional keyword arguments.

Yields:
Expand All @@ -174,7 +193,9 @@ async def run_stream(
response_updates: list[AgentRunResponseUpdate] = []
response_id = str(uuid.uuid4())

async for update in self._run_stream_impl(input_messages, response_id):
async for update in self._run_stream_impl(
input_messages, response_id, thread, checkpoint_id, checkpoint_storage
):
response_updates.append(update)
yield update

Expand All @@ -188,12 +209,18 @@ async def _run_stream_impl(
self,
input_messages: list[ChatMessage],
response_id: str,
thread: AgentThread,
checkpoint_id: str | None = None,
checkpoint_storage: CheckpointStorage | None = None,
) -> AsyncIterable[AgentRunResponseUpdate]:
"""Internal implementation of streaming execution.

Args:
input_messages: Normalized input messages to process.
response_id: The unique response ID for this workflow execution.
thread: The conversation thread containing message history.
checkpoint_id: ID of checkpoint to restore from.
checkpoint_storage: Runtime checkpoint storage.

Yields:
AgentRunResponseUpdate objects representing the workflow execution progress.
Expand All @@ -217,10 +244,27 @@ async def _run_stream_impl(
# and we will let the workflow to handle this -- the agent does not
# have an opinion on this.
event_stream = self.workflow.send_responses_streaming(function_responses)
elif checkpoint_id is not None:
# Resume from checkpoint - don't prepend thread history since workflow state
# is being restored from the checkpoint
event_stream = self.workflow.run_stream(
message=None,
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage,
)
else:
# Execute workflow with streaming (initial run or no function responses)
# Pass the new input messages directly to the workflow
event_stream = self.workflow.run_stream(input_messages)
# Build the complete conversation by prepending thread history to input messages
conversation_messages: list[ChatMessage] = []
if thread.message_store:
history = await thread.message_store.list_messages()
if history:
conversation_messages.extend(history)
conversation_messages.extend(input_messages)
event_stream = self.workflow.run_stream(
message=conversation_messages,
checkpoint_storage=checkpoint_storage,
)

# Process events from the stream
async for event in event_stream:
Expand Down
126 changes: 126 additions & 0 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
AgentRunResponse,
AgentRunResponseUpdate,
AgentRunUpdateEvent,
AgentThread,
ChatMessage,
ChatMessageStore,
Executor,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
Expand Down Expand Up @@ -75,6 +77,31 @@ async def handle_request_response(
await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=update))


class ConversationHistoryCapturingExecutor(Executor):
"""Executor that captures the received conversation history for verification."""

def __init__(self, id: str):
super().__init__(id=id)
self.received_messages: list[ChatMessage] = []

@handler
async def handle_message(self, messages: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
# Capture all received messages
self.received_messages = list(messages)

# Count messages by role for the response
message_count = len(messages)
response_text = f"Received {message_count} messages"

response_message = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)])

streaming_update = AgentRunResponseUpdate(
contents=[TextContent(text=response_text)], role=Role.ASSISTANT, message_id=str(uuid.uuid4())
)
await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=streaming_update))
await ctx.send_message([response_message])


class TestWorkflowAgent:
"""Test cases for WorkflowAgent end-to-end functionality."""

Expand Down Expand Up @@ -257,6 +284,105 @@ async def handle_bool(self, message: bool, context: WorkflowContext[Any]) -> Non
with pytest.raises(ValueError, match="Workflow's start executor cannot handle list\\[ChatMessage\\]"):
workflow.as_agent()

async def test_thread_conversation_history_included_in_workflow_run(self) -> None:
"""Test that conversation history from thread is included when running WorkflowAgent.

This verifies that when a thread with existing messages is provided to agent.run(),
the workflow receives the complete conversation history (thread history + new messages).
"""
# Create an executor that captures all received messages
capturing_executor = ConversationHistoryCapturingExecutor(id="capturing")
workflow = WorkflowBuilder().set_start_executor(capturing_executor).build()
agent = WorkflowAgent(workflow=workflow, name="Thread History Test Agent")

# Create a thread with existing conversation history
history_messages = [
ChatMessage(role=Role.USER, text="Previous user message"),
ChatMessage(role=Role.ASSISTANT, text="Previous assistant response"),
]
message_store = ChatMessageStore(messages=history_messages)
thread = AgentThread(message_store=message_store)

# Run the agent with the thread and a new message
new_message = "New user question"
await agent.run(new_message, thread=thread)

# Verify the executor received both history AND new message
assert len(capturing_executor.received_messages) == 3

# Verify the order: history first, then new message
assert capturing_executor.received_messages[0].text == "Previous user message"
assert capturing_executor.received_messages[1].text == "Previous assistant response"
assert capturing_executor.received_messages[2].text == "New user question"

async def test_thread_conversation_history_included_in_workflow_stream(self) -> None:
"""Test that conversation history from thread is included when streaming WorkflowAgent.

This verifies that run_stream also includes thread history.
"""
# Create an executor that captures all received messages
capturing_executor = ConversationHistoryCapturingExecutor(id="capturing_stream")
workflow = WorkflowBuilder().set_start_executor(capturing_executor).build()
agent = WorkflowAgent(workflow=workflow, name="Thread Stream Test Agent")

# Create a thread with existing conversation history
history_messages = [
ChatMessage(role=Role.SYSTEM, text="You are a helpful assistant"),
ChatMessage(role=Role.USER, text="Hello"),
ChatMessage(role=Role.ASSISTANT, text="Hi there!"),
]
message_store = ChatMessageStore(messages=history_messages)
thread = AgentThread(message_store=message_store)

# Stream from the agent with the thread and a new message
async for _ in agent.run_stream("How are you?", thread=thread):
pass

# Verify the executor received all messages (3 from history + 1 new)
assert len(capturing_executor.received_messages) == 4

# Verify the order
assert capturing_executor.received_messages[0].text == "You are a helpful assistant"
assert capturing_executor.received_messages[1].text == "Hello"
assert capturing_executor.received_messages[2].text == "Hi there!"
assert capturing_executor.received_messages[3].text == "How are you?"

async def test_empty_thread_works_correctly(self) -> None:
"""Test that an empty thread (no message store) works correctly."""
capturing_executor = ConversationHistoryCapturingExecutor(id="empty_thread_test")
workflow = WorkflowBuilder().set_start_executor(capturing_executor).build()
agent = WorkflowAgent(workflow=workflow, name="Empty Thread Test Agent")

# Create an empty thread
thread = AgentThread()

# Run with the empty thread
await agent.run("Just a new message", thread=thread)

# Should only receive the new message
assert len(capturing_executor.received_messages) == 1
assert capturing_executor.received_messages[0].text == "Just a new message"

async def test_checkpoint_storage_passed_to_workflow(self) -> None:
"""Test that checkpoint_storage parameter is passed through to the workflow."""
from agent_framework import InMemoryCheckpointStorage

capturing_executor = ConversationHistoryCapturingExecutor(id="checkpoint_test")
workflow = WorkflowBuilder().set_start_executor(capturing_executor).build()
agent = WorkflowAgent(workflow=workflow, name="Checkpoint Test Agent")

# Create checkpoint storage
checkpoint_storage = InMemoryCheckpointStorage()

# Run with checkpoint storage enabled
async for _ in agent.run_stream("Test message", checkpoint_storage=checkpoint_storage):
pass

# Drain workflow events to get checkpoint
# The workflow should have created checkpoints
checkpoints = await checkpoint_storage.list_checkpoints(workflow.id)
assert len(checkpoints) > 0, "Checkpoints should have been created when checkpoint_storage is provided"


class TestWorkflowAgentMergeUpdates:
"""Test cases specifically for the WorkflowAgent.merge_updates static method."""
Expand Down
2 changes: 2 additions & 0 deletions python/samples/getting_started/workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Once comfortable with these, explore the rest of the samples below.
| Magentic Workflow as Agent | [agents/magentic_workflow_as_agent.py](./agents/magentic_workflow_as_agent.py) | Configure Magentic orchestration with callbacks, then expose the workflow as an agent |
| Workflow as Agent (Reflection Pattern) | [agents/workflow_as_agent_reflection_pattern.py](./agents/workflow_as_agent_reflection_pattern.py) | Wrap a workflow so it can behave like an agent (reflection pattern) |
| Workflow as Agent + HITL | [agents/workflow_as_agent_human_in_the_loop.py](./agents/workflow_as_agent_human_in_the_loop.py) | Extend workflow-as-agent with human-in-the-loop capability |
| Workflow as Agent with Thread | [agents/workflow_as_agent_with_thread.py](./agents/workflow_as_agent_with_thread.py) | Use AgentThread to maintain conversation history across workflow-as-agent invocations |
| Handoff Workflow as Agent | [agents/handoff_workflow_as_agent.py](./agents/handoff_workflow_as_agent.py) | Use a HandoffBuilder workflow as an agent with HITL via FunctionCallContent/FunctionResultContent |

### checkpoint
Expand All @@ -54,6 +55,7 @@ Once comfortable with these, explore the rest of the samples below.
| Checkpoint & HITL Resume | [checkpoint/checkpoint_with_human_in_the_loop.py](./checkpoint/checkpoint_with_human_in_the_loop.py) | Combine checkpointing with human approvals and resume pending HITL requests |
| Checkpointed Sub-Workflow | [checkpoint/sub_workflow_checkpoint.py](./checkpoint/sub_workflow_checkpoint.py) | Save and resume a sub-workflow that pauses for human approval |
| Handoff + Tool Approval Resume | [checkpoint/handoff_with_tool_approval_checkpoint_resume.py](./checkpoint/handoff_with_tool_approval_checkpoint_resume.py) | Handoff workflow that captures tool-call approvals in checkpoints and resumes with human decisions |
| Workflow as Agent Checkpoint | [checkpoint/workflow_as_agent_checkpoint.py](./checkpoint/workflow_as_agent_checkpoint.py) | Enable checkpointing when using workflow.as_agent() with checkpoint_storage parameter |

### composition

Expand Down
Loading
Loading