
Streaming the agent team's response #5625

Open
surendransuri opened this issue Feb 20, 2025 · 4 comments
Labels
question Further information is requested

Comments

surendransuri commented Feb 20, 2025

What feature would you like to be added?

How can I stream the run_stream output to the front end? I am using a group chat agent team in my Azure Functions app, and I need to send the streaming output from the function app.

I have tried the 'azurefunctions-extensions-http-fastapi' approach, but it fails due to version conflicts. Please help me overcome this issue.


import json
import logging

import azure.functions as func

from autogen_agentchat.base import TaskResult


async def http_trigger1(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    task = req.params.get('task')
    if not task:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            task = req_body.get('name')
    if task:
        try:
            user_input = task
            # Get the team and respond to the message.
            agent_team = await get_selector_group_chat_team()
            result_stream = agent_team.run_stream(task=user_input, cancellation_token=cancellation_token)
            # Consume each message as it comes.
            async for msg in result_stream:
                if isinstance(msg, TaskResult):
                    continue
                # return func.HttpResponse(msg.model_dump())
                await save_agent_state("123-456", "123", agent_team)
        except Exception as e:
            logging.error(f"Error: {e}")
            # func.HttpResponse takes mimetype, not media_type; {e} was a set, not JSON-serializable.
            return func.HttpResponse(json.dumps({"Error": str(e)}), mimetype="application/json")
    else:
        return func.HttpResponse(
            "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
            status_code=200  # the original snippet was truncated here; this completes the standard template
        )

Why is this needed?

For production deployment

jackgerrits (Member) commented Feb 20, 2025

You can find the docs describing token streaming here: https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/tutorial/agents.html#streaming-tokens

Essentially, you need to write logic that consumes result_stream and forwards each item to wherever it needs to go.
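For illustration, a minimal sketch of that consumption loop (consume_stream is a hypothetical helper; it assumes agent_team is an AgentChat team like the one returned by get_selector_group_chat_team() in your snippet):

from autogen_agentchat.base import TaskResult

async def consume_stream(agent_team, task: str):
    async for msg in agent_team.run_stream(task=task):
        if isinstance(msg, TaskResult):
            break  # the TaskResult is the final item and marks the end of the run
        # Forward each message to your transport of choice (SSE, WebSocket, queue, ...).
        print(msg)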

surendransuri (Author) commented Feb 20, 2025

Please share sample code for this.

jackgerrits (Member) commented Feb 20, 2025

Please see the link I sent:

# Imports per the AutoGen 0.4 AgentChat API.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4o")

streaming_assistant = AssistantAgent(
    name="assistant",
    model_client=model_client,
    system_message="You are a helpful assistant.",
    model_client_stream=True,  # Enable streaming tokens.
)

# Use an async function and asyncio.run() in a script.
async for message in streaming_assistant.on_messages_stream(  # type: ignore
    [TextMessage(content="Name two cities in South America", source="user")],
    cancellation_token=CancellationToken(),
):
    print(message)

Output:
source='assistant' models_usage=None content='Two' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' cities' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' in' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' South' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' America' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' are' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' Buenos' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' Aires' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' in' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' Argentina' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' and' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' São' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' Paulo' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' in' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content=' Brazil' type='ModelClientStreamingChunkEvent'
source='assistant' models_usage=None content='.' type='ModelClientStreamingChunkEvent'
Response(chat_message=TextMessage(source='assistant', models_usage=RequestUsage(prompt_tokens=0, completion_tokens=0), content='Two cities in South America are Buenos Aires in Argentina and São Paulo in Brazil.', type='TextMessage'), inner_messages=[])
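The same pattern applies at the team level. As a hedged sketch, assuming the team's agents are constructed with model_client_stream=True and reusing the get_selector_group_chat_team() helper from your post:

from autogen_agentchat.base import TaskResult
from autogen_agentchat.messages import ModelClientStreamingChunkEvent

async def stream_team_tokens(task: str):
    team = await get_selector_group_chat_team()
    async for msg in team.run_stream(task=task):
        if isinstance(msg, ModelClientStreamingChunkEvent):
            print(msg.content, end="", flush=True)  # forward token chunks as they arrive
        elif isinstance(msg, TaskResult):
            break  # end of the run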

victordibia (Collaborator) commented Feb 20, 2025

@surendransuri ,

Use a streaming response, not a plain HTTP trigger response

From the code above, it seems you are trying to stream output over a standard HTTP response in an Azure Function. A plain func.HttpResponse returns the whole body at once, so it is not suitable for incremental streaming.

I found this documentation on supporting HTTP streams in Azure Functions for Python. Please read it.

It seems your code should look something like this:

import azure.functions as func
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse

from autogen_agentchat.base import TaskResult
from autogen_core import CancellationToken

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

async def stream_messages(result_stream):
    """Convert agent messages to SSE format."""
    async for msg in result_stream:
        if isinstance(msg, TaskResult):
            continue
        # model_dump_json() yields valid JSON; interpolating model_dump()'s
        # dict directly would produce a Python repr instead.
        yield f"data: {msg.model_dump_json()}\n\n"

@app.route(route="chat", methods=[func.HttpMethod.POST])
async def chat_stream(req: Request) -> StreamingResponse:
    """Endpoint to stream chat responses."""
    task = req.query_params.get('task') or (await req.json()).get('task')

    agent_team = await get_selector_group_chat_team()
    result_stream = agent_team.run_stream(task=task, cancellation_token=CancellationToken())

    return StreamingResponse(
        stream_messages(result_stream),
        media_type="text/event-stream"
    )
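For reference, that documentation also requires installing the extension package and enabling init indexing before streaming works. A hedged sketch of the config (verify against the docs for your Functions runtime version; this may also be where your version conflicts came from):

# requirements.txt
azurefunctions-extensions-http-fastapi

# local.settings.json (for local development)
{
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "PYTHON_ENABLE_INIT_INDEXING": "1"
  }
}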

Please see our examples on streaming to the frontend

We have several examples that can help build an understanding of using streams in a frontend app.


Note: You will need to adapt them to fit the streamed data. See the FastAPI, Streamlit, and Chainlit examples for guidance.
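To see the frontend side in isolation, here is a hedged sketch of a small Python client consuming the SSE endpoint above (the URL assumes a local Functions host on the default port; httpx is not part of those examples and is used here only for illustration):

import asyncio
import httpx

async def main():
    url = "http://localhost:7071/api/chat"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, params={"task": "Write a haiku"}) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])  # one JSON-encoded message per SSE event

asyncio.run(main())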
