Skip to content

Issue after integrating dispatch: agent not connecting to room #470

@yosepppph

Description

@yosepppph

After integrating dispatch, I’m encountering two issues:

1)The participant_metadata_changed event no longer seems to trigger.

2)More critically, the agent is no longer able to connect to the room.

I’ve followed the documentation for setting up dispatch, but based on the logs and what I see in the LiveKit console, I’m unable to determine where the failure is occurring.

Could someone help clarify what might be going wrong or suggest how to debug this connection issue further?

Here is the code

to generated access token

app.post("/get-token", async (req, res) => {
  try {
    const { identity, roomName } = req.body;

    if (!identity || !roomName) {
      return res
        .status(400)
        .json({ error: "Identity and room name are required" });
    }

    // Create LiveKit access token
    const at = new AccessToken(
      process.env.LIVEKIT_API_KEY,
      process.env.LIVEKIT_API_SECRET,
      {
        identity,
        name: identity,
      },
    );

    // Create room if it doesn't exist
    // try {
    //   await roomService.createRoom({
    //     name: roomName,
    //     emptyTimeout: 60 * 60, // 1 hour in seconds
    //     maxParticipants: 4, // User, agent, and potential observers
    //   });
    //   console.log(`Room ${roomName} created`);
    // } catch (error) {
    //  console.log("Room might already exist:", error.message);
    //}

    // Add room-specific grants
    at.addGrant({
      roomJoin: true,
      room: roomName,
      canPublish: true,
      canSubscribe: true,
      // Enable data channel for agent communications
      canPublishData: true,
      // Add permission to update metadata
      canUpdateOwnMetadata: true,
    });

    //Add room configuration with agent dispatch
    // at.roomConfig = new RoomConfiguration({
    //   agents: [
    //     new RoomAgentDispatch({
    //       agentName: "smart-shopping-assistant",
    //     }),
    //   ],
    // });
    const agentName = "smart-shopping-assistant";
    at.roomConfig = new RoomConfiguration({
      agents: [
        new RoomAgentDispatch({
          agent_name: agentName,
          metadata: '{"user_id": "12345"}',
        }),
      ],
    });

    const token = await at.toJwt();
    console.log("Token:", token);
    res.json({ token, url: process.env.LIVEKIT_WS_URL });
  } catch (error) {
    console.error("Error generating token:", error);
    res.status(500).json({ error: "Failed to generate token" });
  }
});

Server side agent code

from dotenv import load_dotenv
import os
import json
from typing import Literal, Optional, Any
from livekit import agents, rtc
from livekit.agents import AgentSession, Agent, RoomInputOptions, function_tool, JobContext
from livekit.plugins import openai
from pydantic import BaseModel, ConfigDict
import asyncio

load_dotenv()

LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
LIVEKIT_WS_URL = os.getenv("LIVEKIT_URL")

if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_WS_URL:
    print("Missing LiveKit credentials in .env file")
    exit(1)

print("Using LiveKit server:", LIVEKIT_WS_URL)


async def smart_shopping_request_fnc(req: agents.JobRequest):
    print('++++++++++++++++++++++++++')
    print('Smart shopping request received in request_fnc')
    print('++++++++++++++++++++++++++')
    
    # For now, accept all requests
    # In a real implementation, you could check req.metadata or other context
    # to determine if the user wants shopping assistance
    await req.accept(
        name="smart-shopping-assistant",
        #identity="smart-shopping-agent",
    )


class ShoppingAssistant(Agent):
    def __init__(self,ctx, shopping_list):
        self.ctx = ctx
        # Enhance once at initialization
        self.shopping_list = shopping_list
        super().__init__(
            instructions=self._make_system_prompt(self.shopping_list),
        )

    def _make_system_prompt(self, shopping_list):
        # shopping_list is already enhanced, no conversion needed
        pending_items = [item["item"] for item in shopping_list if item["picked_at"] == "pending"]
        picked_items = [item["item"] for item in shopping_list if item["picked_at"] == "picked"]
        already_home_items = [item["item"] for item in shopping_list if item["picked_at"] == "already_home"]
        
        return (
            "You are a helpful shopping assistant. The user is at a store with a shopping list.\n"
            "Help them find alternatives, suggest related items, or answer questions about the ingredients.\n\n"
            f"Current shopping list: {json.dumps(shopping_list)}\n\n"
            f"Pending items: {pending_items}\n"
            f"Picked items: {picked_items}\n\n"
            f"Already home items: {already_home_items}\n\n"
            "You can manage the shopping list using these tools:\n"
            '- To add an item: update_shopping_list("add", "milk")\n'
            '- To remove an item: update_shopping_list("remove", "milk")\n'
            '- To mark as picked: mark_item_status("milk", "picked")\n'
            '- To mark as already home: mark_item_status("milk", "already_home")\n'
            '- To get summary: get_shopping_summary()\n\n'
            "Always be polite, helpful, and provide concise information about foods, ingredients, "
            "and cooking tips related to the items in their shopping list."
        )
   
    async def on_participant_connected(self, participant, session):
        print("Participant connected:", participant.identity)
        if "assistant" not in participant.identity:
            print("Real participant joined, sending welcome message")
            await session.generate_reply(
                instructions="Welcome to your personal shopping assistant! I can help you with your shopping list. What are you looking for today?"
            )
        else:
            print("AI Assistant joined the room")

    @function_tool(name="update_shopping_list")
    async def update_shopping_list(
        self, 
        action: Literal["add", "remove", "clear"],
        item: Optional[str] = None
    ) -> str:
        new_list = self.shopping_list.copy()
        
        if action == "add" and item:
            # Check if item already exists (case-insensitive)
            existing_items = [i["item"].lower() for i in new_list]
            if item.lower() not in existing_items:
                # Add as enhanced object
                new_list.append({
                    "item": item,
                    "status": "pending",
                    "picked_at": None,
                    "notes": ""
                })
        elif action == "remove" and item:
            new_list = [i for i in new_list if i["item"].lower() != item.lower()]
        elif action == "clear":
            new_list = []

        self.shopping_list = new_list  # Already enhanced

        # Update metadata
        await self.ctx.room.local_participant.set_metadata(json.dumps({"shoppingList": new_list}))

        # Update system prompt using the proper method
        await self.update_instructions(self._make_system_prompt(new_list))

        # Optionally notify the user
        #await session.generate_reply(
        #    instructions=f"I've updated your shopping list. You now have {len(new_list)} items."
        #)
        return f"Shopping list updated. Current items: {new_list}"
    
    async def on_data_received(self, payload, participant, ctx, session):
        try:
            data = json.loads(payload.decode())
            if data.get("type") == "shopping_list" and isinstance(data.get("data"), list):
                self.shopping_list = data["data"]
                await self.update_instructions(self._make_system_prompt(self.shopping_list))
                await session.generate_reply(
                    instructions=f"I've updated your shopping list. You now have {len(self.shopping_list)} items."
                )
        except Exception as e:
            print("Error processing data channel message:", e)

    def _convert_simple_list_to_enhanced(self, simple_list):
        """Convert simple string list to enhanced object list"""
        if not simple_list:
            return []
        
        # Check if already enhanced format
        if isinstance(simple_list[0], dict):
            return simple_list
        
        # Convert simple strings to enhanced objects
        return [
            {
                "item": item,
                "status": "pending",
                "picked_at": None,
                "notes": ""
            }
            for item in simple_list
        ]

    def _get_pending_items(self):
        """Get only pending items"""
        return [item for item in self.shopping_list if item.get("status") == "pending"]

    def _get_picked_items(self):
        """Get only picked items"""
        return [item for item in self.shopping_list if item.get("status") == "picked"]

    def _get_already_home_items(self):
        """Get only items marked as at home"""
        return [item for item in self.shopping_list if item.get("status") == "already_home"]

    @function_tool(name="mark_item_status")
    async def mark_item_status(
        self,
        item_name: str,
        status: Literal["pending", "picked", "already_home"],
        notes: Optional[str] = None
    ) -> str:
        """Mark an item as picked, at home, or pending"""
        from datetime import datetime
        
        # Find the item
        for item in self.shopping_list:
            if item["item"].lower() == item_name.lower():
                item["status"] = status
                if status == "picked":
                    item["picked_at"] = datetime.now().isoformat()
                if notes:
                    item["notes"] = notes
                
                # Update metadata
                await self.ctx.room.local_participant.set_metadata(
                    json.dumps({"shoppingList": self.shopping_list})
                )
                
                return f"Marked '{item_name}' as {status}"
        
        return f"Item '{item_name}' not found in shopping list"

    @function_tool(name="get_shopping_summary")
    async def get_shopping_summary(self) -> str:
        """Get a summary of shopping list status"""
        pending = len(self._get_pending_items())
        picked = len(self._get_picked_items())
        already_home = len(self._get_already_home_items())
        total = len(self.shopping_list)
        
        return f"Shopping Summary: {pending} pending, {picked} picked, {already_home} already home (Total: {total})"

async def entrypoint(ctx: agents.JobContext):
    await ctx.connect()
    
    # Get shopping list from room metadata if it exists
    shopping_list = []
    shopping_assistant = ShoppingAssistant(ctx, shopping_list)

    def handle_metadata_changed(participant: rtc.Participant, old_metadata: str, new_metadata: str):
        print("handle_metadata_changed")
        async def _handle_metadata():
            try:
                metadata = json.loads(participant.metadata or "{}")
                print("metadata",metadata)
                if "shoppingList" in metadata and isinstance(metadata["shoppingList"], list):
                    new_shopping_list =  metadata["shoppingList"]
                    print("new_shopping_list",new_shopping_list)
                    if new_shopping_list != shopping_assistant.shopping_list:
                        shopping_assistant.shopping_list = new_shopping_list
                        await shopping_assistant.update_instructions(shopping_assistant._make_system_prompt(shopping_assistant.shopping_list))
            except Exception as e:
                print("Error parsing participant metadata:", e)
        
        asyncio.create_task(_handle_metadata())

    ctx.room.on("participant_metadata_changed", handle_metadata_changed)

    # Listen for events before connecting to the room
    @ctx.room.on("participant_connected")
    def on_participant_connected(participant):
        print(f"Participant connected: {participant.identity}")

    # ctx.room.on("participant_metadata_changed", handle_metadata_changed)
    # async def handle_metadata_changed(participant: rtc.Participant, old_metadata: str, new_metadata: str):
    #     print("handle_metadata_changed")
    #     try:
    #         metadata = json.loads(participant.metadata or "{}")
    #         print("metadata",metadata)
    #         if "shoppingList" in metadata and isinstance(metadata["shoppingList"], list):
    #             new_shopping_list =  metadata["shoppingList"]
    #             print("new_shopping_list",new_shopping_list)
    #             if new_shopping_list != shopping_assistant.shopping_list:
    #                 shopping_assistant.shopping_list = new_shopping_list
    #                 await shopping_assistant.update_instructions(shopping_assistant._make_system_prompt(shopping_assistant.shopping_list))
    #     except Exception as e:
    #         print("Error parsing participant metadata:", e)
        

    
    
    
    session = AgentSession(
        llm=openai.realtime.RealtimeModel(
            voice="coral",
            model="gpt-4o-realtime-preview",
        # stt=deepgram.STT(),
        # llm=openai.LLM(model="gpt-4o"),
        # tts=openai.TTS(
        #     instructions="You are a helpful assistant with a pleasant voice.",
        #     voice="ash",
        # ),
        # vad=silero.VAD.load(),
        )
    )

  

    await session.start(
        room=ctx.room,
        agent=shopping_assistant,
        room_input_options=RoomInputOptions(),
    )



    

    # async def handle_metadata_changed(participant, prev_metadata):
    #     await shopping_assistant.on_metadata_changed(participant, prev_metadata, ctx, session)

    # ctx.room.on("participant_metadata_changed", handle_metadata_changed)

      # Register event handlers
    # ctx.room.on("participant_connected", lambda participant: shopping_assistant.on_participant_connected(participant, session))
    # ctx.room.on("participant_metadata_changed", lambda participant, prev_metadata: shopping_assistant.on_metadata_changed(participant, prev_metadata, ctx, session))
    # ctx.room.on("data_received", lambda payload, participant: shopping_assistant.on_data_received(payload, participant, ctx, session))

    

    # Wait for a participant to join
    participant = await ctx.wait_for_participant()
    print(f"Starting shopping assistant agent for {participant.identity}")

    # # Initial greeting
    # await session.generate_reply(
    #     instructions="Greet the user and offer your assistance."
    # )

if __name__ == "__main__":
    agents.cli.run_app(agents.WorkerOptions(
        entrypoint_fnc=entrypoint,
        # request_fnc=smart_shopping_request_fnc,
        agent_name="smart-shopping-assistant"
    ))

Can you help identify which part of this code might be incorrect or causing the issue?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions