本文档旨在为开发者提供一套标准的自定义数据处理算子开发规范。通过遵循本指南,您可以开发出能够无缝集成到数据处理系统中的高质量算子。
一个标准的算子开发包(Package)应包含以下核心文件。请确保文件命名准确,以便系统正确识别。
operator_package/
├── __init__.py # [必要] 算子注册入口,用于将算子注册到全局工厂
├── metadata.yml # [必要] 算子元数据、UI 参数定义及资源配置
├── process.py # [必要] 算子核心逻辑代码
├── requirements.txt # [可选] 算子运行所需的第三方 Python 依赖
└── README.md # [可选] 算子功能说明文档
metadata.yml 定义了算子在系统中的“身份”、前端显示的配置组件以及运行时资源限制。
| 字段 | 说明 | 示例 |
|---|---|---|
name |
算子显示名称 | 测试算子 |
description |
算子描述 | 这是一个测试算子。 |
language |
算子使用的语言,当前仅支持python | python |
raw_id |
关键字段,必须与 process.py 中的类名完全一致 |
TestMapper |
version |
语义化版本号 | 1.0.0 |
vendor |
厂商 | datamate |
modal |
支持的数据模态 (text/image/audio/video) | text |
inputs |
输入的数据模态 (text/image/audio/video) | text |
outputs |
输出的数据模态 (text/image/audio/video) | text |
定义算子功能分类,支持清洗与标注。
types:
- 'cleaning'
- 'annotation'定义算子当前版本较上版本更新内容。
release:
- '首次发布'
- '支持基本处理操作'定义算子运行时的资源配额及性能指标参考。
runtime:
memory: 10485760 # 内存 单位bytes
cpu: 0.05 # CPU 核心数
gpu: 0.1 # GPU 卡数
npu: 0.1 # NPU 卡数
storage: 10MB # 存储空间
metrics: # 算子性能参考指标
- name: '吞吐量'
metric: '20 images/sec'
- name: '准确率'
metric: '99.5%'通过 settings 字段,开发者可以自定义用户在前端界面配置算子时的交互组件。系统支持以下类型:
- Slider (滑动条) - 用于数值范围调整
sliderParam: # 参数的唯一标识符,process.py 中通过该参数获取值
name: '参数展示名称' # 界面上显示给用户的参数标题
description: '参数展示描述' # 用户鼠标悬停时显示的详细功能说明或帮助文本
type: 'slider' # 组件类型
defaultVal: 0.5 # 算子加载时的初始值,必须位于 min 和 max 之间
required: false # 是否必填
min: 0 # [下限] 滑动条允许调整的最小值
max: 1 # [上限] 滑动条允许调整的最大值
step: 0.1 # [步长] 每次拖动的增量 (例如 0.1 代表保留一位小数,1 代表整数)- Switch (开关) - 用于布尔值控制
switchParam:
name: '参数展示名称'
description: '参数展示描述'
type: 'switch'
defaultVal: 'true' # 注意 yaml 中布尔值建议使用引号或标准写法
required: false # 是否必须参数
checkedLabel: '选中' # 选中时的展示
unCheckedLabel: '未选中' # 未选中时的展示- Select / Radio (下拉 / 单选) - 用于枚举选项
selectParam:
name: '参数展示名称'
description: '参数展示描述'
type: 'select' # 或 'radio'
defaultVal: 'option1' # 默认后端传递值,为options其中一个选项
required: false
options:
- label: '选项1' # 前端显示标签
value: 'option1' # 后端传递值
- label: '选项2'
value: 'option2'- Range (范围区间)
rangeParam:
name: '参数展示名称'
description: '参数展示描述'
type: 'range'
properties:
- name: 'rangeLeft' # 范围下限
type: 'inputNumber'
defaultVal: 100
min: 0
max: 10000
step: 1
- name: 'rangeRight' # 范围上限
type: 'inputNumber'
defaultVal: 8000
min: 0
max: 10000
step: 1- Checkbox (多选)
checkboxParam:
name: '参数展示名称'
description: '参数展示描述'
type: 'checkbox'
defaultVal: 'option1,option2' # 多个值用逗号分隔
required: false
options:
- label: '选项1'
value: 'option1'
- label: '选项2'
value: 'option2'
- Input (文本输入)
inputParam:
name: '参数展示名称'
description: '参数展示描述'
type: 'input'
defaultVal: '默认值'
required: falseprocess.py 是算子的执行主体。开发者需继承基础类并实现数据处理逻辑。
- 继承基类:必须从
datamate.core.base_op继承Mapper或Filter。 - 类名一致性:Python 类名建议与后续
metadata.yml中的raw_id保持一致。 - Execute 方法:必须实现
execute方法,接收sample(字典) 并返回处理后的字典。
from typing import Dict, Any
from datamate.core.base_op import Mapper
class YourOperatorName(Mapper):
"""
算子类名建议使用驼峰命名法定义,例如 TestMapper
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.slider_param = float(kwargs.get("sliderParam", 0.5))
self.switch_param = kwargs.get('switchParam', False)
self.select_param = kwargs.get('selectParam', '')
self.radio_param = kwargs.get('radioParam', '')
self.range_param = kwargs.get('rangeParam', [0, 0])
self.checkbox_param = kwargs.get('checkboxParam', [])
self.input_param = kwargs.get('inputParam', '').strip()
def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
"""
核心处理逻辑
:param sample: 输入的数据样本,通常包含 text_key 等字段
:return: 处理后的数据样本
"""
# 示例:获取文本并进行修改
# input_text = sample['text']
# processed_text = do_something(input_text)
# sample['text'] = processed_text
return sample其中,sample字段包含如下参数:
{
"text": "源文件读取的文本内容",
"data": "源文件读取的二进制数据(针对图片等)",
"fileName": "源文件名",
"fileType": "源文件类型(扩展名)",
"fileId": "源文件ID",
"filePath": "源文件路径",
"fileSize": "源文件大小",
"export_path": "目标文件导出路径信息",
"ext_params": "额外扩展参数",
"target_type": "目标文件类型"
}__init__.py 用于将开发好的算子注册到系统中。
使用 OPERATORS.register_module 方法进行注册。
- module_name: 对应
metadata.yml中的raw_id及 Python 类名。 - module_path: 指向
process.py的引用路径(注意路径层级)。
# -*- coding: utf-8 -*-
from datamate.core.base_op import OPERATORS
# 假设 process.py 位于 operator_package 目录下
OPERATORS.register_module(
module_name='YourOperatorName',
module_path="ops.user.operator_package.process"
)- 依赖管理:如果算子依赖非标准库(如 pandas, numpy),请在
requirements.txt中列出。 - 异常处理:在
process.py中建议添加适当的 try-catch 逻辑,避免单条数据异常导致整个任务崩溃。 - 数据类型:在
metadata.yml中定义的参数类型(如 slider 返回 float,input 返回 string),在 Python 代码中使用时需注意类型转换。
开发完成后,需将所有文件打包为一个压缩包进行上传。包名必须严格遵循注册路径规范。
- 文件完整性:压缩包内必须包含
__init__.py,metadata.yml,process.py及相关依赖。 - 命名一致性(重要):压缩包的文件名(不含后缀)必须与
__init__.py中module_path所指定的包目录名保持一致。
假设您在 __init__.py 中的注册代码如下:
OPERATORS.register_module(
module_name='TestMapper',
# 这里 ops.user.my_custom_op.process 中的 'my_custom_op' 为包目录名
module_path="ops.user.my_custom_op.process"
)则您的压缩包名称必须命名为:
my_custom_op.zip(或 .tar)
系统解压后将基于此名称构建导入路径,名称不一致将导致 ModuleNotFoundError。