diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index c29e178843..2199835c15 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -185,7 +185,22 @@ The event stream handler function will receive the agent [run context][pydantic_ As the streaming model request activity, workflow, and workflow execution call all take place in separate processes, passing data between them requires some care: - To get data from the workflow call site or workflow to the event stream handler, you can use a [dependencies object](#agent-run-context-and-dependencies). -- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from, like a message queue. You can use the dependency object to make sure the same connection string or other unique ID is available in all the places that need it. +- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from. Alternatively, you can use Temporal's built-in signals and queries to pass events from activities to the workflow and from the workflow to the caller. + +#### Example + +For a complete working example of streaming with Temporal using signals and queries, see the [temporal_streaming example](https://github.com/pydantic/pydantic-ai/tree/main/examples/pydantic_ai_examples/temporal_streaming). This example demonstrates: + +- How to use an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] to capture agent events in activities +- Using Temporal signals to send events from activities to the workflow +- Using Temporal queries to poll events from the workflow to the caller +- Setting up dependencies to pass workflow identification for signal routing +- Integrating MCP toolsets and custom tools with streaming +- Complete project structure with all necessary files + +The example includes a Yahoo Finance search agent with Python code execution capabilities, showing how to stream tool calls, model responses, and results in real-time during workflow execution. + + ## Activity Configuration diff --git a/examples/pydantic_ai_examples/temporal_streaming/README.md b/examples/pydantic_ai_examples/temporal_streaming/README.md new file mode 100644 index 0000000000..ad1aa0fff3 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/README.md @@ -0,0 +1,172 @@ +# Temporal Streaming Example + +This example demonstrates how to implement streaming with Pydantic AI agents in Temporal workflows. It showcases the streaming pattern described in the [Temporal documentation](../../../docs/durable_execution/temporal.md#streaming). + +## Overview + +The example implements a Yahoo Finance search agent that: +- Uses MCP (Model Context Protocol) toolsets for accessing financial data +- Executes Python code in a sandbox for data analysis +- Streams events during execution via Temporal signals and queries +- Provides durable execution with automatic retries + +## Architecture + +The streaming architecture works as follows: + +1. **Agent Configuration** (`agents.py`): Defines the agent with MCP toolsets and custom Python execution tools +2. **Workflow** (`workflow.py`): Temporal workflow that orchestrates agent execution and manages event streams +3. **Streaming Handler** (`streaming_handler.py`): Processes agent events and sends them to the workflow via signals +4. **Main** (`main.py`): Sets up the Temporal client/worker and polls for events via queries + +## Key Components + +### Event Flow + +``` +Agent Execution (in Activity) + ↓ +Streaming Handler + ↓ (via Signal) +Workflow Event Queue + ↓ (via Query) +Main Process (polling) + ↓ +Display to User +``` + +### Dependencies + +The [`AgentDependencies`][pydantic_ai_examples.temporal_streaming.datamodels.AgentDependencies] model passes workflow identification from the workflow to activities, enabling the streaming handler to send signals back to the correct workflow instance. + +## Prerequisites + +1. **Temporal Server**: Install and run Temporal locally + +```bash +brew install temporal +temporal server start-dev +``` + +2. **Python Dependencies**: Install required packages + +```bash +pip install pydantic-ai temporalio mcp-run-python pyyaml +``` + +3. **Configuration File**: Create an `app_conf.yml` file in your project root + +```yaml +llm: + anthropic_api_key: ANTHROPIC_API_KEY # Will be read from environment variable +``` + +4. **Environment Variables**: Set your Anthropic API key + +```bash +export ANTHROPIC_API_KEY='your-api-key-here' +``` + +## Running the Example + +1. Make sure Temporal server is running: + +```bash +temporal server start-dev +``` + +2. Set the configuration file path (optional, defaults to `./app_conf.yml`): + +```bash +export APP_CONFIG_PATH=./app_conf.yml +``` + +3. Run the example: + +```bash +python -m pydantic_ai_examples.temporal_streaming.main +``` + +## What to Expect + +The example will: +1. Connect to Temporal server +2. Start a worker to handle workflows and activities +3. Execute the workflow with a sample financial query +4. Stream events as the agent: + - Calls tools (Yahoo Finance API, Python sandbox) + - Receives responses + - Generates the final result +5. Display all events in real-time +6. Show the final result + +## Project Structure + +``` +temporal_streaming/ +├── agents.py # Agent configuration with MCP toolsets +├── datamodels.py # Pydantic models for dependencies and events +├── main.py # Main entry point +├── streaming_handler.py # Event stream handler +├── utils.py # Configuration utilities +├── workflow.py # Temporal workflow definition +└── README.md # This file +``` + +## Customization + +### Changing the Query + +Edit the query in `main.py`: + +```python +workflow_handle = await client.start_workflow( + YahooFinanceSearchWorkflow.run, + args=['Your custom financial query here'], + id=workflow_id, + task_queue=task_queue, +) +``` + +### Adding More Tools + +Add tools to the agent in `agents.py`: + +```python +@agent.tool(name='your_tool_name') +async def your_tool(ctx: RunContext[AgentDependencies], param: str) -> str: + # Your tool implementation + return result +``` + +### Modifying Event Handling + +Customize what events are captured and displayed in `streaming_handler.py`. + +## Key Concepts + +### Why Streaming is Different in Temporal + +Traditional streaming methods like [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] don't work in Temporal because: +- Activities cannot stream directly to the workflow +- The workflow and activity run in separate processes + +### The Solution + +This example uses: +- **Event Stream Handler**: Captures events during agent execution +- **Signals**: Push events from activities to the workflow +- **Queries**: Pull events from the workflow to the caller +- **Dependencies**: Pass workflow identification to enable signal routing + +## Limitations + +- Events are batched per model request/tool call rather than streamed token-by-token +- Query polling introduces a small delay in event delivery +- The workflow waits up to 10 seconds for events to be consumed before completing + +## Learn More + +- [Temporal Documentation](https://docs.temporal.io/) +- [Pydantic AI Temporal Integration](../../../docs/durable_execution/temporal.md) +- [Streaming with Pydantic AI](../../../docs/agents.md#streaming-all-events) diff --git a/examples/pydantic_ai_examples/temporal_streaming/__init__.py b/examples/pydantic_ai_examples/temporal_streaming/__init__.py new file mode 100644 index 0000000000..dfaff570e3 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/__init__.py @@ -0,0 +1,19 @@ +"""Temporal streaming example for Pydantic AI. + +This example demonstrates how to implement streaming with Pydantic AI agents +in Temporal workflows using signals and queries. +""" + +from .agents import build_agent +from .datamodels import AgentDependencies, EventKind, EventStream +from .streaming_handler import streaming_handler +from .workflow import YahooFinanceSearchWorkflow + +__all__ = [ + 'build_agent', + 'streaming_handler', + 'YahooFinanceSearchWorkflow', + 'AgentDependencies', + 'EventKind', + 'EventStream', +] diff --git a/examples/pydantic_ai_examples/temporal_streaming/agents.py b/examples/pydantic_ai_examples/temporal_streaming/agents.py new file mode 100644 index 0000000000..a22c7d6ba7 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/agents.py @@ -0,0 +1,107 @@ +"""Agent configuration for the Temporal streaming example. + +This module defines the agent setup with MCP toolsets, model configuration, +and custom tools for data analysis. +""" +from datetime import timedelta +from typing import Any + +from temporalio.common import RetryPolicy +from temporalio.workflow import ActivityConfig + +from pydantic_ai import Agent, FilteredToolset, ModelSettings +from pydantic_ai.agent import EventStreamHandler +from pydantic_ai.durable_exec.temporal import TemporalAgent +from pydantic_ai.mcp import MCPServerStdio +from pydantic_ai.models.anthropic import AnthropicModel +from pydantic_ai.providers.anthropic import AnthropicProvider +from .datamodels import AgentDependencies + + +async def get_mcp_toolsets() -> dict[str, FilteredToolset[AgentDependencies]]: + """ + Initialize MCP toolsets for the agent. + + Returns: + A dictionary mapping toolset names to filtered toolsets. + """ + yf_server = MCPServerStdio( + command='uvx', + args=['mcp-yahoo-finance'], + timeout=240, + read_timeout=240, + id='yahoo', + ) + return {'yahoo': yf_server.filtered(lambda ctx, tool_def: True)} + + +async def get_claude_model(parallel_tool_calls: bool = True, **kwargs: Any) -> AnthropicModel: + """ + Create and configure the Claude model. + + Args: + parallel_tool_calls: Whether to enable parallel tool calls. + **kwargs: Environment variables including API keys. + + Returns: + Configured AnthropicModel instance. + """ + model_name: str = 'claude-sonnet-4-5-20250929' + api_key: str | None = kwargs.get('anthropic_api_key', None) + model: AnthropicModel = AnthropicModel( + model_name=model_name, + provider=AnthropicProvider(api_key=api_key), + settings=ModelSettings( + temperature=0.5, + max_tokens=64000, + parallel_tool_calls=parallel_tool_calls, + ), + ) + + return model + + +async def build_agent(stream_handler: EventStreamHandler[AgentDependencies], + **kwargs: Any) -> TemporalAgent[AgentDependencies, str]: + """ + Build and configure the agent with tools and temporal settings. + + Args: + stream_handler: Optional event stream handler for streaming responses. + **kwargs: Environment variables including API keys. + + Returns: + TemporalAgent instance ready for use in Temporal workflows. + """ + system_prompt = """ + You are an expert financial analyst that knows how to search for financial data on the web. + """ + agent_name = 'YahooFinanceSearchAgent' + + toolsets = await get_mcp_toolsets() + agent: Agent[AgentDependencies, str] = Agent[AgentDependencies, str]( + name=agent_name, + model=await get_claude_model(**kwargs), + toolsets=[*toolsets.values()], + system_prompt=system_prompt, + event_stream_handler=stream_handler, + deps_type=AgentDependencies, + ) + + temporal_agent = TemporalAgent( + wrapped=agent, + model_activity_config=ActivityConfig( + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy(maximum_attempts=50), + ), + toolset_activity_config={ + toolset_id: ActivityConfig( + start_to_close_timeout=timedelta(minutes=3), + retry_policy=RetryPolicy( + maximum_attempts=3, non_retryable_error_types=['ToolRetryError'] + ), + ) + for toolset_id in toolsets.keys() + }, + ) + return temporal_agent diff --git a/examples/pydantic_ai_examples/temporal_streaming/app_conf.yml b/examples/pydantic_ai_examples/temporal_streaming/app_conf.yml new file mode 100644 index 0000000000..4aa566303d --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/app_conf.yml @@ -0,0 +1,11 @@ +# Configuration file for the Temporal Streaming example +# +# This file demonstrates how to configure API keys using environment variables. +# The value specified here (e.g., ANTHROPIC_API_KEY) will be replaced with the +# actual environment variable value at runtime. + +llm: + # The anthropic_api_key will be read from the ANTHROPIC_API_KEY environment variable + # Make sure to set it before running the example: + # export ANTHROPIC_API_KEY='your-api-key-here' + anthropic_api_key: 'ANTHROPIC_API_KEY' diff --git a/examples/pydantic_ai_examples/temporal_streaming/datamodels.py b/examples/pydantic_ai_examples/temporal_streaming/datamodels.py new file mode 100644 index 0000000000..ce1557f0b7 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/datamodels.py @@ -0,0 +1,27 @@ +"""Data models for the temporal streaming example.""" + +from enum import Enum + +from pydantic import BaseModel + + +class AgentDependencies(BaseModel): + """Dependencies passed to the agent containing workflow identification.""" + + workflow_id: str + run_id: str + + +class EventKind(str, Enum): + """Types of events that can be streamed.""" + + CONTINUE_CHAT = 'continue_chat' + EVENT = 'event' + RESULT = 'result' + + +class EventStream(BaseModel): + """Event stream data model for streaming agent events.""" + + kind: EventKind + content: str diff --git a/examples/pydantic_ai_examples/temporal_streaming/main.py b/examples/pydantic_ai_examples/temporal_streaming/main.py new file mode 100644 index 0000000000..e24a8ec98a --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/main.py @@ -0,0 +1,106 @@ +"""Main entry point for the Temporal streaming example. + +This module sets up the Temporal client and worker, executes the workflow, +and polls for streaming events to display to the user. +""" + +import asyncio +import os +import uuid +from typing import Any + +from temporalio.client import Client, WorkflowHandle +from temporalio.worker import Worker + +from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin +from .agents import build_agent +from .datamodels import EventKind, EventStream +from .streaming_handler import streaming_handler +from .utils import read_config_yml +from .workflow import YahooFinanceSearchWorkflow + + +async def poll_events(workflow_handle: WorkflowHandle[Any, str]) -> None: + """ + Poll for events from the workflow and print them. + + Args: + workflow_handle: Handle to the running workflow. + """ + while True: + event: EventStream | None = await workflow_handle.query('event_stream', + result_type=EventStream | None) # type: ignore[misc] + if event is None: + await asyncio.sleep(0.1) + continue + + if event.kind == EventKind.CONTINUE_CHAT: + print('\n--- Workflow completed ---') + break + elif event.kind == EventKind.RESULT: + print(f'\n=== Final Result ===\n{event.content}\n') + elif event.kind == EventKind.EVENT: + print(f'\n--- Event ---\n{event.content}\n') + + +async def main() -> None: + """ + Main function to set up and run the Temporal workflow. + + This function: + 1. Connects to the Temporal server + 2. Builds the agent and registers activities + 3. Starts a worker + 4. Executes the workflow + 5. Polls for streaming events + """ + # Connect to Temporal server + client = await Client.connect( + # target_host='localhost:7233', + target_host='localhost:7233', + plugins=[PydanticAIPlugin()], + ) + config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') + confs = read_config_yml(config_path) + + # Build the agent with streaming handler + temporal_agent = await build_agent(streaming_handler, **confs['llm']) + + # Define the task queue + task_queue = 'yahoo-finance-search' + + # Start the worker + async with Worker( + client, + task_queue=task_queue, + workflows=[YahooFinanceSearchWorkflow], + activities=[YahooFinanceSearchWorkflow.retrieve_env_vars], + plugins=[AgentPlugin(temporal_agent)], + ): + # Execute the workflow + workflow_id = f'yahoo-finance-search-{uuid.uuid4()}' + workflow_handle: WorkflowHandle[Any, str] = await client.start_workflow( # type: ignore[misc] + 'YahooFinanceSearchWorkflow', + arg='What are the latest financial metrics for Apple (AAPL)?', + id=workflow_id, + task_queue=task_queue, + result_type=str + ) + + print(f'Started workflow with ID: {workflow_id}') + print('Polling for events...\n') + + # Poll for events in the background + event_task = asyncio.create_task(poll_events(workflow_handle)) + + # Wait for workflow to complete + result = await workflow_handle.result() + + # Ensure event polling is done + await event_task + + print(f'\nWorkflow completed with result: {result}') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py new file mode 100644 index 0000000000..18fd830a1e --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py @@ -0,0 +1,98 @@ +"""Event streaming handler for the Temporal workflow. + +This module handles streaming events from the agent during execution, +processing various event types and sending them to the workflow via signals. +""" + +from collections.abc import AsyncIterable + +from temporalio import activity +from temporalio.client import WorkflowHandle + +from pydantic_ai import ( + AgentStreamEvent, + FunctionToolCallEvent, + FunctionToolResultEvent, + PartDeltaEvent, + PartStartEvent, + TextPart, + TextPartDelta, + ThinkingPartDelta, + ToolCallPart, RunContext, +) +from .datamodels import AgentDependencies, EventKind, EventStream + + +async def streaming_handler( + ctx: RunContext[AgentDependencies], + event_stream_events: AsyncIterable[AgentStreamEvent], +) -> None: + """ + Handle streaming events from the agent. + + This function processes events from the agent's execution stream, including + tool calls, LLM responses, and streaming results. It aggregates the events + and sends them to the workflow via signals. + + Args: + ctx: The run context containing dependencies. + event_stream_events: Async iterable of agent stream events. + """ + if not activity.in_activity(): + return + + output: str = '' + output_tool_delta: dict[str, str] = dict( + tool_call_id='', + tool_name_delta='', + args_delta='', + ) + + # Process all events from the stream + async for event in event_stream_events: + if isinstance(event, PartStartEvent): + if isinstance(event.part, TextPart): + output += f'{event.part.content}' + elif isinstance(event.part, ToolCallPart): + output += f'\nTool Call Id: {event.part.tool_call_id}' + output += f'\nTool Name: {event.part.tool_name}' + output += f'\nTool Args: {event.part.args}' + else: + pass + elif isinstance(event, FunctionToolCallEvent): + output += f'\nTool Call Id: {event.part.tool_call_id}' + output += f'\nTool Name: {event.part.tool_name}' + output += f'\nTool Args: {event.part.args}' + elif isinstance(event, FunctionToolResultEvent): + output += f'\nTool Call Id: {event.result.tool_call_id}' + output += f'\nTool Name: {event.result.tool_name}' + output += f'\nContent: {event.result.content}' + elif isinstance(event, PartDeltaEvent): + if isinstance(event.delta, TextPartDelta) or isinstance(event.delta, ThinkingPartDelta): + output += f'{event.delta.content_delta}' + else: + if len(output_tool_delta['tool_call_id']) == 0: + output_tool_delta['tool_call_id'] += event.delta.tool_call_id or '' + output_tool_delta['tool_name_delta'] += event.delta.tool_name_delta or '' + # Handle args_delta which can be str or dict + args_delta = event.delta.args_delta + if isinstance(args_delta, str): + output_tool_delta['args_delta'] += args_delta + elif isinstance(args_delta, dict): + output_tool_delta['args_delta'] += str(args_delta) + + # Add accumulated tool delta output if present + if len(output_tool_delta['tool_call_id']): + output += f'\nTool Call Id: {output_tool_delta["tool_call_id"]}' + output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}' + args_delta_str = str(output_tool_delta["args_delta"]) + output += f'\nTool Args: {args_delta_str}' + + # Send events to workflow if running in an activity + deps: AgentDependencies = ctx.deps + + workflow_id: str = deps.workflow_id + run_id: str = deps.run_id + from typing import Any + workflow_handle: WorkflowHandle[Any, Any] = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id) # type: ignore[misc] + await workflow_handle.signal('append_event', arg=EventStream(kind=EventKind.EVENT, content=output)) diff --git a/examples/pydantic_ai_examples/temporal_streaming/utils.py b/examples/pydantic_ai_examples/temporal_streaming/utils.py new file mode 100644 index 0000000000..95c2a3797d --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/utils.py @@ -0,0 +1,62 @@ +"""Utility functions for configuration management.""" + +import os +from copy import copy +from typing import Any + +import yaml + + +def recursively_modify_api_key(conf: dict[str, Any]) -> dict[str, Any]: + """ + Recursively replace API key placeholders with environment variable values. + + This function traverses a configuration dictionary and replaces any keys + containing 'api_key' with the corresponding environment variable value. + It handles nested dictionaries and lists recursively. + + Args: + conf: The configuration dictionary to process. + + Returns: + A copy of the configuration with API keys replaced by environment variable values. + """ + + def inner(_conf: dict[str, Any]) -> None: + for key, value in _conf.items(): + if isinstance(value, dict): + inner(value) # type: ignore[arg-type] + elif isinstance(value, list): + if len(value) > 0 and isinstance(value[0], dict): # type: ignore[misc] + item: dict[str, Any] + for item in value: # type: ignore[assignment,misc] + inner(item) # type: ignore[arg-type] + else: + _conf[key] = [str(os.environ.get(str(v), v)) for v in value] # type: ignore[misc] + elif isinstance(value, str): + _conf[key] = os.environ.get(value, value) + else: + _conf[key] = value + + copy_conf = copy(conf) + inner(copy_conf) + return copy_conf + + +def read_config_yml(path: str) -> dict[str, Any]: + """ + Read and process a YAML configuration file. + + This function reads a YAML file, processes it to replace API key placeholders + with environment variable values, and returns the processed configuration. + + Args: + path: The path to the YAML configuration file. + + Returns: + dict: The parsed and processed YAML content as a Python dictionary. + """ + with open(path) as f: + configs = yaml.safe_load(f) + recursively_modify_api_key(configs) + return configs diff --git a/examples/pydantic_ai_examples/temporal_streaming/workflow.py b/examples/pydantic_ai_examples/temporal_streaming/workflow.py new file mode 100644 index 0000000000..cfd19aaa80 --- /dev/null +++ b/examples/pydantic_ai_examples/temporal_streaming/workflow.py @@ -0,0 +1,114 @@ +"""Temporal workflow for streaming agent execution. + +This module defines the Temporal workflow that orchestrates the agent execution +with streaming capabilities, using signals and queries for event communication. +""" + +import asyncio +import os +from collections import deque +from datetime import timedelta +from typing import Any + +from temporalio import activity, workflow + +from pydantic_ai import UsageLimits +from .agents import build_agent +from .datamodels import AgentDependencies, EventKind, EventStream +from .streaming_handler import streaming_handler +from .utils import read_config_yml + + +@workflow.defn +class YahooFinanceSearchWorkflow: + """ + Temporal workflow for executing the Yahoo Finance search agent with streaming. + + This workflow manages the agent execution, collects streaming events via signals, + and exposes them through queries for consumption by external systems. + """ + + def __init__(self): + """Initialize the workflow with an empty event queue.""" + self.events: deque[EventStream] = deque() + + @workflow.run + async def run(self, user_prompt: str) -> str: + """ + Execute the agent with the given user prompt. + + Args: + user_prompt: The user's question or request. + + Returns: + The agent's final output. + """ + # Retrieve environment variables from configuration + wf_vars = await workflow.execute_activity( # type: ignore[misc] + activity='retrieve_env_vars', + start_to_close_timeout=timedelta(seconds=10), + result_type=dict[str, Any], + ) + + # Create dependencies with workflow identification for signal routing + deps = AgentDependencies(workflow_id=workflow.info().workflow_id, run_id=workflow.info().run_id) + + # Build and run the agent + agent = await build_agent(streaming_handler, **wf_vars) + result = await agent.run( + user_prompt=user_prompt, usage_limits=UsageLimits(request_limit=50), deps=deps + ) + + # Signal the final result + await self.append_event(event_stream=EventStream(kind=EventKind.RESULT, content=result.output)) + + # Signal completion + await self.append_event(event_stream=EventStream(kind=EventKind.CONTINUE_CHAT, content='')) + + # Wait for events to be consumed before completing + try: + await workflow.wait_condition( + lambda: len(self.events) == 0, + timeout=timedelta(seconds=10), + timeout_summary='Waiting for events to be consumed', + ) + return result.output + except asyncio.TimeoutError: + return result.output + + @staticmethod + @activity.defn(name='retrieve_env_vars') + async def retrieve_env_vars() -> dict[str, Any]: + """ + Retrieve environment variables from configuration file. + + Returns: + Dictionary containing API keys and other configuration. + """ + config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml') + configs = read_config_yml(config_path) + return {'anthropic_api_key': configs['llm']['anthropic_api_key']} + + @workflow.query + def event_stream(self) -> EventStream | None: + """ + Query to retrieve the next event from the stream. + + Returns: + The next event if available, None otherwise. + """ + if self.events: + return self.events.popleft() + return None + + @workflow.signal + async def append_event(self, event_stream: EventStream): + """ + Signal to append a new event to the stream. + + This is called by the streaming handler running in activities. + + Args: + event_stream: The event to append. + """ + self.events.append(event_stream)