7 changes: 5 additions & 2 deletions agents-core/vision_agents/core/agents/agents.py
@@ -962,8 +962,9 @@ async def _image_to_video_processors(self, track_id: str, track_type: int):
async def _on_track_removed(
self, track_id: str, track_type: int, participant: Participant
):
-        self._active_video_tracks.pop(track_id)
-        await self._on_track_change(track_id)
+        track = self._active_video_tracks.pop(track_id, None)
+        if track is not None:
+            await self._on_track_change(track_id)

async def _on_track_change(self, track_id: str):
# shared logic between track remove and added
@@ -972,6 +973,8 @@ async def _on_track_change(self, track_id: str):
non_processed_tracks = [
t for t in self._active_video_tracks.values() if not t.processor
]
+        if not non_processed_tracks:
+            return
source_track = sorted(
non_processed_tracks, key=lambda t: t.priority, reverse=True
)[0]
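
These two guards make track bookkeeping idempotent: a duplicate or late removal event no longer raises `KeyError`, and `_on_track_change` returns early instead of indexing into an empty list once every remaining track has a processor attached. A minimal sketch of the resulting selection behavior, using a hypothetical `Track` stand-in for the real video-track wrapper:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Track:  # hypothetical stand-in; the real wrapper lives in agents.py
    priority: int
    processor: Optional[object] = None

active = {
    "cam": Track(priority=1),
    "screen": Track(priority=5),
    "ai": Track(priority=3, processor=object()),
}

# Removal is now safe to call twice: pop() with a default never raises.
active.pop("gone", None)

# Highest-priority track without a processor wins; an empty candidate
# list triggers the new early return instead of an IndexError.
candidates = [t for t in active.values() if not t.processor]
if candidates:
    source = sorted(candidates, key=lambda t: t.priority, reverse=True)[0]
    assert source.priority == 5
```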
1 change: 0 additions & 1 deletion agents-core/vision_agents/core/utils/audio_forwarder.py
@@ -33,7 +33,6 @@ async def start(self) -> None:
logger.warning("AudioForwarder already started")
return
self._task = asyncio.create_task(self._reader())
logger.info("AudioForwarder started")

async def stop(self) -> None:
"""Stop forwarding audio frames."""
2 changes: 1 addition & 1 deletion plugins/openai/pyproject.toml
@@ -11,7 +11,7 @@ requires-python = ">=3.10"
license = "MIT"
dependencies = [
"vision-agents",
"openai[realtime]>=2.5.0",
"openai[realtime]>=2.7.2",
]

[project.urls]
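The floor moves from 2.5.0 to 2.7.2, presumably to pick up the `openai.types.realtime` session and audio config param types (and `SemanticVad`) that the plugin imports below; older SDK releases ship these under different module paths or not at all.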
3 changes: 3 additions & 0 deletions plugins/openai/tests/test_openai_realtime.py
@@ -23,6 +23,9 @@ async def realtime(self):
model="gpt-realtime",
voice="alloy",
)
+        realtime._set_instructions(
+            "be friendly"
+        )
try:
yield realtime
finally:
130 changes: 80 additions & 50 deletions plugins/openai/vision_agents/plugins/openai/openai_realtime.py
@@ -2,13 +2,18 @@
from typing import Any, Optional, List, Dict

import aiortc
+from openai import AsyncOpenAI
+from openai.types.beta.realtime import RateLimitsUpdatedEvent
+from openai.types.realtime import RealtimeSessionCreateRequestParam, RealtimeAudioConfigParam, \
+    RealtimeAudioConfigOutputParam, RealtimeAudioConfigInputParam, AudioTranscriptionParam
+from openai.types.realtime.realtime_transcription_session_audio_input_turn_detection_param import SemanticVad
+
from getstream.video.rtc import AudioStreamTrack
-from openai.types.realtime import (
-    RealtimeSessionCreateRequestParam,
+from openai.types.beta.realtime import (
ResponseAudioTranscriptDoneEvent,
InputAudioBufferSpeechStartedEvent,
ConversationItemInputAudioTranscriptionCompletedEvent,
-    ResponseDoneEvent,
+    ResponseDoneEvent, SessionCreatedEvent,
)

from vision_agents.core.llm import realtime
@@ -26,21 +31,12 @@

logger = logging.getLogger(__name__)


"""
-TODO
-- improve event parsing, reuse built-in types where possible
-- not clear why we resize video to width=640, height=480
-- reconnect flow isn't implemented properly
-- is it needed to redo the SDP offer/answer cycle when receiving a new track?
-- support passing full client options in __init__
-- review either SessionCreateParams or RealtimeSessionCreateRequestParam
-- send video should depend on if the RTC connection with stream is sending video. not always send
+TODO: Future improvements
+- send video should depend on if the RTC connection with stream is sending video. not always send (requires SDP renegotiation)
- more testing with adding/removing video tracks
"""

-client = RealtimeSessionCreateRequestParam


class Realtime(realtime.Realtime):
"""
OpenAI Realtime API implementation for real-time AI audio and video communication over WebRTC.
@@ -52,7 +48,10 @@ class Realtime(realtime.Realtime):
Args:
model: OpenAI model to use (e.g., "gpt-realtime").
voice: Voice for audio responses (e.g., "marin", "alloy").
-        send_video: Enable video streaming capabilities. Defaults to False.
+        realtime_session: Configure RealtimeSessionCreateRequestParam
+        api_key: Optionally specify an API key
+        client: Pass your own AsyncOpenAI client

This class uses:
- RTCManager to handle WebRTC connection and media streaming.
@@ -63,20 +62,49 @@ class Realtime(realtime.Realtime):
"""

def __init__(
self, model: str = "gpt-realtime", voice: str = "marin", *args, **kwargs
self, model: str = "gpt-realtime", api_key: Optional[str] = None, voice: str = "marin", client: Optional[AsyncOpenAI] = None, fps: int = 1,
realtime_session: Optional[RealtimeSessionCreateRequestParam] = None
):
-        super().__init__(*args, **kwargs)
+        super().__init__(fps)
self.model = model
self.voice = voice
-        # TODO: send video should depend on if the RTC connection with stream is sending video.
-        self.rtc = RTCManager(self.model, self.voice, True)

+        self.realtime_session: RealtimeSessionCreateRequestParam = realtime_session or RealtimeSessionCreateRequestParam(
+            type="realtime"
+        )
+        self.realtime_session["model"] = self.model
+
+        # Set audio and output if they are None
+        # TODO: handle more edge cases of updating the passed settings / good defaults
+        if self.realtime_session.get("audio") is None:
+            self.realtime_session["audio"] = RealtimeAudioConfigParam(
+                input=RealtimeAudioConfigInputParam(
+                    transcription=AudioTranscriptionParam(model="gpt-4o-mini-transcribe"),
+                    turn_detection=SemanticVad(type="semantic_vad"),
+                )
+            )
+        if self.realtime_session["audio"].get("output") is None:
+            self.realtime_session["audio"]["output"] = RealtimeAudioConfigOutputParam()
+        self.realtime_session["audio"]["output"]["voice"] = self.voice
self._output_audio_track = AudioStreamTrack(
sample_rate=48000, channels=2, format="s16"
)
# Map conversation item_id to participant to handle multi-user scenarios
self._item_to_participant: Dict[str, Participant] = {}
self._pending_participant: Optional[Participant] = None

+        # create the client
+        if client is not None:
+            self.client = client
+        elif api_key is not None and api_key != "":
+            self.client = AsyncOpenAI(api_key=api_key)
+        else:
+            self.client = AsyncOpenAI()  # will get it from the env vars
+
+        # Start the realtime connection manager
+        self.rtc = RTCManager(realtime_session=self.realtime_session, client=self.client)

@property
def output_audio_track(self) -> AudioStreamTrack:
return self._output_audio_track
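
With the new signature, callers can rely on the defaults or inject their own client and session config. A minimal usage sketch, assuming `OPENAI_API_KEY` is set in the environment and that the plugin exports `Realtime` from `vision_agents.plugins.openai` (import path inferred from the repo layout):

```python
from openai import AsyncOpenAI
from openai.types.realtime import RealtimeSessionCreateRequestParam

from vision_agents.plugins.openai import Realtime  # assumed export

# Default wiring: API key from env vars, semantic VAD turn detection,
# gpt-4o-mini-transcribe input transcription.
llm = Realtime(model="gpt-realtime", voice="marin")

# Bring-your-own client and session config; __init__ still fills in the
# model, voice, and any missing audio sections.
session = RealtimeSessionCreateRequestParam(type="realtime")
llm = Realtime(
    client=AsyncOpenAI(api_key="sk-..."),
    fps=1,
    realtime_session=session,
)
```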
@@ -87,14 +115,6 @@ async def connect(self):
Sets up callbacks and connects to OpenAI's servers. Emits connected event
with session configuration when ready.
"""
-        instructions: Optional[str] = None
-        if hasattr(self, "parsed_instructions") and self.parsed_instructions:
-            instructions = self._build_enhanced_instructions()
-        elif getattr(self, "instructions", None):
-            instructions = self.instructions
-
-        self.rtc.instructions = instructions
-
# Wire callbacks so we can emit audio/events upstream
self.rtc.set_event_callback(self._handle_openai_event)
self.rtc.set_audio_callback(self._handle_audio_output)
@@ -147,13 +167,6 @@ async def simple_audio_response(
self._pending_participant = participant
await self.rtc.send_audio_pcm(audio)

-    async def request_session_info(self) -> None:
-        """Request session information from the OpenAI API.
-
-        Delegates to the RTC manager to query session metadata.
-        """
-        await self.rtc.request_session_info()
-
async def close(self):
await self.rtc.close()

@@ -175,18 +188,18 @@ async def _handle_openai_event(self, event: dict) -> None:
Registered as callback with RTC manager.
"""
et = event.get("type")
logger.debug(f"OpenAI Realtime event: {et}")

# code here is weird because OpenAI does something strange
# see issue: https://github.com/openai/openai-python/issues/2698
-        # as a workaround we copy the event and set type to response.output_audio_transcript.done so that
+        # as a workaround we copy the event and normalize the type to response.audio_transcript.done so that
# ResponseAudioTranscriptDoneEvent.model_validate is happy
if et in [
"response.audio_transcript.done",
"response.output_audio_transcript.done",
]:
# Create a copy and normalize the type field
event_copy = event.copy()
event_copy["type"] = "response.output_audio_transcript.done"
event_copy["type"] = "response.audio_transcript.done"
transcript_event: ResponseAudioTranscriptDoneEvent = (
ResponseAudioTranscriptDoneEvent.model_validate(event_copy)
)
@@ -218,14 +231,6 @@ async def _handle_openai_event(self, event: dict) -> None:

# Look up the correct participant for this transcription
participant = self._item_to_participant.get(item_id)
-            if participant:
-                logger.info(
-                    f"User speech transcript from {participant.user_id}: {user_transcript_event.transcript}"
-                )
-            else:
-                logger.info(
-                    f"User speech transcript (no participant mapping): {user_transcript_event.transcript}"
-                )

# Temporarily set the correct participant for this specific transcription
original_participant = self._current_participant
@@ -252,15 +257,36 @@ async def _handle_openai_event(self, event: dict) -> None:
await self._handle_tool_call_event(event)
elif et == "response.created":
pass
elif et == "session.created":
SessionCreatedEvent(**event)
elif et == "rate_limits.updated":
RateLimitsUpdatedEvent(**event)
elif et == "response.done":
logger.info("OpenAI response done %s", event)
-            e = ResponseDoneEvent(**event)
+            response_done_event = ResponseDoneEvent.model_validate(event)

-            if e.response.status == "failed":
-                raise Exception("OpenAI realtime failure %s", e.response)
+            if response_done_event.response.status == "failed":
+                raise Exception("OpenAI realtime failure %s", response_done_event.response)
Comment on lines +260 to +268

⚠️ Potential issue | 🔴 Critical

Fix the exception format string.

Line 268 uses a %s format placeholder but never applies the % operator, so the exception message will contain the literal string "OpenAI realtime failure %s" rather than the interpolated response object.

Apply this diff:

```diff
-            if response_done_event.response.status == "failed":
-                raise Exception("OpenAI realtime failure %s", response_done_event.response)
+            if response_done_event.response.status == "failed":
+                raise Exception(f"OpenAI realtime failure {response_done_event.response}")
```
🤖 Prompt for AI Agents
In plugins/openai/vision_agents/plugins/openai/openai_realtime.py around lines 260 to 268, the exception raised for a failed response uses a format placeholder without applying it, resulting in the literal string being raised; replace the current raise Exception("OpenAI realtime failure %s", response_done_event.response) with a single string that interpolates the response (for example using an f-string or .format) so the response object is included in the Exception message, and remove the comma to avoid creating a tuple.

elif et == "session.updated":
pass
# e = SessionUpdatedEvent(**event)
elif et == "response.content_part.added":
# Content part added to response - logged for debugging
pass
elif et == "response.audio_transcript.delta":
# Streaming transcript delta - logged at debug level to avoid clutter
pass
elif et == "output_audio_buffer.started":
# Output audio buffer started - acknowledgment of audio playback start
pass
elif et == "response.audio.done":
# Audio generation complete for this response item
pass
elif et == "response.content_part.done":
# Content part complete - contains full transcript
pass
elif et == "response.output_item.done":
# Output item complete - logged for debugging
pass
else:
logger.info(f"Unrecognized OpenAI Realtime event: {et} {event}")

@@ -403,6 +429,10 @@ async def _send_tool_response(
except Exception as e:
logger.error(f"Failed to send tool response: {e}")

+    def _set_instructions(self, instructions: str):
+        super()._set_instructions(instructions)
+        self.realtime_session["instructions"] = self._build_enhanced_instructions() or ""
+
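Because instructions are now baked into `realtime_session` rather than handed to the RTC manager inside `connect()` (the block removed above), they must be set before the session create request is sent, which is exactly what the updated test fixture does. A sketch of the expected ordering, assuming the agent framework normally calls `_set_instructions` on your behalf:

```python
import asyncio

async def main() -> None:
    llm = Realtime(model="gpt-realtime")
    llm._set_instructions("be friendly")  # written into realtime_session["instructions"]
    await llm.connect()  # the session create request now carries those instructions
    await llm.close()

asyncio.run(main())
```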
def _sanitize_tool_output(self, value: Any, max_chars: int = 60_000) -> str:
"""Sanitize tool output for OpenAI realtime.
