diff --git a/content/docs/ten_agent/extension_dev/create_asr_extension.cn.mdx b/content/docs/ten_agent/extension_dev/create_asr_extension.cn.mdx new file mode 100644 index 0000000..3cd42b0 --- /dev/null +++ b/content/docs/ten_agent/extension_dev/create_asr_extension.cn.mdx @@ -0,0 +1,1501 @@ +--- +title: 创建 ASR 扩展 +description: 从零开始创建、开发、测试并发布一个完整的 ASR 扩展 +--- + +# 创建 ASR Extension完整指南 + +本教程将指导你从零开始创建一个生产级别的 ASR(自动语音识别) Extension,涵盖从项目创建、核心开发、测试验证到发布上线的完整流程。 + +## 什么是 ASR Extension + +ASR Extension 是 TEN Framework 生态系统中的一个**标准扩展积木**(Standard Extension),专门用于实现自动语音识别(Automatic Speech Recognition)功能。 + +### 核心功能 + +ASR Extension 的主要职责包括: + +1. **接收音频流**: 从上游扩展持续接收实时音频数据流(通常为 PCM 格式) +2. **实时转写**: 将音频数据实时转换成对应的文字结果 +3. **发送结果**: 将识别的文字结果传递给下游扩展进行后续处理 + +### 在对话流中的位置 + +作为标准积木,ASR Extension 在 TEN Agent 对话流中扮演着**音频到文本转换**的关键角色: + +``` +[上游积木] ──音频流──> [ASR Extension] ──文字流──> [下游积木] +``` + +**典型的上游积木**: +- **RTC Extension**: 从 RTC 频道拉取远端音频流 +- **Audio Capture Extension**: 从麦克风或音频文件获取音频数据 +- **Audio Processing Extension**: 提供经过预处理的音频流(如降噪、回声消除等) + +**典型的下游积木**: +- **LLM Extension**: 将识别的文字作为输入,进行对话理解和生成 +- **Translation Extension**: 对识别的文字进行跨语言翻译 +- **Intent Recognition Extension**: 提取用户意图和关键信息 + +### 实际应用场景 + +**场景1: AI 语音对话助手** +``` +RTC Extension → ASR Extension → LLM Extension → TTS Extension → RTC Extension +``` +从 RTC 频道采集用户的语音,ASR 将语音转写成文字,LLM 理解语义并生成回复,TTS 将回复转换成语音后推送到 RTC 频道。 + +**场景2: 实时语音翻译** +``` +RTC Extension → ASR Extension → Translation Extension → TTS Extension → RTC Extension +``` +采集用户的中文语音,ASR 识别成中文文字,Translation 积木转换成目标语言(如英文),TTS 将译文转换成语音输出。 + +**场景3: 语音智能控制** +``` +Microphone Extension → ASR Extension → Intent Recognition Extension → Action Executor Extension +``` +通过麦克风采集语音指令,ASR 转写成文字,Intent Recognition 提取控制意图,Action Executor 执行相应的设备控制动作。 + +### 标准化 ASR Extension 的意义 + +将 ASR 功能封装成标准扩展积木,带来以下核心价值: + +- **🔌 即插即用**: 轻松切换不同的 ASR 服务商(Deepgram、Azure、Google 等),无需修改上下游积木 +- **🔄 灵活组合**: 与其他标准积木自由组合,快速构建各类 AI 应用场景 +- **🛠️ 易于维护**: 独立开发、测试、升级,不影响其他积木的稳定性 +- **📦 高度复用**: 一次开发,多个项目复用,显著提升开发效率 +- **🌐 生态共享**: 发布到 TEN Store,让全球开发者受益 + +## 📋 你将学到什么 + +- 🚀 使用 ASR 模板快速创建扩展项目 +- ⚙️ 理解 ASR Extension 接口规范 +- 🔧 实现 ASR Extension 核心功能 +- 🧪 编写全面的单元测试和集成测试 +- 📊 掌握日志记录、错误处理等最佳实践 +- 🌐 发布扩展到 TEN Store 供社区使用 + +## 📚 前置条件 + +开始本教程前,请确保你已具备: + +- **基础知识**: 熟悉 [TEN Agent 架构](/docs/ten_agent/getting_started) 和 ASR 服务基本概念 +- **技术能力**: 掌握 Python 异步编程(`asyncio`、`async/await`) +- **开发环境**: 在开发容器内开发(安装好tman) +- **API 资源**: 准备好 ASR 服务商的 API 密钥(用于测试验证) + + + **示例说明**: 本教程以 Deepgram ASR 为例进行讲解,但所介绍的方法和模式同样适用于其他 ASR 服务商或者本地ASR模型。 + + +## 1. 🚀 项目初始化 + +### 创建扩展项目 + +使用 TMan 的 ASR 专用模板快速创建项目骨架: + +```bash title="Terminal" +# 进入扩展目录 +cd ten-framework/ai_agents/agents/ten_packages/extension + +# 创建ASR扩展项目 +tman create extension my_asr_extension --template default_asr_python --template-data class_name_prefix=MyAsr +``` + +创建成功后会显示: + +```bash title="输出信息" +Package 'extension:my_asr_extension' created successfully in 'my_asr_extension' in 2 seconds. +``` + +### 安装项目依赖 + +#### 添加第三方库依赖 + +首先在 `requirements.txt` 中添加 Deepgram SDK: + +```text title="requirements.txt" +websockets~=14.0 +pydantic +requests +deepgram-sdk +aiofiles +``` + +#### 安装 TEN 依赖 + +进入创建的扩展目录并安装依赖: + +```bash title="Terminal" +cd my_asr_extension +tman install --standalone +``` + +这会根据 `manifest.json` 中声明的依赖构建依赖树,并安装到 `.ten` 目录下。 + +## 2. 
🏗️ 扩展架构设计 + +### 项目结构概览 + +``` +my_asr_extension/ +├── .vscode/ # VS Code 调试配置 +│ └── launch.json # 调试启动配置 +├── manifest.json # 扩展元数据和依赖声明 +├── property.json # 默认配置参数 +├── requirements.txt # Python 依赖 +├── extension.py # 主要实现文件 +└── tests/ # 测试文件 + ├── bin/start # 测试启动脚本 + ├── test_basic.py # 单元测试 + └── configs/ # 测试配置 +``` + +### ASR Extension 接口规范 + +ASR Extension 遵循 TEN Framework 的标准接口规范。使用模板创建的 ASR Extension 会自动配置好接口继承关系和必要的 API 声明。 + +#### Manifest 配置 + +ASR Extension 的 `manifest.json` 文件中需要正确配置接口和属性声明: + +**1. Interface 继承** + +在 `manifest.json` 的 `api.interface` 中声明继承自 `ten_ai_base` 系统包下的标准 ASR 接口: + +```json title="manifest.json" +{ + "api": { + "interface": [ + { + "import_uri": "../../system/ten_ai_base/api/asr-interface.json" + } + ] + } +} +``` + +该接口文件(`asr-interface.json`)中定义了所有 ASR Extension 必须遵循的标准属性,包括: +- `dump`: 布尔值,配置是否开启音频 dump +- `dump_path`: 字符串,音频 dump 的存储路径 + +**2. Property 声明** + +除了继承标准接口外,每个 ASR Extension 还需要在 `api.property` 中声明自己特有的配置属性,特别是 `params` 对象中的必填字段。 + +例如如下配置: + +```json title="manifest.json" +{ + "api": { + "interface": [ + { + "import_uri": "../../system/ten_ai_base/api/asr-interface.json" + } + ], + "property": { + "properties": { + "params": { + "type": "object", + "properties": { + "key": { + "type": "string" + }, + "region": { + "type": "string" + }, + "language": { + "type": "string" + } + } + } + } + } + } +} +``` + +**关键说明**: +- **标准属性**(如 `dump`、`dump_path`):通过 `interface` 继承自 `asr-interface.json`,所有 ASR Extension 共有 +- **扩展属性**(如 `params.key`、`params.language`):在 `api.property` 中声明,根据不同的 ASR 服务商而异 + +使用模板创建 ASR Extension 时,这些配置会自动生成,但需要根据实际的服务商需求调整 `params` 中的属性。 + +#### 输入输出数据格式 + +ASR 标准接口(`asr-interface.json`)中除了属性配置外,还定义了输入和输出的数据格式规范: + +**输入数据**: +- **PCM 音频帧** (`pcm_frame`): 从上游接收的音频数据流 +- **Finalize 事件** (`asr_finalize`): VAD 检测到人声结束时触发 + +**输出数据**: +- **识别结果** (`asr_result`): ASR 转写的文字结果 +- **Finalize 完成通知** (`asr_finalize_end`): Finalize 处理完成的通知 +- **错误信息** (`error`): 发生错误时的错误详情 +- **性能指标** (`metrics`): TTFW、TTLW 等性能数据 + +详细的数据结构定义和字段说明请参考 `asr-interface.json` 文件。 + +### 核心继承关系 + +```python +AsyncASRBaseExtension # TEN AI Base 提供的抽象基类 + ↓ +MyAsrExtension # 你的具体实现 +``` + +#### 基类功能概述 + +`AsyncASRBaseExtension` 是 TEN AI Base 提供的 ASR 扩展抽象基类,它为所有 ASR Extension 提供了统一的框架和开箱即用的功能: + +**核心职责**: + +1. **生命周期管理**:自动处理扩展的初始化、启动、停止等生命周期事件 +2. **音频帧处理**: + - 接收上游的音频帧并放入异步队列 + - 根据连接状态自动执行缓冲策略(丢弃或保持) + - 提取和管理 session_id、metadata 等元信息 +3. **Finalize 事件处理**:接收 `asr_finalize` 数据,调用子类的 `finalize()` 方法 +4. **性能指标自动计算**: + - TTFW(Time To First Word):首词延迟 + - TTLW(Time To Last Word):末词延迟 + - 音频发送时长统计和定期上报 +5. **标准化输出**:提供统一的 API 发送识别结果、错误信息、性能指标等数据 +6. 
**会话管理**:自动为每轮对话生成唯一 ID,管理 metadata 传递 + +通过继承基类,开发者只需专注于实现与具体 ASR 服务商交互的核心逻辑,无需关心框架层的通用处理。 + +#### 必须实现的抽象方法 + +- `vendor()`: 返回 ASR 服务商名称 +- `start_connection()`: 建立与 ASR 服务的连接 +- `stop_connection()`: 停止连接 +- `send_audio(frame: AudioFrame, session_id: str | None) -> bool`: 发送音频数据,返回是否成功 +- `finalize(session_id: str | None)`: 快速触发最终结果(VAD检测到人声结束后,通过断连、发送静音包或供应商专用API快速获得final结果,降低对话延迟) +- `is_connected() -> bool`: 检查连接状态 +- `input_audio_sample_rate() -> int`: 返回音频采样率(Hz) + +#### 可选实现的方法 + +- `input_audio_channels() -> int`: 音频声道数(默认1声道) +- `input_audio_sample_width() -> int`: 采样位宽(默认2字节/16位) +- `buffer_strategy() -> ASRBufferConfig`: 音频缓冲策略(默认丢弃模式) +- `audio_actual_send_metrics_interval() -> int`: 音频时长指标上报间隔(默认5秒) + +#### 基类提供的工具方法 + +- `send_asr_result(asr_result: ASRResult)`: 发送识别结果 +- `send_asr_error(error: ModuleError, vendor_info: ModuleErrorVendorInfo | None)`: 发送错误信息 +- `send_asr_finalize_end()`: 发送 finalize 完成通知 +- `send_connect_delay_metrics(connect_delay: int)`: 发送连接延迟指标 +- `send_vendor_metrics(vendor_metrics: dict)`: 发送供应商自定义指标 + +## 3. ⚙️ 配置管理设计 + +### 设计配置类 + +创建灵活的配置类,支持必填参数和可选透传参数: + +```python title="extension.py" +from pydantic import BaseModel +from typing import Dict, Optional + +class MyAsrConfig(BaseModel): + # 所有ASR参数都在params中,包括必填和可选参数 + params: Dict[str, Optional[str]] = {} + + # 音频dump相关配置 - 所有ASR扩展的标准实现 + dump: bool = False + dump_path: Optional[str] = None +``` + +### 读取扩展配置 + +在 `on_init` 阶段读取和初始化配置: + +```python title="extension.py" +from ten_ai_base.const import LOG_CATEGORY_KEY_POINT, LOG_CATEGORY_VENDOR +from ten_ai_base.message import ModuleError, ModuleErrorCode + +@override +async def on_init(self, ten_env: AsyncTenEnv) -> None: + await super().on_init(ten_env) + + # 读取完整的扩展配置 + config_json, _ = await ten_env.get_property_to_json("") + + try: + # 反序列化配置为配置类实例 + self.config = MyAsrConfig.model_validate_json(config_json) + + # 打印配置信息(敏感信息脱敏) + ten_env.log_info( + f"config: {self.config.to_json(sensitive_handling=True)}", + category=LOG_CATEGORY_KEY_POINT, + ) + + # 初始化音频 dumper(如果开启) + if self.config.dump: + dump_file_path = os.path.join( + self.config.dump_path, DUMP_FILE_NAME + ) + self.audio_dumper = Dumper(dump_file_path) + + except Exception as e: + ten_env.log_error( + f"invalid property: {e}", + category=LOG_CATEGORY_KEY_POINT + ) + # 配置错误时使用默认配置 + self.config = MyAsrConfig.model_validate_json("{}") + # 发送致命错误 + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.FATAL_ERROR.value, + message=str(e), + ), + ) +``` + +### 配置敏感信息脱敏 + +为配置类添加脱敏方法,保护敏感信息: + +```python title="extension.py" +from ten_ai_base.utils import encrypt + +class MyAsrConfig(BaseModel): + params: Dict[str, Optional[str]] = {} + dump: bool = False + dump_path: Optional[str] = None + + def to_json(self, sensitive_handling: bool = False) -> str: + """ + 序列化配置为 JSON,支持敏感信息脱敏 + + Args: + sensitive_handling: 是否对敏感信息进行脱敏处理 + """ + if not sensitive_handling: + return self.model_dump_json() + + # 深拷贝配置对象 + config = self.model_copy(deep=True) + + # 对 params 中的敏感字段进行脱敏 + if config.params: + encrypted_params = {} + for key, value in config.params.items(): + # 对包含 'key'、'token'、'secret' 等敏感词的字段进行加密 + if (key in ['api_key', 'key', 'token', 'secret', 'password'] + and isinstance(value, str) and value): + encrypted_params[key] = encrypt(value) + else: + encrypted_params[key] = value + config.params = encrypted_params + + return config.model_dump_json() +``` + +### 配置默认参数 + +在 `property.json` 中提供默认配置: + +```json title="property.json" +{ + "params": { + "url": 
"wss://api.deepgram.com/v1/listen", + "api_key": "your_deepgram_api_key_here", + "language": "en", + "model": "nova-2", + "sample_rate": "16000", + "punctuate": "true", + "smart_format": "true", + "interim_results": "true" + }, + "dump": false, + "dump_path": "/tmp/asr_audio_dump" +} +``` + +## 4. 🔧 核心功能实现 + +### 实现基础方法 + +```python title="extension.py" +import asyncio +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + LiveTranscriptionEvents, + LiveOptions +) +from ten_ai_base.asr import ASRResult + +class MyAsrExtension(AsyncASRBaseExtension): + def __init__(self, name: str): + super().__init__(name) + self.config: MyAsrConfig = MyAsrConfig() + self.deepgram_client: Optional[AsyncListenWebSocketClient] = None + self.is_connected_flag: bool = False + self.last_finalize_timestamp: float = 0.0 # 用于延迟计算 + + @override + def vendor(self) -> str: + """返回ASR服务商名称""" + return "deepgram" + + @override + def input_audio_sample_rate(self) -> int: + """返回音频采样率""" + return int(self.config.params.get("sample_rate", 16000) or 16000) + + @override + def is_connected(self) -> bool: + """检查连接状态""" + return self.is_connected_flag +``` + +### 实现连接管理 + +#### 建立连接 + +start_connection 会**在extension初始化完成后自动执行**,用于和供应商建立连接,监听供应商返回的结果。 +在start_connection中如果遇到错误,要能**打印包含错误信息的日志**,并且通过**send_asr_error**上报错误。 +如果是可以通过重试解决的错误,要通过**重试机制**解决。 + +```python title="extension.py" +@override +async def start_connection(self) -> None: + """建立与Deepgram的WebSocket连接""" + try: + # 确保清理之前的连接 + await self.stop_connection() + + # 创建Deepgram客户端配置 + config = DeepgramClientOptions( + api_key=self.config.params.get("api_key", "") or "" + ) + + # 初始化WebSocket客户端 + deepgram = DeepgramClient(config=config) + self.deepgram_client = deepgram.listen.live.v("1") + + # 注册事件处理器 + await self._register_deepgram_events() + + # 创建连接选项 + options = LiveOptions( + model=self.config.params.get("model", "nova-2") or "nova-2", + language=self.config.params.get("language", "en") or "en", + sample_rate=self.config.params.get("sample_rate", 16000) or 16000, + ) + + # 透传其他参数 + for key, value in self.config.params.items(): + if key not in ["url", "api_key", "language", "model", "sample_rate"] and value: + setattr(options, key, value == "true" if value in ["true", "false"] else value) + + # 启动连接 + await self.deepgram_client.start(options) + + except Exception as e: + self.ten_env.log_error(f"failed to connect to deepgram: {e}", category=LOG_CATEGORY_VENDOR) + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.FATAL_ERROR.value, + message=str(e), + ), + ) +``` +#### 停止连接 + +stop_connection 会**在extension销毁前自动执行**,用于和供应商断开连接。 + +```python title="extension.py" +@override +async def stop_connection(self) -> None: + """停止Deepgram连接""" + if self.deepgram_client: + await self.deepgram_client.finish() + self.deepgram_client = None + self.is_connected_flag = False +``` + +### 实现音频处理 + +asr extension 收到上游extension发来的音频数据后,期望能**流式**发送给asr供应商/asr模型,并且**流式**获取asr结果。 +asr base class 在收到audio frame后会根据当前的 is_connected 状态来决定是否要调用send_audio方法发给供应商或是丢掉/缓存。 + +```python title="extension.py" +@override +async def send_audio(self, audio_frame: AudioFrame) -> bool: + """发送音频数据到ASR服务""" + if not self.is_connected() or not self.deepgram_client: + return False + + try: + # 获取音频数据 + audio_buf = audio_frame.get_buf() + if not audio_buf: + return False + + # 发送到Deepgram + await self.deepgram_client.send(bytes(audio_buf)) + return True + + except Exception as e: + self.ten_env.log_error(f"Failed to send audio: {e}", 
category="vendor") + return False +``` + +#### 配置音频缓冲策略 + +asr base class 在收到 audio frame 后会根据当前的 `is_connected` 状态来决定是否要调用 `send_audio` 方法发给供应商,或是丢掉/缓存音频帧。 + +基类提供两种音频缓冲策略,通过实现 `buffer_strategy()` 方法配置: + +**1. 丢弃模式** (`ASRBufferConfigModeDiscard`): +- 连接断开时直接丢弃音频帧 +- 适用于实时性要求高的场景 + +**2. 保持模式** (`ASRBufferConfigModeKeep`): +- 连接断开时缓存音频帧,连接恢复后发送 +- 通过 `byte_limit` 控制缓存大小 +- 适用于需要完整音频处理的场景 + +```python title="extension.py" +from ten_ai_base.asr import ASRBufferConfig, ASRBufferConfigModeKeep + +@override +def buffer_strategy(self) -> ASRBufferConfig: + """配置音频缓冲策略""" + return ASRBufferConfig( + mode=ASRBufferConfigModeKeep(byte_limit=10 * 1024 * 1024) # 10MB 缓存上限 + ) +``` + +#### 为什么推荐使用保持模式? + +**强烈推荐使用保持模式**,以保证时间戳的准确性。原因如下: + +**时间戳偏移问题**: + +如果使用丢弃模式,当连接断开时直接丢弃音频帧,这些被丢弃的音频帧不会被发送给 ASR 供应商。但 ASR 供应商返回的识别结果中的时间戳是**相对于它收到的音频流计算的**,而不是相对于真实的音频时间轴。这会导致时间戳偏移问题。 + +**举例说明**: + +假设音频流按时间顺序如下: +1. **0-10秒**:正常接收并发送给 ASR +2. **10-15秒**:连接断开,使用丢弃模式,这5秒音频被丢弃 +3. **15-20秒**:连接恢复,接收并发送给 ASR + +使用**丢弃模式**的后果: +- ASR 供应商实际只收到了 15 秒的音频(0-10秒 + 15-20秒) +- 当 ASR 对 15-20秒 的音频出结果时,它会认为这段内容的时间戳是 **10-15秒**(因为它认为这是它收到的第10-15秒的音频) +- 但实际上这段内容对应的真实时间戳是 **15-20秒** +- **时间戳偏差:5秒** + +**影响**: + +后续模块如果依赖这个不准确的时间戳,可能会导致: +- **对话同步问题**:TTS 可能在错误的时间点播放回复 +- **打断检测失败**:用户的打断时机判断不准确 +- **会话记录错乱**:对话历史中的时间信息不正确 + +**保持模式的优势**: + +使用保持模式时: +- 连接断开期间的音频帧会被缓存(而不是丢弃) +- 连接恢复后,缓存的音频帧会全部发送给 ASR 供应商 +- ASR 供应商收到完整、连续的音频流,时间戳计算准确 +- 即使中间断开一段时间,也不会影响时间戳的准确性 + +#### 实现 finalize 方法 + +finalize 会**在VAD检测到人声结束后自动执行**,用于触发ASR服务返回final结果。 + +tips: +- 如果明确finalize完成的时间点,需要调用send_asr_finalize_end来通知finalize完成。 +- 如果是通过断连方式来实现finalize,要处理好重连逻辑。 +- 如果是通过送静音包方式来实现finalize,要注意时间戳的计算(返回的asr结果里的时间戳可能包含静音包的时长,要能够正确去除)。 + +```python title="extension.py" +@override +async def finalize(self) -> None: + """快速触发最终结果 + + 收到VAD检测到人声结束后,立即触发ASR服务返回final结果。 + 这对于对话场景非常重要,可以显著降低用户感知的延迟。 + + 实现方式: + - Deepgram: 调用finalize() API快速结束转录 + - 其他服务商: 可通过断连、发送静音包等方式实现 + """ + if self.deepgram_client: + # 记录finalize时间戳,用于延迟计算 + self.last_finalize_timestamp = asyncio.get_event_loop().time() * 1000 + await self.deepgram_client.finalize() + await self.send_asr_finalize_end() +``` + +### 实现供应商事件处理 + +ASR 扩展需要处理供应商的各种事件,包括连接状态变化、识别结果和错误情况。这是实现稳定 ASR 服务的关键部分。 + +#### 事件注册 + +首先注册所有必要的事件处理器: + +```python title="extension.py" +async def _register_deepgram_events(self) -> None: + """注册Deepgram WebSocket事件处理器""" + if not self.deepgram_client: + return + + self.deepgram_client.on(LiveTranscriptionEvents.Open, self._on_open) + self.deepgram_client.on(LiveTranscriptionEvents.Close, self._on_close) + self.deepgram_client.on(LiveTranscriptionEvents.Transcript, self._on_transcript) + self.deepgram_client.on(LiveTranscriptionEvents.Error, self._on_error) +``` + +#### 连接状态管理 + +**关键要点**:连接状态变化必须打印关键日志,帮助排查连接问题。 + +```python title="extension.py" +async def _on_open(self, *args, **kwargs) -> None: + """连接建立成功回调""" + self.is_connected_flag = True + + # 打印关键连接日志 + self.ten_env.log_info( + "vendor_status_changed: connection opened", + category=LOG_CATEGORY_VENDOR + ) + + # 重置重连计数器 + if self.reconnect_manager: + self.reconnect_manager.mark_connection_successful() + +async def _on_close(self, *args, **kwargs) -> None: + """连接关闭回调""" + self.is_connected_flag = False + + # 打印关键断连日志 + self.ten_env.log_warn( + "vendor_status_changed: connection closed", + category=LOG_CATEGORY_VENDOR + ) + + # 检查是否为意外断连 + if self.deepgram_client: # 客户端存在说明非主动关闭 + self.ten_env.log_warn( + "Unexpected disconnection detected, attempting reconnection", + category=LOG_CATEGORY_VENDOR + ) + # 触发自动重连 + await 
self._handle_reconnect() +``` + +#### 识别结果处理 + +**关键要点**:收到供应商结果后必须转换成标准 ASRResult 结构并通过 `send_asr_result` 发送。 + +```python title="extension.py" +async def _on_transcript(self, *args, **kwargs) -> None: + """处理转录结果回调""" + result = args[1] if len(args) > 1 else None + if not result: + return + + # 打印供应商原始结果(调试用) + self.ten_env.log_debug( + f"vendor_result: received transcript: {result}", + category=LOG_CATEGORY_VENDOR + ) + + try: + # 解析Deepgram结果格式 + transcript_data = result.channel.alternatives[0] if result.channel.alternatives else None + if not transcript_data or not transcript_data.transcript: + return + + transcript_text = transcript_data.transcript.strip() + if not transcript_text: + return + + # 转换为标准ASR结果结构 + asr_result = ASRResult( + text=transcript_text, + final=result.is_final, + start_ms=int(result.start * 1000) if hasattr(result, 'start') else 0, + duration_ms=int(result.duration * 1000) if hasattr(result, 'duration') else 0, + language=self.config.params.get("language", "en") or "en" + ) + + # 打印处理后的结果 + self.ten_env.log_debug( + f"processed transcript: {transcript_text}, is_final: {result.is_final}", + category=LOG_CATEGORY_VENDOR + ) + + # 通过标准接口发送结果 + await self.send_asr_result(asr_result) + + except Exception as e: + # 记录结果处理错误 + self.ten_env.log_error( + f"Error processing transcript: {type(e).__name__}: {e}", + category=LOG_CATEGORY_VENDOR + ) + # 上报非致命错误 + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.NON_FATAL_ERROR.value, + message=f"Failed to process transcript: {str(e)}", + ) + ) +``` + +#### 错误处理和重连 + +**关键要点**:供应商错误必须打印日志、上报错误并触发自动重连。 + +```python title="extension.py" +async def _on_error(self, *args, **kwargs) -> None: + """供应商错误回调""" + error = args[1] if len(args) > 1 else None + if not error: + return + + # 打印关键错误日志 + self.ten_env.log_error( + f"vendor_error: deepgram error: {error}", + category=LOG_CATEGORY_VENDOR + ) + + # 上报错误信息(包含供应商详细信息) + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.NON_FATAL_ERROR.value, + message=f"Vendor error: {str(error)}", + ), + ModuleErrorVendorInfo( + vendor="deepgram", + code=getattr(error, 'code', 'unknown'), + message=str(error), + ) + ) + + # 触发自动重连(详细实现见"高级功能实现"章节) + await self._handle_reconnect() +``` + + + 重连机制的完整实现请参考后续的 [重连机制](#重连机制) 章节,其中介绍了如何使用 `ReconnectManager` 实现智能重连。 + + +## 5. 🚀 高级功能实现 + +### 重连机制 + +当 ASR 服务出现连接错误或意外断连时,需要有健壮的重连机制来保证服务的稳定性。推荐使用 `ReconnectManager` 来实现智能重连。 + + + `ReconnectManager` 的完整实现可参考 `azure_asr_python` 或 `deepgram_asr_python` 扩展中的 `reconnect_manager.py` 文件。 + + +#### 使用 ReconnectManager + +**1. 初始化 ReconnectManager** + +在扩展的构造函数中创建 ReconnectManager 实例: + +```python title="extension.py" +from .reconnect_manager import ReconnectManager + +class MyAsrExtension(AsyncASRBaseExtension): + def __init__(self, name: str): + super().__init__(name) + self.config: MyAsrConfig = MyAsrConfig() + self.deepgram_client: Optional[AsyncListenWebSocketClient] = None + self.is_connected_flag: bool = False + + # 初始化重连管理器:最多重连5次,基础延迟0.5秒 + self.reconnect_manager = ReconnectManager(max_attempts=5, base_delay=0.5) +``` + +**2. 连接成功时重置计数器** + +在连接成功的回调中重置重连计数器: + +```python title="extension.py" +async def _on_open(self, *args, **kwargs) -> None: + """连接建立成功回调""" + self.is_connected_flag = True + + self.ten_env.log_info( + "vendor_status_changed: connection opened", + category=LOG_CATEGORY_VENDOR + ) + + # 连接成功,重置重连计数器 + if self.reconnect_manager: + self.reconnect_manager.mark_connection_successful() +``` + +**3. 
实现重连处理逻辑** + +当发生错误或断连时,调用重连处理: + +```python title="extension.py" +async def _handle_reconnect(self) -> None: + """处理重连逻辑""" + if not self.reconnect_manager: + self.ten_env.log_warn( + "No reconnect manager available, skipping reconnection", + category=LOG_CATEGORY_VENDOR + ) + return + + try: + # 检查是否可以重试 + if not self.reconnect_manager.can_retry(): + self.ten_env.log_error( + f"Maximum reconnection attempts ({self.reconnect_manager.max_attempts}) reached", + category=LOG_CATEGORY_VENDOR + ) + # 达到最大重连次数,发送致命错误 + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.FATAL_ERROR.value, + message="Reconnection failed permanently", + ) + ) + return + + # 使用重连管理器处理重连 + self.ten_env.log_info( + f"Attempting reconnection, attempts: {self.reconnect_manager.current_attempts + 1}/{self.reconnect_manager.max_attempts}", + category=LOG_CATEGORY_VENDOR + ) + + # 调用 ReconnectManager 的 handle_reconnect 方法 + success = await self.reconnect_manager.handle_reconnect( + connect_func=self.start_connection + ) + + if success: + self.ten_env.log_info( + "Reconnection successful", + category=LOG_CATEGORY_VENDOR + ) + else: + self.ten_env.log_error( + "Reconnection failed", + category=LOG_CATEGORY_VENDOR + ) + + except Exception as e: + self.ten_env.log_error( + f"Error in reconnection handler: {e}", + category=LOG_CATEGORY_VENDOR + ) +``` + +**4. 在错误和断连时触发重连** + +```python title="extension.py" +async def _on_close(self, *args, **kwargs) -> None: + """连接关闭回调""" + self.is_connected_flag = False + + self.ten_env.log_warn( + "vendor_status_changed: connection closed", + category=LOG_CATEGORY_VENDOR + ) + + # 检查是否为意外断连(客户端存在说明非主动关闭) + if self.deepgram_client: + self.ten_env.log_warn( + "Unexpected disconnection detected, attempting reconnection", + category=LOG_CATEGORY_VENDOR + ) + # 触发重连 + await self._handle_reconnect() + +async def _on_error(self, *args, **kwargs) -> None: + """供应商错误回调""" + error = args[1] if len(args) > 1 else None + if not error: + return + + self.ten_env.log_error( + f"vendor_error: {error}", + category=LOG_CATEGORY_VENDOR + ) + + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.NON_FATAL_ERROR.value, + message=f"Vendor error: {str(error)}", + ), + ModuleErrorVendorInfo( + vendor="deepgram", + code=getattr(error, 'code', 'unknown'), + message=str(error), + ) + ) + + # 触发重连 + await self._handle_reconnect() +``` + +#### ReconnectManager 关键特性 + +- **指数退避**:每次重连的延迟时间按指数增长(0.5s → 1s → 2s → 4s → 8s),避免过于频繁的重连 +- **次数限制**:设置最大重连次数,避免无限重连 +- **状态管理**:连接成功后自动重置计数器,为下次可能的断连做好准备 +- **错误区分**:达到最大重连次数后上报致命错误(FATAL_ERROR),其他情况上报非致命错误(NON_FATAL_ERROR) + +### 音频调试功能 + +集成音频 Dump 功能: + +```python title="extension.py" +import os +from ten_ai_base.dumper import Dumper + +# 在文件顶部定义常量 +DUMP_FILE_NAME = "my_asr_in.pcm" + +class MyAsrExtension(AsyncASRBaseExtension): + @override + async def on_init(self, ten_env: AsyncTenEnv) -> None: + """初始化阶段配置""" + await super().on_init(ten_env) + + # 初始化音频dumper + if self.config.dump: + dump_file_path = os.path.join( + self.config.dump_path, DUMP_FILE_NAME + ) + self.audio_dumper = Dumper(dump_file_path) + await self.audio_dumper.start() + + @override + async def on_deinit(self, ten_env: AsyncTenEnv) -> None: + """清理资源""" + await super().on_deinit(ten_env) + if self.audio_dumper: + await self.audio_dumper.stop() + self.audio_dumper = None + + @override + async def send_audio(self, audio_frame: AudioFrame) -> bool: + """发送音频数据(含调试功能)""" + buf = audio_frame.get_buf() + if self.audio_dumper: + await 
self.audio_dumper.push_bytes(bytes(buf)) + ... +``` + +## 6. 🧪 单元测试 + +### 创建测试框架 + +#### Mock的必要性 + +在单元测试中使用Mock而非真实API调用的原因: + +- **🔄 CI/CD友好**: 每次CI都会触发执行,避免供应商配额消耗 +- **💰 成本控制**: 避免不必要的API调用费用 +- **🛡️ 稳定性**: 避免因供应商连接不稳定导致的CI失败 +- **⚡ 执行速度**: Mock响应更快,提升测试执行效率 +- **🧪 可控性**: 可以模拟各种边界情况和错误场景 + +#### 最佳实践建议 + +- **开发调试阶段**: 可以使用真实的API Key进行测试,验证实际集成效果 +- **正式提交前**: 切换到Mock实现,确保CI/CD环境的稳定运行 +- **边界测试**: 使用Mock模拟网络超时、服务错误等异常情况 + +在 `tests/mock.py` 中创建 Mock 对象: + +```python title="tests/mock.py" +import pytest +from unittest.mock import MagicMock, patch +from types import SimpleNamespace + +@pytest.fixture(scope="function") +def patch_deepgram_ws(): + """Mock Deepgram WebSocket客户端""" + with patch("ten_packages.extension.my_asr_extension.extension.AsyncListenWebSocketClient") as mock_client: + # 创建mock实例 + mock_instance = MagicMock() + mock_client.return_value = mock_instance + + # 存储事件处理器 + event_handlers = {} + + def mock_on(event, handler): + event_handlers[event] = handler + + mock_instance.on = mock_on + mock_instance.start = MagicMock() + mock_instance.send = MagicMock() + mock_instance.finish = MagicMock() + mock_instance.finalize = MagicMock() + + # 提供触发事件的方法 + def trigger_open(): + if 'open' in event_handlers: + event_handlers['open']() + + def trigger_transcript(text, is_final=False): + if 'transcript' in event_handlers: + # 模拟Deepgram响应格式 + mock_result = SimpleNamespace() + mock_result.channel = SimpleNamespace() + mock_result.channel.alternatives = [SimpleNamespace()] + mock_result.channel.alternatives[0].transcript = text + mock_result.is_final = is_final + mock_result.start = 0.0 + mock_result.duration = 1.0 + + event_handlers['transcript'](None, mock_result) + + mock_instance.trigger_open = trigger_open + mock_instance.trigger_transcript = trigger_transcript + + yield mock_instance +``` + +### 测试用例设计 + +#### 测试覆盖范围 + +单元测试应该覆盖以下核心场景,确保 ASR 扩展的稳定性和正确性: + +##### 1. 配置管理测试 +- **✅ 有效配置**: 正确解析和初始化配置参数 +- **❌ 错误配置**: 使用无效配置时能上报错误并降级处理 +- **🔐 敏感信息脱敏**: 验证日志输出中敏感信息被正确加密 + +##### 2. 音频处理测试 +- **🎵 音频发送**: 输入音频帧后能正确发送给供应商 +- **📊 结果接收**: 收到供应商结果后能转换为标准格式并发送 +- **⏱️ 时间戳计算**: 验证 ASR 结果中的时间信息准确性 + +##### 3. 连接管理测试 +- **🔗 正常连接**: 验证连接建立和状态管理 +- **🔄 自动重连**: 连接错误时能自动重连 +- **📋 状态日志**: 连接状态变化能打印关键日志 + +##### 4. Finalize 流程测试 +- **📥 接收处理**: 能正确处理 `asr_finalize` 数据 +- **⚡ 快速响应**: 调用供应商 finalize API +- **📤 完成通知**: 处理完成后发送 `asr_finalize_end` 数据 + +##### 5. 错误处理测试 +- **🚨 错误上报**: 各类错误能通过 `send_asr_error` 正确上报 +- **🔍 错误分类**: 区分致命错误和非致命错误 +- **📊 供应商信息**: 供应商错误包含详细的 vendor_info + +##### 6. 音频调试功能测试 +- **💾 音频存储**: 开启 dump 后能生成正确的音频文件 +- **📁 路径管理**: 验证 dump 文件路径和命名 +- **🎛️ 开关控制**: dump 功能的启用和禁用 + +##### 7. 性能指标测试 +- **⏱️ TTFW 指标**: 验证首词延迟计算 +- **⏱️ TTLW 指标**: 验证末词延迟计算 +- **📊 自定义指标**: 供应商特定指标上报 + +### 编写测试用例 + +具体的测试用例实现可以参考 `azure_asr_python` 扩展中的测试设计 + +### 运行单元测试 + +```bash title="Terminal" +cd my_asr_extension +./tests/bin/start +``` + +### 断点调试 + +模板创建的扩展项目包含 `.vscode` 目录,提供了开箱即用的调试配置。 + +#### 使用 VS Code 调试 + +1. **打开项目**: 在 VS Code 中打开 `my_asr_extension` 目录 + +2. 
**查看调试配置**: `.vscode/launch.json` 中预置了调试脚本 + +```json title=".vscode/launch.json" +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Test Extension", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}/tests/bin/start", + "args": [], + "console": "integratedTerminal", + "cwd": "${workspaceFolder}", + "env": { + "PYTHONPATH": "${workspaceFolder}:${workspaceFolder}/.ten/app/ten_packages/system/ten_runtime_python/lib:${workspaceFolder}/.ten/app/ten_packages/system/ten_runtime_python/interface:${workspaceFolder}/.ten/app/ten_packages/system/ten_ai_base/interface" + } + } + ] +} +``` + +3. **设置断点**: 在 `extension.py` 或测试文件中设置断点 + +4. **启动调试**: + - 按 `F5` 或使用调试面板 + - 选择 "Python: Test Extension" 配置 + - 调试器会自动运行测试用例 + +#### 调试特定测试 + +修改 `launch.json` 中的 `args` 参数来调试特定测试: + +```json title=".vscode/launch.json" +{ + "args": [ + "tests/test_basic.py::test_asr_basic_functionality", // 调试特定测试函数 + "-v" // 详细输出 + ] +} +``` + +#### 环境变量调试 + +如果需要使用真实API密钥调试,可以在 `launch.json` 中添加环境变量: + +```json title=".vscode/launch.json" +{ + "env": { + "PYTHONPATH": "...", + "DEEPGRAM_API_KEY": "your_real_api_key_here" + } +} +``` + +## 7. 🔗 集成测试(Guarder) + +### 环境变量配置 + +创建 `.env` 文件配置真实 API 密钥: + +```bash title=".env" +# Deepgram ASR API Key +DEEPGRAM_API_KEY=your_real_deepgram_api_key_here +``` + +### 测试配置 + +在 `tests/configs/` 下创建测试配置: + +```json title="tests/configs/property_en.json" +{ + "params": { + "api_key": "${env:DEEPGRAM_API_KEY}", + "language": "en-US" + } +} +``` + +### 运行 Guarder 测试 + +使用真实 API 密钥运行完整集成测试: + +```bash title="Terminal" +cd ai_agents +task asr-guarder-test EXTENSION=my_asr_extension +``` + +这将运行包括以下测试: + +- **ASR 结果测试**: 验证识别准确性和结果格式 +- **Finalize 测试**: 验证 VAD检测到人声结束后,信号处理和延迟优化效果 +- **音频处理测试**: 测试实时音频流处理 +- **错误处理测试**: 验证网络异常和 API 错误处理 +- **性能测试**: 测量 TTFW(Time To First Word) 和 TTLW(Time To Last Word) 指标 +- **多语言测试**: 验证不同语言的识别能力(英文和中文) + +#### 关键性能指标 + +Guarder 测试会重点验证以下对话场景的关键指标: + +- **TTFW**: 首次识别结果延迟(通常 < 1000ms) +- **TTLW**: Finalize 到最终结果延迟(通常 < 300ms) +- **识别准确率**: 在不同音质条件下的准确性 +- **连接稳定性**: 长时间会话的连接保持能力 + +## 8. 🌐 端到端测试 + +完成开发后,可以使用 TMan Designer 快速替换 TEN Agent 对话图中的 ASR 扩展,验证在实际对话场景下的效果。 + +### 使用 TMan Designer 替换 ASR 扩展 + +```bash title="Terminal" +# 在 TEN Agent 项目目录下启动 +cd /path/to/your/ten-agent-project +tman designer +``` + +TMan Designer 会打开可视化界面,你可以: + +1. **选择 ASR 节点**: 点击现有的 ASR 扩展积木 +2. **替换为你的扩展**: 选择 `my_asr_extension` +3. **配置参数**: 设置 API Key、语言等参数 +4. **一键应用**: 完成替换并启动测试 + +替换完成后,通过真实对话验证扩展的识别准确性、响应速度和稳定性。 + +## 9. 
📊 最佳实践 + +### 配置管理 + +- ✅ 使用 `params` 字典统一管理供应商参数 +- ✅ 通过 `@property` 方法提供类型安全的参数访问 +- ✅ 提供合理的默认值 + +### 错误处理 + +- ✅ 实现指数退避重连机制 +- ✅ 正确处理网络异常和 API 错误 +- ✅ 提供详细的错误日志和状态上报 +- ✅ 优雅处理连接中断和恢复 + +### 性能优化 + +- ✅ 异步处理音频流,避免阻塞 +- ✅ 实现音频缓存和批量发送 +- ✅ 合理管理 WebSocket 连接生命周期 +- ✅ 监控和报告关键性能指标 + +### 日志打印 + +- ✅ 使用 `ten_env.log_debug/info/warn/error` API 打印日志 +- ✅ 通过指定 `category` 让日志更加清晰 +- ✅ 对敏感信息(如 API Key)进行脱敏处理 +- ✅ 在关键节点记录状态变化和错误信息 + +#### 日志分类说明 + +- **KEY_POINT**: 关键节点日志,用于记录重要的配置和状态信息 +- **VENDOR**: 供应商相关日志,包括连接状态、结果处理、错误信息 +- **默认分类**: 业务逻辑相关的一般日志 + +### 错误上报 + +除了日志记录,还需要通过 `self.send_asr_error` API 进行结构化的错误上报 + +#### 错误分类策略 + +**🔥 致命错误 (FATAL_ERROR)** +- 配置解析失败 +- 无效的API密钥 +- 无法建立初始连接 +- 扩展无法继续工作的情况 + +**⚠️ 非致命错误 (NON_FATAL_ERROR)** +- 临时的网络连接问题 +- 供应商服务暂时不可用 +- 音频处理错误 +- 可通过重连恢复的错误 + +#### 供应商信息 (VendorInfo) + +对于供应商返回的错误,应包含详细的供应商信息: + +```python +ModuleErrorVendorInfo( + vendor="deepgram", # 供应商名称 + code="400", # 供应商错误码 + message="Invalid audio format", # 供应商错误消息 +) +``` + +这样可以帮助运维团队快速定位问题来源,区分是扩展问题还是供应商服务问题。 + +#### 敏感信息脱敏 + +```python title="config.py" +def to_json(self, sensitive_handling: bool = False) -> str: + """序列化配置,支持敏感信息脱敏""" + config_dict = self.model_dump() + + if sensitive_handling: + # 脱敏处理敏感字段 + if "api_key" in config_dict.get("params", {}): + api_key = config_dict["params"]["api_key"] + if len(api_key) > 6: + config_dict["params"]["api_key"] = f"{api_key[:2]}...{api_key[-2:]}" + + return json.dumps(config_dict) +``` + +### 调试支持 + +- ✅ 提供音频 Dump 功能用于问题排查 +- ✅ 记录详细的事件和状态变化日志 +- ✅ 支持不同日志级别和分类 +- ✅ 提供性能和质量指标 + +## 10. 🌟 扩展和贡献 + +### 适配其他 ASR 服务 + +基于本教程的框架,你可以参考 TEN Framework 仓库下的其他成品 ASR 扩展: + +1. **Azure Speech Services**: 参考 `azure_asr_python` 扩展的实现 +2. **Google Cloud Speech**: 参考 `google_asr_python` 扩展的实现 +3. **科大讯飞**: 参考 `xfyun_asr_python` 扩展的实现 +4. **其他供应商**: 在 `ai_agents/agents/ten_packages/extension/` 目录下查看更多ASR扩展实现 + +这些成品扩展都遵循相同的架构模式,可以作为适配新ASR服务的参考模板: + +```bash title="参考扩展位置" +ten-framework/ +└── ai_agents/agents/ten_packages/extension/ + ├── azure_asr_python/ # Azure Speech Services + ├── deepgram_asr_python/ # Deepgram ASR + ├── google_asr_python/ # Google Cloud Speech + ├── xfyun_asr_python/ # 科大讯飞语音 + └── ... # 更多ASR扩展 +``` + +### 贡献到社区 + +完成开发后,欢迎将你的 ASR 扩展贡献给 TEN Agent 社区: + +1. **代码规范**: 遵循项目的代码风格和命名约定 +2. **测试覆盖**: 确保单元测试和集成测试通过 +3. **文档完善**: 提供清晰的 README 和配置说明 +4. **性能验证**: 通过 Guarder 测试验证生产可用性 + +### 发布到 TEN Store + +让你的 ASR 扩展被更多开发者使用: + +#### 1. 提交到主仓库 + +```bash title="Terminal" +# 1. Fork TEN Framework 仓库到你的 GitHub 账号 +# 2. 克隆你的 fork 仓库 +git clone https://github.com/your-username/ten-framework.git +cd ten-framework + +# 3. 将你的扩展复制到正确位置 +cp -r /path/to/your/my_asr_extension ai_agents/agents/ten_packages/extension/ + +# 4. 创建功能分支 +git checkout -b feat/add-my-asr-extension + +# 5. 提交更改 +git add ai_agents/agents/ten_packages/extension/my_asr_extension/ +git commit -m "feat: add my_asr_extension for [供应商名称] ASR service" + +# 6. 推送分支 +git push origin feat/add-my-asr-extension +``` + +#### 2. 创建 Pull Request + +1. **打开 GitHub**: 访问你的 fork 仓库页面 +2. **创建 PR**: 点击 "Compare & pull request" +3. **填写信息**: + - 标题: `feat: add my_asr_extension for [供应商名称] ASR service` + - 描述: 详细说明扩展功能、支持的特性和测试情况 +4. **提交 PR**: 等待代码审查和合并 + +#### 3. 代码审查和合并 + +- **自动测试**: CI/CD 系统会自动运行测试 +- **代码审查**: 维护者会审查代码质量和功能 +- **修改建议**: 根据反馈进行必要的修改 +- **合并**: 通过审查后,代码会被合并到 main 分支 + +#### 4. 自动发布到 TEN Store + +一旦你的 PR 被合并到 main 分支: + +- ✅ **自动上传**: 扩展会自动上传到 TEN Store +- ✅ **版本管理**: 系统会自动处理版本号和发布流程 +- ✅ **全球可用**: 你的扩展立即可供全球开发者下载使用 + +#### 5. 
使用你的扩展

其他开发者现在可以通过以下方式使用你的扩展:

```bash title="Terminal"
# 安装你的 ASR 扩展
tman install extension my_asr_extension

# 或者在项目中声明依赖
```

```json title="manifest.json"
{
  "dependencies": [
    {
      "type": "extension",
      "name": "my_asr_extension",
      "version": "^1.0.0"
    }
  ]
}
```

#### 发布注意事项

- **扩展命名**: 使用描述性的名称,避免与现有扩展冲突
- **版本兼容性**: 确保与当前 TEN Framework 版本兼容
- **许可证**: 明确扩展的开源许可证
- **维护承诺**: 准备好维护和更新你的扩展

## 📚 总结

恭喜你完成了 ASR 扩展开发的完整学习之旅!

### 🎯 掌握的核心技能

- ✅ **项目搭建**: 使用 ASR 模板快速创建项目骨架
- ✅ **架构设计**: 深入理解 ASR Extension 接口规范和继承关系
- ✅ **功能开发**: 实现连接管理、音频处理、事件处理等核心功能
- ✅ **高级特性**: 集成重连机制、音频调试、指标上报等高级能力
- ✅ **质量保证**: 编写单元测试、集成测试、端到端测试的完整覆盖
- ✅ **生产就绪**: 掌握日志记录、错误处理、性能优化等最佳实践

### 🚀 下一步行动

现在你可以:

1. **实践应用**: 选择你熟悉的 ASR 服务商,创建自己的扩展
2. **深入学习**: 研究其他 TEN 扩展类型(TTS、LLM等)的实现模式
3. **贡献社区**: 提交 PR 到 TEN Framework,分享你的成果
4. **生态建设**: 发布到 TEN Store,让更多开发者受益

**开发愉快!** 如果在开发过程中遇到问题,欢迎在 [TEN Framework GitHub](https://github.com/TEN-framework/TEN-Agent) 上提 Issue 或参与讨论。

推荐阅读 [TTS 扩展开发指南] 和 [LLM 扩展开发指南],掌握完整的 AI Agent 扩展开发技能。

diff --git a/content/docs/ten_agent/extension_dev/create_asr_extension.mdx b/content/docs/ten_agent/extension_dev/create_asr_extension.mdx
new file mode 100644
index 0000000..c86100d
--- /dev/null
+++ b/content/docs/ten_agent/extension_dev/create_asr_extension.mdx
@@ -0,0 +1,881 @@
---
title: Create an ASR Extension
description: Build, test, and publish a production-ready ASR (Automatic Speech Recognition) extension from scratch
---

# Create ASR Extension - Complete Guide

This guide walks you through creating a production-grade ASR (Automatic Speech Recognition) extension from scratch, covering project setup, core development, testing, and publishing.

## What is an ASR Extension

An ASR Extension is a standard building block in the TEN Framework that focuses on automatic speech recognition.

### Core responsibilities

1. Receive the audio stream from upstream modules (typically PCM)
2. Transcribe audio to text in real time
3. Deliver recognized text to downstream modules

### Where it fits in the pipeline

ASR plays the key role of converting audio to text in a TEN Agent conversation flow:

```
[Upstream] ── audio ──> [ASR Extension] ── text ──> [Downstream]
```

Typical upstream modules:
- RTC Extension: pull remote audio stream from an RTC channel
- Audio Capture Extension: capture from microphone or audio files
- Audio Processing Extension: provide preprocessed audio (e.g., denoise, AEC)

Typical downstream modules:
- LLM Extension: consume text to understand and generate responses
- Translation Extension: translate recognized text across languages
- Intent Recognition Extension: extract intents and key information

### Real-world scenarios

Scenario 1: AI Voice Assistant
```
RTC Extension → ASR Extension → LLM Extension → TTS Extension → RTC Extension
```
Collect the user's voice from the RTC channel; ASR transcribes it to text, the LLM generates a reply, and TTS converts the reply to speech and streams it back to RTC.

Scenario 2: Real-time Speech Translation
```
RTC Extension → ASR Extension → Translation Extension → TTS Extension → RTC Extension
```
Recognize Chinese speech to text, translate to English, then synthesize audio and push to RTC.

Scenario 3: Voice Control
```
Microphone Extension → ASR Extension → Intent Recognition Extension → Action Executor Extension
```
Recognize voice commands to text, extract intent, and execute device actions. 
+ +### Why standardize the ASR Extension + +- Plug-and-play: swap among vendors (Deepgram, Azure, Google, etc.) without changing neighbors +- Composable: freely compose with other building blocks to form rich applications +- Maintainable: upgrade and maintain in isolation +- Reusable: develop once, reuse across projects +- Ecosystem-ready: publish to TEN Store for community use + +## What you will learn + +- 🚀 Use the ASR template to scaffold a project +- ⚙️ Understand the ASR Extension interface spec +- 🔧 Implement the core logic of an ASR Extension +- 🧪 Write unit and integration tests +- 📊 Adopt logging and error-handling best practices +- 🌐 Publish your extension to the TEN Store + +## Prerequisites + +- Knowledge: TEN Agent architecture and fundamentals of ASR +- Skills: Python async programming (`asyncio`, `async/await`) +- Environment: develop inside the dev container (tman installed) +- API access: ASR vendor API key for testing + + + The examples use Deepgram as the vendor, but the same design patterns apply to other vendors or local ASR models. + + +## 1. 🚀 Project initialization + +### Create a new extension + +Use TMan's ASR template to create the project skeleton: + +```bash title="Terminal" +# go to the extension folder +cd ten-framework/ai_agents/agents/ten_packages/extension + +# create an ASR extension +tman create extension my_asr_extension --template default_asr_python --template-data class_name_prefix=MyAsr +``` + +After creation you should see: + +```bash title="Output" +Package 'extension:my_asr_extension' created successfully in 'my_asr_extension' in 2 seconds. +``` + +### Install dependencies + +#### Third-party libraries + +Add the Deepgram SDK in `requirements.txt`: + +```text title="requirements.txt" +websockets~=14.0 +pydantic +requests +deepgram-sdk +aiofiles +``` + +#### Install TEN dependencies + +Enter the project and install dependencies: + +```bash title="Terminal" +cd my_asr_extension +tman install --standalone +``` + +This builds the dependency tree from `manifest.json` and installs them into `.ten`. + +## 2. 🏗️ Architecture + +### Project layout + +``` +my_asr_extension/ +├── .vscode/ # VS Code debug configuration +│ └── launch.json # Debug launch config +├── manifest.json # Extension metadata and dependencies +├── property.json # Default runtime properties +├── requirements.txt # Python dependencies +├── extension.py # Main implementation +└── tests/ # Tests + ├── bin/start # Test runner script + ├── test_basic.py # Unit tests + └── configs/ # Test configs +``` + +### ASR Extension interface spec + +ASR Extensions follow the standard interface from TEN Framework. When using the template, the interface inheritance and required API section will be generated automatically. + +#### Manifest configuration + +In `manifest.json`, configure interface and properties properly. 
+ +1) Interface inheritance + +Declare in `api.interface` that this extension inherits the standard ASR interface from `ten_ai_base`: + +```json title="manifest.json" +{ + "api": { + "interface": [ + { + "import_uri": "../../system/ten_ai_base/api/asr-interface.json" + } + ] + } +} +``` + +The `asr-interface.json` defines shared properties for all ASR Extensions, including: +- `dump`: whether to enable audio dump +- `dump_path`: where to store dumped audio + +2) Property declaration + +Besides inheriting the standard interface, each ASR Extension should declare its own vendor-specific properties under `api.property`, especially required fields inside the `params` object, for example: + +```json title="manifest.json" +{ + "api": { + "interface": [ + { "import_uri": "../../system/ten_ai_base/api/asr-interface.json" } + ], + "property": { + "properties": { + "params": { + "type": "object", + "properties": { + "key": { "type": "string" }, + "region": { "type": "string" }, + "language": { "type": "string" } + } + } + } + } + } +} +``` + +Key points: +- Standard properties (`dump`, `dump_path`) come from `asr-interface.json` +- Vendor-specific properties (like `params.key`, `params.language`) are declared under `api.property` + +When using the template, these sections are generated automatically; adjust `params` for your vendor. + +#### Input/Output data formats + +Beyond property declarations, the standard `asr-interface.json` also defines input/output data formats: + +Input: +- PCM audio frames (`pcm_frame`) +- Finalize event (`asr_finalize`) + +Output: +- ASR result (`asr_result`) +- Finalize completed (`asr_finalize_end`) +- Error (`error`) +- Metrics (`metrics`) + +For exact schemas, refer to `asr-interface.json`. + +### Inheritance overview + +```python +AsyncASRBaseExtension # Abstract base class from TEN AI Base + ↓ +MyAsrExtension # Your implementation +``` + +#### What the base class provides + +`AsyncASRBaseExtension` provides a unified framework and out-of-the-box capabilities for all ASR extensions: + +1) Lifecycle management: init, start, and stop hooks +2) Audio frame processing: + - consume frames via an async queue + - apply buffer strategy (discard/keep) based on connection state + - extract and manage `session_id` and `metadata` +3) Finalize handling: receive `asr_finalize` and invoke your `finalize()` +4) Automatic metrics: TTFW, TTLW, and audio actual-send metrics reporting +5) Standard outputs: helpers to send `asr_result`, `error`, `asr_finalize_end`, and `metrics` +6) Session management: auto-generate a per-turn UUID and pass metadata along + +You only need to focus on vendor-specific logic; the framework takes care of the rest. 
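To make the division of labor concrete, the sketch below shows the shape of a minimal subclass; each stub corresponds to an entry in the abstract-method list that follows, and Section 4 fills them in for Deepgram. Treat it as a shape illustration under stated assumptions, not a working vendor integration: the `AsyncASRBaseExtension` and `AudioFrame` import paths are assumptions that may differ across TEN versions.

```python title="extension.py (shape sketch)"
from typing import Optional

from ten_ai_base.asr import AsyncASRBaseExtension  # assumed module path
from ten_runtime import AudioFrame  # assumed module path


class MyAsrExtension(AsyncASRBaseExtension):
    """Skeleton only; the sections below implement each stub for a real vendor."""

    def vendor(self) -> str:
        return "my_vendor"  # vendor name used in logs and metrics

    async def start_connection(self) -> None:
        ...  # open the vendor connection and register result callbacks

    async def stop_connection(self) -> None:
        ...  # close the vendor connection and reset connection state

    async def send_audio(self, frame: AudioFrame, session_id: Optional[str]) -> bool:
        ...  # stream one PCM frame to the vendor
        return True  # report whether the frame was accepted

    async def finalize(self, session_id: Optional[str]) -> None:
        ...  # push the vendor to emit final results right after speech ends

    def is_connected(self) -> bool:
        return False  # reflect the live connection state

    def input_audio_sample_rate(self) -> int:
        return 16000  # Hz; must match the audio the upstream produces
```
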
+ +#### Abstract methods you must implement + +- `vendor()` +- `start_connection()` +- `stop_connection()` +- `send_audio(frame: AudioFrame, session_id: str | None) -> bool` +- `finalize(session_id: str | None)` +- `is_connected() -> bool` +- `input_audio_sample_rate() -> int` + +#### Optional overrides + +- `input_audio_channels() -> int` (default 1) +- `input_audio_sample_width() -> int` (default 2 bytes / 16-bit PCM) +- `buffer_strategy() -> ASRBufferConfig` (default: discard) +- `audio_actual_send_metrics_interval() -> int` (default 5 seconds) + +#### Utility methods from the base class + +- `send_asr_result(asr_result: ASRResult)` +- `send_asr_error(error: ModuleError, vendor_info: ModuleErrorVendorInfo | None)` +- `send_asr_finalize_end()` +- `send_connect_delay_metrics(connect_delay: int)` +- `send_vendor_metrics(vendor_metrics: dict)` + +## 3. ⚙️ Configuration design + +### Define a config model + +```python title="extension.py" +from pydantic import BaseModel +from typing import Dict, Optional + +class MyAsrConfig(BaseModel): + # All vendor parameters live in params + params: Dict[str, Optional[str]] = {} + + # Audio dump options (standard across ASR extensions) + dump: bool = False + dump_path: Optional[str] = None +``` + +### Read configuration + +```python title="extension.py" +from ten_ai_base.const import LOG_CATEGORY_KEY_POINT, LOG_CATEGORY_VENDOR +from ten_ai_base.message import ModuleError, ModuleErrorCode + +@override +async def on_init(self, ten_env: AsyncTenEnv) -> None: + await super().on_init(ten_env) + + config_json, _ = await ten_env.get_property_to_json("") + try: + self.config = MyAsrConfig.model_validate_json(config_json) + + ten_env.log_info( + f"config: {self.config.to_json(sensitive_handling=True)}", + category=LOG_CATEGORY_KEY_POINT, + ) + + if self.config.dump: + dump_file_path = os.path.join(self.config.dump_path, DUMP_FILE_NAME) + self.audio_dumper = Dumper(dump_file_path) + + except Exception as e: + ten_env.log_error(f"invalid property: {e}", category=LOG_CATEGORY_KEY_POINT) + self.config = MyAsrConfig.model_validate_json("{}") + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.FATAL_ERROR.value, + message=str(e), + ), + ) +``` + +### Sensitive information masking + +```python title="extension.py" +from ten_ai_base.utils import encrypt + +class MyAsrConfig(BaseModel): + params: Dict[str, Optional[str]] = {} + dump: bool = False + dump_path: Optional[str] = None + + def to_json(self, sensitive_handling: bool = False) -> str: + if not sensitive_handling: + return self.model_dump_json() + + config = self.model_copy(deep=True) + if config.params: + encrypted_params = {} + for key, value in config.params.items(): + if (key in ["api_key", "key", "token", "secret", "password"] + and isinstance(value, str) and value): + encrypted_params[key] = encrypt(value) + else: + encrypted_params[key] = value + config.params = encrypted_params + + return config.model_dump_json() +``` + +### Default properties + +```json title="property.json" +{ + "params": { + "url": "wss://api.deepgram.com/v1/listen", + "api_key": "your_deepgram_api_key_here", + "language": "en", + "model": "nova-2", + "sample_rate": "16000", + "punctuate": "true", + "smart_format": "true", + "interim_results": "true" + }, + "dump": false, + "dump_path": "/tmp/asr_audio_dump" +} +``` + +## 4. 
🔧 Core implementation

### Basic methods

```python title="extension.py"
import asyncio
from typing import Optional

from typing_extensions import override  # typing.override on Python 3.12+

from deepgram import (
    AsyncListenWebSocketClient,  # exposed name may vary across deepgram-sdk versions
    DeepgramClient,
    DeepgramClientOptions,
    LiveTranscriptionEvents,
    LiveOptions,
)
from ten_ai_base.asr import ASRResult

class MyAsrExtension(AsyncASRBaseExtension):
    def __init__(self, name: str):
        super().__init__(name)
        self.config: MyAsrConfig = MyAsrConfig()
        self.deepgram_client: Optional[AsyncListenWebSocketClient] = None
        self.is_connected_flag: bool = False
        self.last_finalize_timestamp: float = 0.0

    @override
    def vendor(self) -> str:
        return "deepgram"

    @override
    def input_audio_sample_rate(self) -> int:
        return int(self.config.params.get("sample_rate", 16000) or 16000)

    @override
    def is_connected(self) -> bool:
        return self.is_connected_flag
```

### Connection management

#### Start connection

`start_connection` runs automatically after initialization. Log vendor errors and report them with `send_asr_error`. Implement retry logic if applicable.

```python title="extension.py"
@override
async def start_connection(self) -> None:
    try:
        await self.stop_connection()

        config = DeepgramClientOptions(api_key=self.config.params.get("api_key", "") or "")
        deepgram = DeepgramClient(config=config)
        self.deepgram_client = deepgram.listen.live.v("1")

        await self._register_deepgram_events()

        options = LiveOptions(
            model=self.config.params.get("model", "nova-2") or "nova-2",
            language=self.config.params.get("language", "en") or "en",
            sample_rate=self.config.params.get("sample_rate", 16000) or 16000,
        )

        for key, value in self.config.params.items():
            if key not in ["url", "api_key", "language", "model", "sample_rate"] and value:
                setattr(options, key, value == "true" if value in ["true", "false"] else value)

        await self.deepgram_client.start(options)

    except Exception as e:
        self.ten_env.log_error(f"failed to connect to deepgram: {e}", category=LOG_CATEGORY_VENDOR)
        await self.send_asr_error(
            ModuleError(
                module=MODULE_NAME_ASR,
                code=ModuleErrorCode.FATAL_ERROR.value,
                message=str(e),
            ),
        )
```

#### Stop connection

```python title="extension.py"
@override
async def stop_connection(self) -> None:
    if self.deepgram_client:
        await self.deepgram_client.finish()
        self.deepgram_client = None
        self.is_connected_flag = False
```

### Handle audio

The base class decides whether to forward or buffer/drop frames based on `is_connected`. 
+ +```python title="extension.py" +@override +async def send_audio(self, audio_frame: AudioFrame) -> bool: + if not self.is_connected() or not self.deepgram_client: + return False + + try: + audio_buf = audio_frame.get_buf() + if not audio_buf: + return False + + await self.deepgram_client.send(bytes(audio_buf)) + return True + except Exception as e: + self.ten_env.log_error(f"Failed to send audio: {e}", category="vendor") + return False +``` + +#### Configure buffering strategy + +The base class calls `buffer_strategy()` to determine how to handle frames when disconnected: + +- Discard mode (ASRBufferConfigModeDiscard): drop frames when disconnected +- Keep mode (ASRBufferConfigModeKeep): cache frames and send them after reconnection + +```python title="extension.py" +from ten_ai_base.asr import ASRBufferConfig, ASRBufferConfigModeKeep + +@override +def buffer_strategy(self) -> ASRBufferConfig: + return ASRBufferConfig( + mode=ASRBufferConfigModeKeep(byte_limit=10 * 1024 * 1024) + ) +``` + +#### Why we recommend Keep mode + +If frames are dropped during disconnection, vendor-side timestamps are computed relative to the audio they actually received, which will be smaller than the real timeline. Downstream components that rely on accurate timestamps can then behave incorrectly. + +Example: +1) 0–10s sent normally +2) 10–15s dropped due to disconnection +3) 15–20s sent after reconnection + +The vendor actually receives 15 seconds of audio (0–10 + 15–20). When producing results for the last 5 seconds, it thinks the timestamps are 10–15s, while the real time is 15–20s (a 5-second drift). Keep mode avoids this by caching frames and sending every frame to the vendor, preserving accurate timestamps. + +#### Implement finalize + +```python title="extension.py" +@override +async def finalize(self) -> None: + """Trigger final results quickly after VAD detects the end of speech.""" + if self.deepgram_client: + self.last_finalize_timestamp = asyncio.get_event_loop().time() * 1000 + await self.deepgram_client.finalize() + await self.send_asr_finalize_end() +``` + +### Vendor event handling + +Register event handlers, log connection changes, transform vendor results to standard `ASRResult`, send errors, and trigger reconnection. + +```python title="extension.py" +async def _register_deepgram_events(self) -> None: + if not self.deepgram_client: + return + self.deepgram_client.on(LiveTranscriptionEvents.Open, self._on_open) + self.deepgram_client.on(LiveTranscriptionEvents.Close, self._on_close) + self.deepgram_client.on(LiveTranscriptionEvents.Transcript, self._on_transcript) + self.deepgram_client.on(LiveTranscriptionEvents.Error, self._on_error) +``` + +#### Errors and reconnection + +Log vendor errors, report them with `send_asr_error`, then call `_handle_reconnect()`. + +```python title="extension.py" +async def _on_error(self, *args, **kwargs) -> None: + error = args[1] if len(args) > 1 else None + if not error: + return + self.ten_env.log_error(f"vendor_error: deepgram error: {error}", category=LOG_CATEGORY_VENDOR) + await self.send_asr_error( + ModuleError( + module=MODULE_NAME_ASR, + code=ModuleErrorCode.NON_FATAL_ERROR.value, + message=f"Vendor error: {str(error)}", + ), + ModuleErrorVendorInfo( + vendor="deepgram", + code=getattr(error, 'code', 'unknown'), + message=str(error), + ) + ) + await self._handle_reconnect() +``` + + + See the Advanced section for how to use `ReconnectManager` to implement intelligent reconnection. + + +## 5. 
🚀 Advanced + +### Reconnection strategy + +When the vendor connection breaks or errors occur, use `ReconnectManager` to implement retry with exponential backoff. + +How to use: +1) Initialize `ReconnectManager` in your constructor: `self.reconnect_manager = ReconnectManager(max_attempts=5, base_delay=0.5)` +2) On successful open (`_on_open`), call `self.reconnect_manager.mark_connection_successful()` +3) Implement `_handle_reconnect()` to: + - check `can_retry()` + - call `handle_reconnect(connect_func=self.start_connection)` + - log success/failure and send fatal error when max attempts reached +4) Trigger `_handle_reconnect()` from `_on_close` and `_on_error` + + + For a reference implementation, check `reconnect_manager.py` in either `azure_asr_python` or `deepgram_asr_python`. + + +### Audio debugging (Dump) + +Integrate optional dumping to help debug audio issues. + +```python title="extension.py" +import os +from ten_ai_base.dumper import Dumper + +DUMP_FILE_NAME = "my_asr_in.pcm" + +class MyAsrExtension(AsyncASRBaseExtension): + @override + async def on_init(self, ten_env: AsyncTenEnv) -> None: + await super().on_init(ten_env) + if self.config.dump: + dump_file_path = os.path.join(self.config.dump_path, DUMP_FILE_NAME) + self.audio_dumper = Dumper(dump_file_path) + await self.audio_dumper.start() + + @override + async def on_deinit(self, ten_env: AsyncTenEnv) -> None: + await super().on_deinit(ten_env) + if self.audio_dumper: + await self.audio_dumper.stop() + self.audio_dumper = None +``` + +## 6. 🧪 Unit testing + +Why mocking: +- CI friendly (no vendor quota) +- Cost control +- Stability (no flaky vendor connections) +- Speed +- Full control of edge cases + +Recommended coverage: +1) Config management (valid/invalid, masking) +2) Audio processing (send, transform, timestamps) +3) Connection management (connect, reconnect, logs) +4) Finalize flow +5) Error handling (fatal vs non-fatal, vendor info) +6) Audio dump +7) Metrics (TTFW, TTLW, vendor metrics) + +Run tests: + +```bash title="Terminal" +cd my_asr_extension +./tests/bin/start +``` + +### Mock example (tests/mock.py) + +```python title="tests/mock.py" +import pytest +from unittest.mock import MagicMock, patch +from types import SimpleNamespace + +@pytest.fixture(scope="function") +def patch_deepgram_ws(): + """Mock Deepgram WebSocket client""" + with patch("ten_packages.extension.my_asr_extension.extension.AsyncListenWebSocketClient") as mock_client: + # Create mock instance + mock_instance = MagicMock() + mock_client.return_value = mock_instance + + # Store event handlers + event_handlers = {} + + def mock_on(event, handler): + event_handlers[event] = handler + + mock_instance.on = mock_on + mock_instance.start = MagicMock() + mock_instance.send = MagicMock() + mock_instance.finish = MagicMock() + mock_instance.finalize = MagicMock() + + # Helpers to trigger events + def trigger_open(): + if 'open' in event_handlers: + event_handlers['open']() + + def trigger_transcript(text, is_final=False): + if 'transcript' in event_handlers: + # Emulate Deepgram response structure + mock_result = SimpleNamespace() + mock_result.channel = SimpleNamespace() + mock_result.channel.alternatives = [SimpleNamespace()] + mock_result.channel.alternatives[0].transcript = text + mock_result.is_final = is_final + mock_result.start = 0.0 + mock_result.duration = 1.0 + event_handlers['transcript'](None, mock_result) + + mock_instance.trigger_open = trigger_open + mock_instance.trigger_transcript = trigger_transcript + + yield mock_instance +``` + 
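As a usage sketch, the hypothetical test below drives the fixture's trigger helpers directly to show how a registered handler receives events. Two assumptions to note: the fixture is imported from a sibling `mock` module (moving it into a `conftest.py` works equally well), and the patch target inside the fixture must resolve in your package layout. Handlers are registered under plain string keys here because that is what the fixture's `trigger_*` helpers look up; the real extension registers `LiveTranscriptionEvents` enum members, so align the keys (or the fixture) if you reuse this pattern.

```python title="tests/test_mock_example.py"
# Hypothetical usage of the patch_deepgram_ws fixture above; adjust the import
# to your package layout (or move the fixture into conftest.py).
from .mock import patch_deepgram_ws  # noqa: F401


def test_transcript_event_dispatch(patch_deepgram_ws):
    received = []

    # Register a transcript handler the way _register_deepgram_events() would;
    # the fixture records it in its internal event_handlers dict.
    def on_transcript(_client, result):
        received.append(
            (result.channel.alternatives[0].transcript, result.is_final)
        )

    patch_deepgram_ws.on("transcript", on_transcript)

    # Simulate the vendor emitting an interim result followed by a final one.
    patch_deepgram_ws.trigger_transcript("hello", is_final=False)
    patch_deepgram_ws.trigger_transcript("hello world", is_final=True)

    assert received == [("hello", False), ("hello world", True)]
```
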
+### Debugging with VS Code + +The template includes `.vscode/launch.json` for out-of-the-box debugging. + +```json title=".vscode/launch.json" +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Test Extension", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}/tests/bin/start", + "args": [], + "console": "integratedTerminal", + "cwd": "${workspaceFolder}", + "env": { + "PYTHONPATH": "${workspaceFolder}:${workspaceFolder}/.ten/app/ten_packages/system/ten_runtime_python/lib:${workspaceFolder}/.ten/app/ten_packages/system/ten_runtime_python/interface:${workspaceFolder}/.ten/app/ten_packages/system/ten_ai_base/interface" + } + } + ] +} +``` + +#### Debug a specific test + +Pass arguments to run a single test: + +```json title=".vscode/launch.json" +{ + "args": [ + "tests/test_basic.py::test_asr_basic_functionality", + "-v" + ] +} +``` + +#### Environment variable debugging + +```json title=".vscode/launch.json" +{ + "env": { + "PYTHONPATH": "...", + "DEEPGRAM_API_KEY": "your_real_api_key_here" + } +} +``` + +## 7. 🔗 Integration testing (Guarder) + +Prepare `.env` with real keys, create configs under `tests/configs/`, and run: + +```bash title=".env" +# Deepgram ASR API Key +DEEPGRAM_API_KEY=your_real_deepgram_api_key_here +``` + +Example property config: + +```json title="tests/configs/property_en.json" +{ + "params": { + "api_key": "${env:DEEPGRAM_API_KEY}", + "language": "en-US" + } +} +``` + +```bash title="Terminal" +cd ai_agents +task asr-guarder-test EXTENSION=my_asr_extension +``` + +Focus metrics: +- TTFW < 1000ms (typical) +- TTLW < 300ms (typical) +- Accuracy under varied audio quality +- Long-session stability + +## 8. 🌐 End-to-end testing + +Use TMan Designer to replace the ASR node in a real conversation graph, configure parameters, and verify accuracy, latency, and stability. + +```bash title="Terminal" +# In your TEN Agent project +cd /path/to/your/ten-agent-project +tman designer +``` + +TMan Designer opens a visual UI where you can: +1. Select the ASR node +2. Replace it with `my_asr_extension` +3. Configure parameters (API Key, language, etc.) +4. Apply and start testing + +## 9. 📊 Best practices + +Config: +- Keep vendor params in a single `params` dict +- Provide safe accessors and defaults + +Errors: +- Exponential backoff for reconnection +- Clear logging and structured reporting + +Performance: +- Async audio processing +- Audio buffering and batching +- Proper WebSocket lifecycle management +- Monitor and report key metrics + +Logging: +- Use `ten_env.log_debug/info/warn/error` +- Use categories to organize logs +- Mask sensitive data (e.g., API keys) + +#### Log categories + +- KEY_POINT: important configuration and state logs +- VENDOR: vendor-related logs (connection status, results, errors) +- Default: general business logic logs + +## 10. 🌟 Extend and contribute + +See other ASR extensions in `ai_agents/agents/ten_packages/extension/` such as `azure_asr_python`, `deepgram_asr_python`, `google_asr_python`, and `xfyun_asr_python`. + +```bash title="Reference locations" +ten-framework/ +└── ai_agents/agents/ten_packages/extension/ + ├── azure_asr_python/ # Azure Speech Services + ├── deepgram_asr_python/ # Deepgram ASR + ├── google_asr_python/ # Google Cloud Speech + ├── xfyun_asr_python/ # iFlytek (XFYun) + └── ... # More ASR extensions +``` + +### Contribute to the community + +1. Code style: follow project conventions +2. Test coverage: ensure unit and integration tests pass +3. 
Documentation: provide clear README and configuration notes +4. Performance validation: pass Guarder tests for production readiness + +### Publish to TEN Store + +1) Fork and clone TEN Framework +2) Copy your extension into `ai_agents/agents/ten_packages/extension/` +3) Create a branch, commit, and open a PR +4) Once merged into `main`, it will be uploaded to TEN Store automatically + +```bash title="Terminal" +git clone https://github.com/your-username/ten-framework.git +cd ten-framework +cp -r /path/to/your/my_asr_extension ai_agents/agents/ten_packages/extension/ +git checkout -b feat/add-my-asr-extension +git add ai_agents/agents/ten_packages/extension/my_asr_extension/ +git commit -m "feat: add my_asr_extension for [Vendor] ASR service" +git push origin feat/add-my-asr-extension +``` + +Open a PR on GitHub and provide a clear description of features and tests. + +#### Use your extension + +```bash title="Terminal" +# Install your ASR extension +tman install extension my_asr_extension +``` + +Or declare it as a dependency: + +```json title="manifest.json" +{ + "dependencies": [ + { + "type": "extension", + "name": "my_asr_extension", + "version": "^1.0.0" + } + ] +} +``` + +## 🎯 Summary + +You learned how to scaffold, implement, test, and publish a production-ready ASR Extension, and how to integrate advanced features such as reconnection and audio dump. + + + Happy hacking! If you run into issues, open an issue on the TEN Framework GitHub. + + + + Consider reading the TTS and LLM extension development guides to build a complete AI Agent skill set. + + + diff --git a/content/docs/ten_agent/extension_dev/meta.json b/content/docs/ten_agent/extension_dev/meta.json index 4fc3a45..23347b8 100644 --- a/content/docs/ten_agent/extension_dev/meta.json +++ b/content/docs/ten_agent/extension_dev/meta.json @@ -1,4 +1,4 @@ { "title": "Extension Development", - "pages": ["create_a_hello_world_extension"] + "pages": ["create_a_hello_world_extension", "create_asr_extension"] }