Skip to content
7 changes: 5 additions & 2 deletions agents-core/vision_agents/core/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,9 @@ async def _image_to_video_processors(self, track_id: str, track_type: int):
async def _on_track_removed(
self, track_id: str, track_type: int, participant: Participant
):
self._active_video_tracks.pop(track_id)
await self._on_track_change(track_id)
track = self._active_video_tracks.pop(track_id, None)
if track is not None:
await self._on_track_change(track_id)

async def _on_track_change(self, track_id: str):
# shared logic between track remove and added
Expand All @@ -972,6 +973,8 @@ async def _on_track_change(self, track_id: str):
non_processed_tracks = [
t for t in self._active_video_tracks.values() if not t.processor
]
if not non_processed_tracks:
return
source_track = sorted(
non_processed_tracks, key=lambda t: t.priority, reverse=True
)[0]
Expand Down
1 change: 0 additions & 1 deletion agents-core/vision_agents/core/utils/audio_forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ async def start(self) -> None:
logger.warning("AudioForwarder already started")
return
self._task = asyncio.create_task(self._reader())
logger.info("AudioForwarder started")

async def stop(self) -> None:
"""Stop forwarding audio frames."""
Expand Down
2 changes: 1 addition & 1 deletion plugins/openai/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ requires-python = ">=3.10"
license = "MIT"
dependencies = [
"vision-agents",
"openai[realtime]>=2.5.0",
"openai[realtime]>=2.7.2",
]

[project.urls]
Expand Down
3 changes: 3 additions & 0 deletions plugins/openai/tests/test_openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ async def realtime(self):
model="gpt-realtime",
voice="alloy",
)
realtime._set_instructions(
"be friendly"
)
try:
yield realtime
finally:
Expand Down
119 changes: 78 additions & 41 deletions plugins/openai/vision_agents/plugins/openai/openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
from typing import Any, Optional, List, Dict

import aiortc
from openai import AsyncOpenAI
from openai.types.beta.realtime import RateLimitsUpdatedEvent
from openai.types.realtime import RealtimeSessionCreateRequestParam, RealtimeAudioConfigParam, \
RealtimeAudioConfigOutputParam, RealtimeAudioConfigInputParam, AudioTranscriptionParam
from openai.types.realtime.realtime_transcription_session_audio_input_turn_detection_param import SemanticVad

from getstream.video.rtc import AudioStreamTrack
from openai.types.realtime import (
RealtimeSessionCreateRequestParam,
from openai.types.beta.realtime import (
ResponseAudioTranscriptDoneEvent,
InputAudioBufferSpeechStartedEvent,
ConversationItemInputAudioTranscriptionCompletedEvent,
ResponseDoneEvent,
ResponseDoneEvent, SessionCreatedEvent,
)

from vision_agents.core.llm import realtime
Expand All @@ -26,21 +31,12 @@

logger = logging.getLogger(__name__)


"""
TODO
- improve event parsing, reuse built-in types where possible
- not clear why we resize video to width=640, height=480
- reconnect flow isn't implemented properly
- is it needed to redo the SDP offer/answer cycle when receiving a new track?
- support passing full client options in __init__
- review either SessionCreateParams or RealtimeSessionCreateRequestParam
- send video should depend on if the RTC connection with stream is sending video. not always send
TODO: Future improvements
- send video should depend on if the RTC connection with stream is sending video. not always send (requires SDP renegotiation)
- more testing with adding/removing video tracks
"""

client = RealtimeSessionCreateRequestParam


class Realtime(realtime.Realtime):
"""
OpenAI Realtime API implementation for real-time AI audio and video communication over WebRTC.
Expand All @@ -52,7 +48,10 @@ class Realtime(realtime.Realtime):
Args:
model: OpenAI model to use (e.g., "gpt-realtime").
voice: Voice for audio responses (e.g., "marin", "alloy").
send_video: Enable video streaming capabilities. Defaults to False.
realtime_session: Configure RealtimeSessionCreateRequestParam

api_key: Optionally specify an API key
client: pass your own AsyncOpenAI client

This class uses:
- RTCManager to handle WebRTC connection and media streaming.
Expand All @@ -63,20 +62,51 @@ class Realtime(realtime.Realtime):
"""

def __init__(
self, model: str = "gpt-realtime", voice: str = "marin", *args, **kwargs
self, model: str = "gpt-realtime", api_key: Optional[str] = None, voice: str = "marin", client: Optional[AsyncOpenAI] = None, fps: int = 1,
realtime_session: Optional[RealtimeSessionCreateRequestParam] = None
):
super().__init__(*args, **kwargs)
super().__init__(fps)
self.model = model
self.voice = voice
# TODO: send video should depend on if the RTC connection with stream is sending video.
self.rtc = RTCManager(self.model, self.voice, True)

self.realtime_session: RealtimeSessionCreateRequestParam = realtime_session or RealtimeSessionCreateRequestParam(
type="realtime"
)
self.realtime_session["model"] = self.model

# Set audio and output if they are None
# TODO: handle more edge cases of updating the passed settings/ good defaults
if self.realtime_session.get("audio") is None:
self.realtime_session["audio"] = RealtimeAudioConfigParam(
input=RealtimeAudioConfigInputParam(
transcription=AudioTranscriptionParam(model="gpt-4o-mini-transcribe"),
turn_detection=SemanticVad(type="semantic_vad")

))
if self.realtime_session["audio"].get("output") is None:
self.realtime_session["audio"]["output"] = RealtimeAudioConfigOutputParam()
self.realtime_session["audio"]["output"]["voice"] = self.voice

self.realtime_session["instructions"] = self.instructions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid writing None into session instructions

RealtimeSessionCreateRequestParam["instructions"] is declared as str, but both here and in _set_instructions we can assign None. That violates the API contract (see the CI failure at Line 90) and will surface as a runtime bug once the typed dict is validated. Please guard the assignment and drop the key when we have no content.

-        self.realtime_session["instructions"] = self.instructions
+        if self.instructions is not None:
+            self.realtime_session["instructions"] = self.instructions
+        else:
+            self.realtime_session.pop("instructions", None)

-        self.realtime_session["instructions"] = self._build_enhanced_instructions()
+        enhanced_instructions = self._build_enhanced_instructions()
+        if enhanced_instructions is not None:
+            self.realtime_session["instructions"] = enhanced_instructions
+        else:
+            self.realtime_session.pop("instructions", None)

Also applies to: 439-441

🧰 Tools
🪛 GitHub Actions: CI (unit)

[error] 90-90: Value of "instructions" has incompatible type "str | None"; expected "str" [typeddict-item]

🤖 Prompt for AI Agents
In plugins/openai/vision_agents/plugins/openai/openai_realtime.py around line 90
(and similarly around lines 439-441), the code assigns self.instructions (which
may be None) directly into realtime_session["instructions"], violating the
declared str type; change the logic to only set the "instructions" key when
self.instructions is a non-empty string (or truthy), and otherwise ensure the
key is removed or not present in the dict (i.e., guard the assignment and drop
the key when there is no content).


self._output_audio_track = AudioStreamTrack(
sample_rate=48000, channels=2, format="s16"
)
# Map conversation item_id to participant to handle multi-user scenarios
self._item_to_participant: Dict[str, Participant] = {}
self._pending_participant: Optional[Participant] = None

# create the client
if client is not None:
self.client = client
elif api_key is not None and api_key != "":
self.client = AsyncOpenAI(api_key=api_key)
else:
self.client = AsyncOpenAI() # will get it from the env vars

# Start the realtime connection manager
self.rtc = RTCManager(realtime_session=self.realtime_session, client=self.client)

@property
def output_audio_track(self) -> AudioStreamTrack:
return self._output_audio_track
Expand Down Expand Up @@ -147,13 +177,6 @@ async def simple_audio_response(
self._pending_participant = participant
await self.rtc.send_audio_pcm(audio)

async def request_session_info(self) -> None:
"""Request session information from the OpenAI API.

Delegates to the RTC manager to query session metadata.
"""
await self.rtc.request_session_info()

async def close(self):
await self.rtc.close()

Expand All @@ -175,7 +198,6 @@ async def _handle_openai_event(self, event: dict) -> None:
Registered as callback with RTC manager.
"""
et = event.get("type")
logger.debug(f"OpenAI Realtime event: {et}")

# code here is weird because OpenAI does something strange
# see issue: https://github.com/openai/openai-python/issues/2698
Expand All @@ -185,10 +207,8 @@ async def _handle_openai_event(self, event: dict) -> None:
"response.audio_transcript.done",
"response.output_audio_transcript.done",
]:
event_copy = event.copy()
event_copy["type"] = "response.output_audio_transcript.done"
transcript_event: ResponseAudioTranscriptDoneEvent = (
ResponseAudioTranscriptDoneEvent.model_validate(event_copy)
ResponseAudioTranscriptDoneEvent.model_validate(event)
)
self._emit_agent_speech_transcription(
text=transcript_event.transcript, original=event
Expand Down Expand Up @@ -218,14 +238,6 @@ async def _handle_openai_event(self, event: dict) -> None:

# Look up the correct participant for this transcription
participant = self._item_to_participant.get(item_id)
if participant:
logger.info(
f"User speech transcript from {participant.user_id}: {user_transcript_event.transcript}"
)
else:
logger.info(
f"User speech transcript (no participant mapping): {user_transcript_event.transcript}"
)

# Temporarily set the correct participant for this specific transcription
original_participant = self._current_participant
Expand All @@ -252,15 +264,36 @@ async def _handle_openai_event(self, event: dict) -> None:
await self._handle_tool_call_event(event)
elif et == "response.created":
pass
elif et == "session.created":
e = SessionCreatedEvent(**event)
elif et == "rate_limits.updated":
e = RateLimitsUpdatedEvent(**event)
elif et == "response.done":
logger.info("OpenAI response done %s", event)
e = ResponseDoneEvent(**event)
e = ResponseDoneEvent.model_validate(event)

if e.response.status == "failed":
raise Exception("OpenAI realtime failure %s", e.response)
elif et == "session.updated":
pass
# e = SessionUpdatedEvent(**event)
elif et == "response.content_part.added":
# Content part added to response - logged for debugging
pass
elif et == "response.audio_transcript.delta":
# Streaming transcript delta - logged at debug level to avoid clutter
pass
elif et == "output_audio_buffer.started":
# Output audio buffer started - acknowledgment of audio playback start
pass
elif et == "response.audio.done":
# Audio generation complete for this response item
pass
elif et == "response.content_part.done":
# Content part complete - contains full transcript
pass
elif et == "response.output_item.done":
# Output item complete - logged for debugging
pass
else:
logger.info(f"Unrecognized OpenAI Realtime event: {et} {event}")

Expand Down Expand Up @@ -403,6 +436,10 @@ async def _send_tool_response(
except Exception as e:
logger.error(f"Failed to send tool response: {e}")

def _set_instructions(self, instructions: str):
super()._set_instructions(instructions)
self.realtime_session["instructions"] = self._build_enhanced_instructions()

def _sanitize_tool_output(self, value: Any, max_chars: int = 60_000) -> str:
"""Sanitize tool output for OpenAI realtime.

Expand Down
Loading
Loading