diff --git a/ticdc/ticdc-architecture.md b/ticdc/ticdc-architecture.md index ec35a9f35781..323cbc6ed221 100644 --- a/ticdc/ticdc-architecture.md +++ b/ticdc/ticdc-architecture.md @@ -55,13 +55,49 @@ TiCDC 新架构通过将整体架构拆分成有状态和无状态的两部分 ## 新功能介绍 -新架构支持为 MySQL Sink 启用**表级任务拆分**。你可以通过在 Changefeed 配置中设置 `scheduler.enable-table-across-nodes = true` 来启用该功能。 +新架构支持为 MySQL Sink 和 Storage Sink 启用**表级任务拆分**。你可以通过在 Changefeed 配置中设置 `scheduler.enable-table-across-nodes = true` 来启用该功能。 -启用后,当**有且仅有一个主键或非空唯一键**的表满足以下任一条件时,TiCDC 会自动将其拆分并分发到多个节点并行执行同步,从而提升同步效率与资源利用率: +对于 **MySQL Sink** 来说,**有且仅有一个主键或非空唯一键**的表满足以下任一条件时,TiCDC 会自动将其拆分并分发到多个节点并行执行同步,从而提升同步效率与资源利用率: - 表的 Region 数超过配置的阈值(默认 `100000`,可通过 `scheduler.region-threshold` 调整)。 - 表的写入流量超过配置的阈值(默认未开启,可通过 `scheduler.write-key-threshold` 设置)。 +对于 **Storage Sink** 来说,文件名会变为 CDC\_{uuid}\_{num}.{extension} 的形式,Index 文件名会变为 CDC\_{uuid}.index 的形式。 + +- 数据变更记录路径 + ``` + {scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC_{uuid}_{num}.{extension} + ``` + +- Index 文件路径 + ``` + {scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/meta/CDC_{uuid}.index + ``` + + 在处理文件时,可能会遇到某个文件还没有写入完成就进行读取的情况,这会导致部分数据没有成功读取。我们需要先读取 Index 文件来获取可以进行处理的文件来避免这种情况发生。消费逻辑为: + + - 读取目录下的 meta/CDC.index 文件,获取当前已经完成写入的文件名。 + - 依次处理文件序号小于等于该文件名的 DML 事件。 + + ``` + ├── metadata + └── test + ├── tbl_1 + │ ├── 437752935075545091 + │ │ ├── CDC_11_000001.json + │ │ ├── CDC_22_000001.json + │ │ └── meta + │ │ ├── CDC_11.index + │ │ └── CDC_22.index + │ ├── 437752935075546092 + │ │ ├── CDC_33_000001.json + │ │ ├── CDC_44_000001.json + │ │ └── meta + │ │ ├── CDC_33.index + │ │ └── CDC_44.index + ``` + 在 `{schema}/{table}/{table-version-separator}/` 目录下会存在 uuid 不同,序号相同的文件。消费时,读取对应的 Index 文件来获取已经完成写入的文件名,依次处理文件序号小于等于对应文件名的 DML 事件,对这些文件按 DML 事件的 commit-ts 进行排序后统一进行处理。 + ## 兼容性说明 ### DDL 进度表 diff --git a/ticdc/ticdc-storage-consumer-dev-guide.md b/ticdc/ticdc-storage-consumer-dev-guide.md index f77435611ac7..b585cd760fc5 100644 --- a/ticdc/ticdc-storage-consumer-dev-guide.md +++ b/ticdc/ticdc-storage-consumer-dev-guide.md @@ -182,3 +182,12 @@ func (tc *TableVersionConsumer) ExecuteDML() {} 在处理完 DDL 事件后,可以在 `{schema}/{table}/{table-version-separator}/` 目录下,根据具体的文件格式(CSV 或 Canal-JSON)并按照文件序号依次处理 DML 事件。 因为 TiCDC 提供 At Least Once 语义,可能出现重复发送数据的情况,所以需要在消费程序中比较数据事件的 commit ts 和 consumer checkpoint,并在 commit ts 小于 consumer checkpoint 的情况下进行去重处理。 + +在处理文件时,可能会遇到某个文件还没有写入完成就进行读取的情况,这会导致部分数据没有成功读取。我们需要先读取 Index 文件来获取可以进行处理的文件来避免这种情况发生。消费逻辑为: + +- 读取目录下的 meta/CDC.index 文件,获取当前已经完成写入的文件名。 +- 依次处理文件序号小于等于该文件名的 DML 事件。 + +> **注意:** +> +> 在新架构下开启 enable-table-across-nodes 后,文件名变为 CDC\_{uuid}\_000001.json 的形式。此时,该目录下会存在 uuid 不同,序号相同的文件。消费时,读取对应的 Index 文件来获取已经完成写入的文件名,依次处理文件序号小于等于对应文件名的 DML 事件,对这些文件按 DML 事件的 commit-ts 进行排序后统一进行处理。