diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py
index 81fe1f3b73..d1ff567f81 100644
--- a/python/packages/core/agent_framework/_workflows/_agent.py
+++ b/python/packages/core/agent_framework/_workflows/_agent.py
@@ -23,6 +23,7 @@
 )
 from ..exceptions import AgentExecutionException
+from ._checkpoint import CheckpointStorage
 from ._events import (
     AgentRunUpdateEvent,
     RequestInfoEvent,
@@ -117,6 +118,8 @@ async def run(
         messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
         *,
         thread: AgentThread | None = None,
+        checkpoint_id: str | None = None,
+        checkpoint_storage: CheckpointStorage | None = None,
         **kwargs: Any,
     ) -> AgentRunResponse:
         """Get a response from the workflow agent (non-streaming).
@@ -124,10 +127,16 @@
         This method collects all streaming updates and merges them into a single response.

         Args:
-            messages: The message(s) to send to the workflow.
+            messages: The message(s) to send to the workflow. Required for new runs;
+                should be None when resuming from a checkpoint.

         Keyword Args:
             thread: The conversation thread. If None, a new thread will be created.
+            checkpoint_id: ID of the checkpoint to restore from. If provided, the workflow
+                resumes from this checkpoint instead of starting fresh.
+            checkpoint_storage: Runtime checkpoint storage. When provided with checkpoint_id,
+                used to load and restore the checkpoint. When provided without checkpoint_id,
+                enables checkpointing for this run.
             **kwargs: Additional keyword arguments.

         Returns:
@@ -139,7 +148,9 @@
         thread = thread or self.get_new_thread()
         response_id = str(uuid.uuid4())

-        async for update in self._run_stream_impl(input_messages, response_id):
+        async for update in self._run_stream_impl(
+            input_messages, response_id, thread, checkpoint_id, checkpoint_storage
+        ):
             response_updates.append(update)

         # Convert updates to final response.
@@ -155,15 +166,23 @@ async def run_stream(
         messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
         *,
         thread: AgentThread | None = None,
+        checkpoint_id: str | None = None,
+        checkpoint_storage: CheckpointStorage | None = None,
         **kwargs: Any,
     ) -> AsyncIterable[AgentRunResponseUpdate]:
         """Stream response updates from the workflow agent.

         Args:
-            messages: The message(s) to send to the workflow.
+            messages: The message(s) to send to the workflow. Required for new runs;
+                should be None when resuming from a checkpoint.

         Keyword Args:
             thread: The conversation thread. If None, a new thread will be created.
+            checkpoint_id: ID of the checkpoint to restore from. If provided, the workflow
+                resumes from this checkpoint instead of starting fresh.
+            checkpoint_storage: Runtime checkpoint storage. When provided with checkpoint_id,
+                used to load and restore the checkpoint. When provided without checkpoint_id,
+                enables checkpointing for this run.
             **kwargs: Additional keyword arguments.

         Yields:
@@ -174,7 +193,9 @@ async def run_stream(
         response_updates: list[AgentRunResponseUpdate] = []
         response_id = str(uuid.uuid4())

-        async for update in self._run_stream_impl(input_messages, response_id):
+        async for update in self._run_stream_impl(
+            input_messages, response_id, thread, checkpoint_id, checkpoint_storage
+        ):
             response_updates.append(update)
             yield update

@@ -188,12 +209,18 @@ async def _run_stream_impl(
         self,
         input_messages: list[ChatMessage],
         response_id: str,
+        thread: AgentThread,
+        checkpoint_id: str | None = None,
+        checkpoint_storage: CheckpointStorage | None = None,
     ) -> AsyncIterable[AgentRunResponseUpdate]:
         """Internal implementation of streaming execution.

         Args:
             input_messages: Normalized input messages to process.
             response_id: The unique response ID for this workflow execution.
+            thread: The conversation thread containing message history.
+            checkpoint_id: ID of the checkpoint to restore from.
+            checkpoint_storage: Runtime checkpoint storage.

         Yields:
             AgentRunResponseUpdate objects representing the workflow execution progress.
@@ -217,10 +244,27 @@
             # and we will let the workflow to handle this -- the agent does not
             # have an opinion on this.
             event_stream = self.workflow.send_responses_streaming(function_responses)
+        elif checkpoint_id is not None:
+            # Resume from checkpoint - don't prepend thread history since workflow state
+            # is being restored from the checkpoint
+            event_stream = self.workflow.run_stream(
+                message=None,
+                checkpoint_id=checkpoint_id,
+                checkpoint_storage=checkpoint_storage,
+            )
         else:
             # Execute workflow with streaming (initial run or no function responses)
-            # Pass the new input messages directly to the workflow
-            event_stream = self.workflow.run_stream(input_messages)
+            # Build the complete conversation by prepending thread history to input messages
+            conversation_messages: list[ChatMessage] = []
+            if thread.message_store:
+                history = await thread.message_store.list_messages()
+                if history:
+                    conversation_messages.extend(history)
+            conversation_messages.extend(input_messages)
+            event_stream = self.workflow.run_stream(
+                message=conversation_messages,
+                checkpoint_storage=checkpoint_storage,
+            )

         # Process events from the stream
         async for event in event_stream:
diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py
index 009ead6edd..c005a0f9f9 100644
--- a/python/packages/core/tests/workflow/test_workflow_agent.py
+++ b/python/packages/core/tests/workflow/test_workflow_agent.py
@@ -9,7 +9,9 @@
     AgentRunResponse,
     AgentRunResponseUpdate,
     AgentRunUpdateEvent,
+    AgentThread,
     ChatMessage,
+    ChatMessageStore,
     Executor,
     FunctionApprovalRequestContent,
     FunctionApprovalResponseContent,
@@ -75,6 +77,31 @@ async def handle_request_response(
         await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=update))


+class ConversationHistoryCapturingExecutor(Executor):
+    """Executor that captures the received conversation history for verification."""
+
+    def __init__(self, id: str):
+        super().__init__(id=id)
+        self.received_messages: list[ChatMessage] = []
+
+    @handler
+    async def handle_message(self, messages: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
+        # Capture all received messages
+        self.received_messages = list(messages)
+
+        # Count the received messages for the response
+        message_count = len(messages)
+        response_text = f"Received {message_count} messages"
+
+        response_message = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)])
+
+        streaming_update = AgentRunResponseUpdate(
+            contents=[TextContent(text=response_text)], role=Role.ASSISTANT, message_id=str(uuid.uuid4())
+        )
+        await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=streaming_update))
+        await ctx.send_message([response_message])
+
+
 class TestWorkflowAgent:
     """Test cases for WorkflowAgent end-to-end functionality."""

@@ -257,6 +284,105 @@ async def handle_bool(self, message: bool, context: WorkflowContext[Any]) -> Non
         with pytest.raises(ValueError, match="Workflow's start executor cannot handle list\\[ChatMessage\\]"):
             workflow.as_agent()

+    async def test_thread_conversation_history_included_in_workflow_run(self) -> None:
+        """Test that conversation history from the thread is included when running WorkflowAgent.
+
+        This verifies that when a thread with existing messages is provided to agent.run(),
+        the workflow receives the complete conversation history (thread history + new messages).
+        """
+        # Create an executor that captures all received messages
+        capturing_executor = ConversationHistoryCapturingExecutor(id="capturing")
+        workflow = WorkflowBuilder().set_start_executor(capturing_executor).build()
+        agent = WorkflowAgent(workflow=workflow, name="Thread History Test Agent")
+
+        # Create a thread with existing conversation history
+        history_messages = [
+            ChatMessage(role=Role.USER, text="Previous user message"),
+            ChatMessage(role=Role.ASSISTANT, text="Previous assistant response"),
+        ]
+        message_store = ChatMessageStore(messages=history_messages)
+        thread = AgentThread(message_store=message_store)
+
+        # Run the agent with the thread and a new message
+        new_message = "New user question"
+        await agent.run(new_message, thread=thread)
+
+        # Verify the executor received both history AND new message
+        assert len(capturing_executor.received_messages) == 3
+
+        # Verify the order: history first, then new message
+        assert capturing_executor.received_messages[0].text == "Previous user message"
+        assert capturing_executor.received_messages[1].text == "Previous assistant response"
+        assert capturing_executor.received_messages[2].text == "New user question"
+
+    async def test_thread_conversation_history_included_in_workflow_stream(self) -> None:
+        """Test that conversation history from the thread is included when streaming WorkflowAgent.
+
+        This verifies that run_stream also includes thread history.
+ """ + # Create an executor that captures all received messages + capturing_executor = ConversationHistoryCapturingExecutor(id="capturing_stream") + workflow = WorkflowBuilder().set_start_executor(capturing_executor).build() + agent = WorkflowAgent(workflow=workflow, name="Thread Stream Test Agent") + + # Create a thread with existing conversation history + history_messages = [ + ChatMessage(role=Role.SYSTEM, text="You are a helpful assistant"), + ChatMessage(role=Role.USER, text="Hello"), + ChatMessage(role=Role.ASSISTANT, text="Hi there!"), + ] + message_store = ChatMessageStore(messages=history_messages) + thread = AgentThread(message_store=message_store) + + # Stream from the agent with the thread and a new message + async for _ in agent.run_stream("How are you?", thread=thread): + pass + + # Verify the executor received all messages (3 from history + 1 new) + assert len(capturing_executor.received_messages) == 4 + + # Verify the order + assert capturing_executor.received_messages[0].text == "You are a helpful assistant" + assert capturing_executor.received_messages[1].text == "Hello" + assert capturing_executor.received_messages[2].text == "Hi there!" + assert capturing_executor.received_messages[3].text == "How are you?" + + async def test_empty_thread_works_correctly(self) -> None: + """Test that an empty thread (no message store) works correctly.""" + capturing_executor = ConversationHistoryCapturingExecutor(id="empty_thread_test") + workflow = WorkflowBuilder().set_start_executor(capturing_executor).build() + agent = WorkflowAgent(workflow=workflow, name="Empty Thread Test Agent") + + # Create an empty thread + thread = AgentThread() + + # Run with the empty thread + await agent.run("Just a new message", thread=thread) + + # Should only receive the new message + assert len(capturing_executor.received_messages) == 1 + assert capturing_executor.received_messages[0].text == "Just a new message" + + async def test_checkpoint_storage_passed_to_workflow(self) -> None: + """Test that checkpoint_storage parameter is passed through to the workflow.""" + from agent_framework import InMemoryCheckpointStorage + + capturing_executor = ConversationHistoryCapturingExecutor(id="checkpoint_test") + workflow = WorkflowBuilder().set_start_executor(capturing_executor).build() + agent = WorkflowAgent(workflow=workflow, name="Checkpoint Test Agent") + + # Create checkpoint storage + checkpoint_storage = InMemoryCheckpointStorage() + + # Run with checkpoint storage enabled + async for _ in agent.run_stream("Test message", checkpoint_storage=checkpoint_storage): + pass + + # Drain workflow events to get checkpoint + # The workflow should have created checkpoints + checkpoints = await checkpoint_storage.list_checkpoints(workflow.id) + assert len(checkpoints) > 0, "Checkpoints should have been created when checkpoint_storage is provided" + class TestWorkflowAgentMergeUpdates: """Test cases specifically for the WorkflowAgent.merge_updates static method.""" diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 7c5ad5687f..0cfcd85cd2 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -44,6 +44,7 @@ Once comfortable with these, explore the rest of the samples below. 
 | Magentic Workflow as Agent | [agents/magentic_workflow_as_agent.py](./agents/magentic_workflow_as_agent.py) | Configure Magentic orchestration with callbacks, then expose the workflow as an agent |
 | Workflow as Agent (Reflection Pattern) | [agents/workflow_as_agent_reflection_pattern.py](./agents/workflow_as_agent_reflection_pattern.py) | Wrap a workflow so it can behave like an agent (reflection pattern) |
 | Workflow as Agent + HITL | [agents/workflow_as_agent_human_in_the_loop.py](./agents/workflow_as_agent_human_in_the_loop.py) | Extend workflow-as-agent with human-in-the-loop capability |
+| Workflow as Agent with Thread | [agents/workflow_as_agent_with_thread.py](./agents/workflow_as_agent_with_thread.py) | Use AgentThread to maintain conversation history across workflow-as-agent invocations |
 | Handoff Workflow as Agent | [agents/handoff_workflow_as_agent.py](./agents/handoff_workflow_as_agent.py) | Use a HandoffBuilder workflow as an agent with HITL via FunctionCallContent/FunctionResultContent |

 ### checkpoint
@@ -54,6 +55,7 @@ Once comfortable with these, explore the rest of the samples below.
 | Checkpoint & HITL Resume | [checkpoint/checkpoint_with_human_in_the_loop.py](./checkpoint/checkpoint_with_human_in_the_loop.py) | Combine checkpointing with human approvals and resume pending HITL requests |
 | Checkpointed Sub-Workflow | [checkpoint/sub_workflow_checkpoint.py](./checkpoint/sub_workflow_checkpoint.py) | Save and resume a sub-workflow that pauses for human approval |
 | Handoff + Tool Approval Resume | [checkpoint/handoff_with_tool_approval_checkpoint_resume.py](./checkpoint/handoff_with_tool_approval_checkpoint_resume.py) | Handoff workflow that captures tool-call approvals in checkpoints and resumes with human decisions |
+| Workflow as Agent Checkpoint | [checkpoint/workflow_as_agent_checkpoint.py](./checkpoint/workflow_as_agent_checkpoint.py) | Enable checkpointing when using workflow.as_agent() with checkpoint_storage parameter |

 ### composition

diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_with_thread.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_with_thread.py
new file mode 100644
index 0000000000..0d8e066f91
--- /dev/null
+++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_with_thread.py
@@ -0,0 +1,167 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+
+from agent_framework import AgentThread, ChatAgent, ChatMessageStore, SequentialBuilder
+from agent_framework.openai import OpenAIChatClient
+
+"""
+Sample: Workflow as Agent with Thread Conversation History and Checkpointing
+
+This sample demonstrates how to use AgentThread with a workflow wrapped as an agent
+to maintain conversation history across multiple invocations. When using as_agent(),
+the thread's message store history is included in each workflow run, enabling
+the workflow participants to reference prior conversation context.
+
+It also demonstrates how to enable checkpointing for workflow execution state
+persistence, allowing workflows to be paused and resumed.
+
+Key concepts:
+- Workflows can be wrapped as agents using workflow.as_agent()
+- AgentThread with ChatMessageStore preserves conversation history
+- Each call to agent.run() includes thread history + new message
+- Participants in the workflow see the full conversation context
+- checkpoint_storage parameter enables workflow state persistence
+
+Use cases:
+- Multi-turn conversations with workflow-based orchestrations
+- Stateful workflows that need context from previous interactions
+- Building conversational agents that leverage workflow patterns
+- Long-running workflows that need pause/resume capability
+
+Prerequisites:
+- OpenAI environment variables configured for OpenAIChatClient
+"""
+
+
+async def main() -> None:
+    # Create a chat client
+    chat_client = OpenAIChatClient()
+
+    # Define factory functions for workflow participants
+    def create_assistant() -> ChatAgent:
+        return chat_client.create_agent(
+            name="assistant",
+            instructions=(
+                "You are a helpful assistant. Answer questions based on the conversation "
+                "history. If the user asks about something mentioned earlier, reference it."
+            ),
+        )
+
+    def create_summarizer() -> ChatAgent:
+        return chat_client.create_agent(
+            name="summarizer",
+            instructions=(
+                "You are a summarizer. After the assistant responds, provide a brief "
+                "one-sentence summary of the key point from the conversation so far."
+            ),
+        )
+
+    # Build a sequential workflow: assistant -> summarizer
+    workflow = SequentialBuilder().register_participants([create_assistant, create_summarizer]).build()
+
+    # Wrap the workflow as an agent
+    agent = workflow.as_agent(name="ConversationalWorkflowAgent")
+
+    # Create a thread with a ChatMessageStore to maintain history
+    message_store = ChatMessageStore()
+    thread = AgentThread(message_store=message_store)
+
+    print("=" * 60)
+    print("Workflow as Agent with Thread - Multi-turn Conversation")
+    print("=" * 60)
+
+    # First turn: Introduce a topic
+    query1 = "My name is Alex and I'm learning about machine learning."
+    print(f"\n[Turn 1] User: {query1}")
+
+    response1 = await agent.run(query1, thread=thread)
+    if response1.messages:
+        for msg in response1.messages:
+            speaker = msg.author_name or msg.role.value
+            print(f"[{speaker}]: {msg.text}")
+
+    # Second turn: Reference the previous topic
+    query2 = "What was my name again, and what am I learning about?"
+    print(f"\n[Turn 2] User: {query2}")
+
+    response2 = await agent.run(query2, thread=thread)
+    if response2.messages:
+        for msg in response2.messages:
+            speaker = msg.author_name or msg.role.value
+            print(f"[{speaker}]: {msg.text}")
+
+    # Third turn: Ask a follow-up question
+    query3 = "Can you suggest a good first project for me to try?"
+    print(f"\n[Turn 3] User: {query3}")
+
+    response3 = await agent.run(query3, thread=thread)
+    if response3.messages:
+        for msg in response3.messages:
+            speaker = msg.author_name or msg.role.value
+            print(f"[{speaker}]: {msg.text}")
+
+    # Show the accumulated conversation history
+    print("\n" + "=" * 60)
+    print("Full Thread History")
+    print("=" * 60)
+    if thread.message_store:
+        history = await thread.message_store.list_messages()
+        for i, msg in enumerate(history, start=1):
+            role = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
+            speaker = msg.author_name or role
+            text_preview = msg.text[:80] + "..." if len(msg.text) > 80 else msg.text
+            print(f"{i:02d}. [{speaker}]: {text_preview}")
+
+
+async def demonstrate_thread_serialization() -> None:
+    """
+    Demonstrates serializing and resuming a thread with a workflow agent.
+
+    This shows how conversation history can be persisted and restored,
+    enabling long-running conversational workflows.
+    """
+    chat_client = OpenAIChatClient()
+
+    def create_assistant() -> ChatAgent:
+        return chat_client.create_agent(
+            name="memory_assistant",
+            instructions="You are a helpful assistant with good memory. Remember details from our conversation.",
+        )
+
+    workflow = SequentialBuilder().register_participants([create_assistant]).build()
+    agent = workflow.as_agent(name="MemoryWorkflowAgent")
+
+    # Create initial thread and have a conversation
+    thread = AgentThread(message_store=ChatMessageStore())
+
+    print("\n" + "=" * 60)
+    print("Thread Serialization Demo")
+    print("=" * 60)
+
+    # First interaction
+    query = "Remember this: the secret code is ALPHA-7."
+    print(f"\n[Session 1] User: {query}")
+    response = await agent.run(query, thread=thread)
+    if response.messages:
+        print(f"[assistant]: {response.messages[0].text}")
+
+    # Serialize thread state (could be saved to database/file)
+    serialized_state = await thread.serialize()
+    print("\n[Serialized thread state for persistence]")
+
+    # Simulate a new session by creating a new thread from serialized state
+    restored_thread = AgentThread(message_store=ChatMessageStore())
+    await restored_thread.update_from_thread_state(serialized_state)
+
+    # Continue conversation with restored thread
+    query = "What was the secret code I told you?"
+    print(f"\n[Session 2 - Restored] User: {query}")
+    response = await agent.run(query, thread=restored_thread)
+    if response.messages:
+        print(f"[assistant]: {response.messages[0].text}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+    asyncio.run(demonstrate_thread_serialization())
diff --git a/python/samples/getting_started/workflows/checkpoint/workflow_as_agent_checkpoint.py b/python/samples/getting_started/workflows/checkpoint/workflow_as_agent_checkpoint.py
new file mode 100644
index 0000000000..1c1488ef10
--- /dev/null
+++ b/python/samples/getting_started/workflows/checkpoint/workflow_as_agent_checkpoint.py
@@ -0,0 +1,163 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+"""
+Sample: Workflow as Agent with Checkpointing
+
+Purpose:
+This sample demonstrates how to use checkpointing with a workflow wrapped as an agent.
+It shows how to enable checkpoint storage when calling agent.run() or agent.run_stream(),
+allowing workflow execution state to be persisted and potentially resumed.
+
+What you learn:
+- How to pass checkpoint_storage to WorkflowAgent.run() and run_stream()
+- How checkpoints are created during workflow-as-agent execution
+- How to combine thread conversation history with workflow checkpointing
+- How to resume a workflow-as-agent from a checkpoint
+
+Key concepts:
+- Thread (AgentThread): Maintains conversation history across agent invocations
+- Checkpoint: Persists workflow execution state for pause/resume capability
+- These are complementary: threads track conversation, checkpoints track workflow state
+
+Prerequisites:
+- OpenAI environment variables configured for OpenAIChatClient
+"""
+
+import asyncio
+
+from agent_framework import (
+    AgentThread,
+    ChatAgent,
+    ChatMessageStore,
+    InMemoryCheckpointStorage,
+    SequentialBuilder,
+)
+from agent_framework.openai import OpenAIChatClient
+
+
+async def basic_checkpointing() -> None:
+    """Demonstrate basic checkpoint storage with workflow-as-agent."""
+    print("=" * 60)
+    print("Basic Checkpointing with Workflow as Agent")
+    print("=" * 60)
+
+    chat_client = OpenAIChatClient()
+
+    def create_assistant() -> ChatAgent:
+        return chat_client.create_agent(
+            name="assistant",
+            instructions="You are a helpful assistant. Keep responses brief.",
+        )
+
+    def create_reviewer() -> ChatAgent:
+        return chat_client.create_agent(
+            name="reviewer",
+            instructions="You are a reviewer. Provide a one-sentence summary of the assistant's response.",
+        )
+
+    # Build sequential workflow with participant factories
+    workflow = SequentialBuilder().register_participants([create_assistant, create_reviewer]).build()
+    agent = workflow.as_agent(name="CheckpointedAgent")
+
+    # Create checkpoint storage
+    checkpoint_storage = InMemoryCheckpointStorage()
+
+    # Run with checkpointing enabled
+    query = "What are the benefits of renewable energy?"
+    print(f"\nUser: {query}")
+
+    response = await agent.run(query, checkpoint_storage=checkpoint_storage)
+
+    for msg in response.messages:
+        speaker = msg.author_name or msg.role.value
+        print(f"[{speaker}]: {msg.text}")
+
+    # Show checkpoints that were created
+    checkpoints = await checkpoint_storage.list_checkpoints(workflow.id)
+    print(f"\nCheckpoints created: {len(checkpoints)}")
+    for i, cp in enumerate(checkpoints[:5], 1):
+        print(f"  {i}. {cp.checkpoint_id}")
+
+
+async def checkpointing_with_thread() -> None:
+    """Demonstrate combining thread history with checkpointing."""
+    print("\n" + "=" * 60)
+    print("Checkpointing with Thread Conversation History")
+    print("=" * 60)
+
+    chat_client = OpenAIChatClient()
+
+    def create_assistant() -> ChatAgent:
+        return chat_client.create_agent(
+            name="memory_assistant",
+            instructions="You are a helpful assistant with good memory. Reference previous conversation when relevant.",
+        )
+
+    workflow = SequentialBuilder().register_participants([create_assistant]).build()
+    agent = workflow.as_agent(name="MemoryAgent")
+
+    # Create both thread (for conversation) and checkpoint storage (for workflow state)
+    thread = AgentThread(message_store=ChatMessageStore())
+    checkpoint_storage = InMemoryCheckpointStorage()
+
+    # First turn
+    query1 = "My favorite color is blue. Remember that."
+    print(f"\n[Turn 1] User: {query1}")
+    response1 = await agent.run(query1, thread=thread, checkpoint_storage=checkpoint_storage)
+    if response1.messages:
+        print(f"[assistant]: {response1.messages[0].text}")
+
+    # Second turn - agent should remember from thread history
+    query2 = "What's my favorite color?"
+ print(f"\n[Turn 2] User: {query2}") + response2 = await agent.run(query2, thread=thread, checkpoint_storage=checkpoint_storage) + if response2.messages: + print(f"[assistant]: {response2.messages[0].text}") + + # Show accumulated state + checkpoints = await checkpoint_storage.list_checkpoints(workflow.id) + print(f"\nTotal checkpoints across both turns: {len(checkpoints)}") + + if thread.message_store: + history = await thread.message_store.list_messages() + print(f"Messages in thread history: {len(history)}") + + +async def streaming_with_checkpoints() -> None: + """Demonstrate streaming with checkpoint storage.""" + print("\n" + "=" * 60) + print("Streaming with Checkpointing") + print("=" * 60) + + chat_client = OpenAIChatClient() + + def create_assistant() -> ChatAgent: + return chat_client.create_agent( + name="streaming_assistant", + instructions="You are a helpful assistant.", + ) + + workflow = SequentialBuilder().register_participants([create_assistant]).build() + agent = workflow.as_agent(name="StreamingCheckpointAgent") + + checkpoint_storage = InMemoryCheckpointStorage() + + query = "List three interesting facts about the ocean." + print(f"\nUser: {query}") + print("[assistant]: ", end="", flush=True) + + # Stream with checkpointing + async for update in agent.run_stream(query, checkpoint_storage=checkpoint_storage): + if update.text: + print(update.text, end="", flush=True) + + print() # Newline after streaming + + checkpoints = await checkpoint_storage.list_checkpoints(workflow.id) + print(f"\nCheckpoints created during stream: {len(checkpoints)}") + + +if __name__ == "__main__": + asyncio.run(basic_checkpointing()) + asyncio.run(checkpointing_with_thread()) + asyncio.run(streaming_with_checkpoints())