-
Notifications
You must be signed in to change notification settings - Fork 12
Open
Description
Details
OpenAI 的实时 API 如何拆成 STT、TTS、LLM 的服务?
class RealtimeApiConnection:
    """Manage a WebSocket connection to the OpenAI realtime API.

    The instance can be used as an async context manager: entering it calls
    ``connect()`` and leaving it calls ``close()``.
    """

    def __init__(
        self,
        base_uri: str,
        api_key: str | None = None,
        path: str = "/v1/realtime",
        verbose: bool = False,
        model: str = DEFAULT_VIRTUAL_MODEL,
    ):
        """Initialise the realtime API connection (no network I/O happens here).

        Args:
            base_uri: Base URL of the API.
            api_key: API key; falls back to the ``OPENAI_API_KEY`` environment
                variable when omitted or empty.
            path: API path appended to ``base_uri``.
            verbose: Whether to log every sent and received message.
            model: Model name added to the URL as the ``model`` query
                parameter when the URL does not already contain one.
        """
        self.url = f"{base_uri}{path}"
        if "model=" not in self.url:
            # Use "&" when the URL already carries a query string; appending a
            # second "?" would produce a malformed URL.
            separator = "&" if "?" in self.url else "?"
            self.url += f"{separator}model={model}"
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.websocket: aiohttp.ClientWebSocketResponse | None = None
        self.verbose = verbose
        self.session = aiohttp.ClientSession()

    async def __aenter__(self) -> "RealtimeApiConnection":
        """Open the connection and return this instance."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> bool:
        """Close the connection; exceptions from the ``with`` body propagate."""
        await self.close()
        return False

    async def connect(self):
        """Establish the WebSocket connection and store it in ``self.websocket``."""
        # close() also closes the HTTP session, so create a fresh one when the
        # connection object is reused after a previous close().
        if self.session.closed:
            self.session = aiohttp.ClientSession()
        auth = aiohttp.BasicAuth("", self.api_key) if self.api_key else None
        headers = {"OpenAI-Beta": "realtime=v1"}
        self.websocket = await self.session.ws_connect(
            url=self.url,
            auth=auth,
            headers=headers,
        )

    async def send_audio_data(self, audio_data: bytes):
        """Send a chunk of audio to the server.

        The bytes are base64-encoded and wrapped in an
        ``InputAudioBufferAppend`` message.

        Args:
            audio_data: Audio bytes; expected to be PCM16, 24 kHz, mono,
                little-endian.
        """
        base64_audio_data = base64.b64encode(audio_data).decode("utf-8")
        message = InputAudioBufferAppend(audio=base64_audio_data)
        await self.send_request(message)

    async def send_request(self, message: ClientToServerMessage):
        """Serialise a client-to-server message to JSON and send it.

        Args:
            message: The client-to-server message object.

        Raises:
            AssertionError: If ``connect()`` has not been called yet.
        """
        assert self.websocket is not None
        message_str = to_json(message)
        if self.verbose:
            logger.info("-> %s", smart_str(message_str))
        await self.websocket.send_str(message_str)

    async def listen(self) -> AsyncGenerator[ServerToClientMessage, None]:
        """Yield parsed server messages as they arrive on the WebSocket.

        Only TEXT frames are parsed and yielded. An ERROR frame is logged and
        ends the iteration; other frame types are ignored. Cancellation of the
        consuming task is logged and ends the generator without re-raising.

        Raises:
            AssertionError: If ``connect()`` has not been called yet.
        """
        assert self.websocket is not None
        if self.verbose:
            logger.info("正在监听实时API消息")
        try:
            async for msg in self.websocket:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    if self.verbose:
                        logger.info("<- %s", smart_str(msg.data))
                    yield self.handle_server_message(msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error("接收消息时发生错误: %s", self.websocket.exception())
                    break
        except asyncio.CancelledError:
            logger.info("接收消息任务已取消")

    def handle_server_message(self, message: str) -> ServerToClientMessage:
        """Parse a raw server message string.

        Args:
            message: The received message string.

        Returns:
            The parsed server-to-client message object.

        Raises:
            Exception: Whatever ``parse_server_message`` raises; the error is
                logged and then re-raised unchanged.
        """
        try:
            return parse_server_message(message)
        except Exception as e:
            logger.error("处理消息时发生错误: %s", e)
            # Bare raise keeps the original traceback intact.
            raise

    async def close(self):
        """Close the WebSocket (if open) and the underlying HTTP session."""
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
        # The session is created in __init__ and owned by this object; closing
        # it here avoids leaking the connector and its sockets.
        if not self.session.closed:
            await self.session.close()
Take the following project management steps to fulfill this milestone:
- Run the full scope of the proposed EN doc updates by the PO
- Merge all PRs and close relevant tickets when the work is done
- Create and share a Vercel instance for review and stakeholder signoff
- Implement comments from stakeholders until fully resolved
- Publish updates to https://docs.agora.io/en/
- Inform the stakeholders
- Close the milestone
Metadata
Metadata
Assignees
Labels
No labels