
Commit 225f73b

yangmu.0722 and Stefanietry committed
[core] Support chain tbl on batch mode
Co-authored-by: zhoufa <[email protected]>
1 parent a2acde5 commit 225f73b

File tree: 50 files changed, +2452 -66 lines changed
Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
---
title: "Chain Table"
weight: 6
type: docs
aliases:
- /primary-key-table/chain-table.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Chain Table

Chain table is a new feature for primary key tables that changes how you process incremental data.
Consider this type of scenario: a full set of data is stored periodically (for example, daily), but most of it
is redundant, with only a small amount of changed data each day. An ODS binlog dump is a typical
example.

Taking a daily binlog dump job as an example, a daily task merges the previous day's full data with the
new incremental data to generate a new full dataset. This approach has two obvious drawbacks:
* Full computation: the merge operation covers all data and involves a shuffle, which results in poor performance.
* Full storage: a full set of data is stored every day, even though the changed data usually accounts for a very small proportion (e.g., 1%).

Paimon can solve this problem by consuming the changed data directly and merging on read. In this way, the
aforementioned full computation and storage can be optimized into an incremental mode:
* Incremental computation: the offline daily ETL job only needs to consume the changed data of the current day and does not need to merge all data.
* Incremental storage: only the changed data is stored each day, and it is compacted asynchronously and periodically (e.g., weekly) to build a global chain table within the lifecycle.

{{< img src="/img/chain-table.png">}}

On top of a regular Paimon table, snapshot and delta branches are introduced to describe the full and incremental data
respectively. When writing, you can specify the branch to write full or incremental data; when reading, the
appropriate reading strategy is selected automatically based on the reading mode: full, incremental,
or hybrid.

To enable chain table, you must set `chain-table.enabled` to true in the table options when creating the
table, and the snapshot and delta branches need to be created as well. Consider an example via Spark SQL:

```sql
CREATE TABLE default.t (
    `t1` string,
    `t2` string,
    `t3` string
) PARTITIONED BY (`date` string)
TBLPROPERTIES (
    'chain-table.enabled' = 'true'
);

CALL sys.create_branch('default.t', 'snapshot');

CALL sys.create_branch('default.t', 'delta');

ALTER TABLE default.t SET TBLPROPERTIES
('scan.fallback-snapshot-branch' = 'snapshot',
 'scan.fallback-delta-branch' = 'delta');

ALTER TABLE `default`.`t$branch_snapshot` SET TBLPROPERTIES
('scan.fallback-snapshot-branch' = 'snapshot',
 'scan.fallback-delta-branch' = 'delta');

ALTER TABLE `default`.`t$branch_delta` SET TBLPROPERTIES
('scan.fallback-snapshot-branch' = 'snapshot',
 'scan.fallback-delta-branch' = 'delta');
```

Notice that:
- Chain table is only supported for primary key tables, which means you should define `bucket` and `bucket-key` for the table.
- Chain table requires the schema of each branch to be consistent.
- Only Spark is supported for now; Flink will be supported later.
- Chain compact is not supported yet; it will be supported later.

After creating a chain table, you can read and write data in the following ways.

- Full Write: write data to t$branch_snapshot.
```sql
INSERT OVERWRITE `default`.`t$branch_snapshot` PARTITION (date = '20250810')
VALUES ('1', '1', '1');
```

- Incremental Write: write data to t$branch_delta.
```sql
INSERT OVERWRITE `default`.`t$branch_delta` PARTITION (date = '20250811')
VALUES ('2', '1', '1');
```

- Full Query: if the snapshot branch has the full partition, read it directly; otherwise, read in chain merge mode.
```sql
SELECT t1, t2, t3 FROM default.t WHERE date = '20250811';
```
You will get the following result:
```text
+---+---+---+
| t1| t2| t3|
+---+---+---+
|  1|  1|  1|
|  2|  1|  1|
+---+---+---+
```

- Incremental Query: read the incremental partition from t$branch_delta.
```sql
SELECT t1, t2, t3 FROM `default`.`t$branch_delta` WHERE date = '20250811';
```
You will get the following result:
```text
+---+---+---+
| t1| t2| t3|
+---+---+---+
|  2|  1|  1|
+---+---+---+
```

- Hybrid Query: read both full and incremental data simultaneously.
```sql
SELECT t1, t2, t3 FROM default.t WHERE date = '20250811'
UNION ALL
SELECT t1, t2, t3 FROM `default`.`t$branch_delta` WHERE date = '20250811';
```
You will get the following result:
```text
+---+---+---+
| t1| t2| t3|
+---+---+---+
|  1|  1|  1|
|  2|  1|  1|
|  2|  1|  1|
+---+---+---+
```
docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions
@@ -98,6 +98,12 @@
         <td>MemorySize</td>
         <td>Memory page size for caching.</td>
     </tr>
+    <tr>
+        <td><h5>chain-table.enabled</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable chain table.</td>
+    </tr>
     <tr>
         <td><h5>changelog-file.compression</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
@@ -1007,6 +1013,18 @@
         <td>String</td>
         <td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
     </tr>
+    <tr>
+        <td><h5>scan.fallback-delta-branch</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>When a batch job queries from a chain table, if a partition does not exist in the main and snapshot branches, the reader will try to get this partition from the chain snapshot and delta branches together.</td>
+    </tr>
+    <tr>
+        <td><h5>scan.fallback-snapshot-branch</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from the chain snapshot branch.</td>
+    </tr>
     <tr>
         <td><h5>scan.file-creation-time-millis</h5></td>
         <td style="word-wrap: break-word;">(none)</td>

docs/static/img/chain-table.png

201 KB

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 67 additions & 0 deletions
@@ -226,6 +226,35 @@ public InlineElement getDescription() {
     public static final ConfigOption<String> BRANCH =
             key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");
 
+    public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
+            key("chain-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable chain table.");
+
+    public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
+            key("scan.fallback-snapshot-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a chain table, if a partition does not exist in the main branch, "
+                                    + "the reader will try to get this partition from the chain snapshot branch.");
+
+    public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
+            key("scan.fallback-delta-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a chain table, if a partition does not exist in the main and snapshot branches, "
+                                    + "the reader will try to get this partition from the chain snapshot and delta branches together.");
+
+    @ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<ChainBranchReadMode> CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE =
+            key("chain-table.branch.internal.read.mode")
+                    .enumType(ChainBranchReadMode.class)
+                    .defaultValue(ChainBranchReadMode.DEFAULT)
+                    .withDescription("Chain query type.");
+
     public static final String FILE_FORMAT_ORC = "orc";
     public static final String FILE_FORMAT_AVRO = "avro";
     public static final String FILE_FORMAT_PARQUET = "parquet";
@@ -3137,6 +3166,22 @@ public int lookupMergeRecordsThreshold() {
         return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
     }
 
+    public boolean isChainTable() {
+        return options.get(CHAIN_TABLE_ENABLED);
+    }
+
+    public String scanFallbackSnapshotBranch() {
+        return options.get(SCAN_FALLBACK_SNAPSHOT_BRANCH);
+    }
+
+    public String scanFallbackDeltaBranch() {
+        return options.get(SCAN_FALLBACK_DELTA_BRANCH);
+    }
+
+    public ChainBranchReadMode getChainBranchReadMode() {
+        return options.get(CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE);
+    }
+
     public boolean formatTableImplementationIsPaimon() {
         return options.get(FORMAT_TABLE_IMPLEMENTATION) == FormatTableImplementation.PAIMON;
     }
@@ -3967,4 +4012,26 @@ public InlineElement getDescription() {
             return text(description);
         }
     }
+
+    /** The read mode of chain branch. */
+    public enum ChainBranchReadMode {
+        DEFAULT("default", "Read as common table."),
+        CHAIN_READ("chain_read", "Read as chain table.");
+
+        private final String value;
+        private final String description;
+
+        ChainBranchReadMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
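As a quick orientation for reviewers, here is a minimal usage sketch (not part of this commit) showing how the new options and accessors fit together; it assumes the existing `CoreOptions(Map<String, String>)` constructor and the `ConfigOption.key()` helper:

```java
import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class ChainOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(CoreOptions.CHAIN_TABLE_ENABLED.key(), "true");
        conf.put(CoreOptions.SCAN_FALLBACK_SNAPSHOT_BRANCH.key(), "snapshot");
        conf.put(CoreOptions.SCAN_FALLBACK_DELTA_BRANCH.key(), "delta");

        CoreOptions options = new CoreOptions(conf);
        // The new accessors simply read back the three options:
        System.out.println(options.isChainTable());               // true
        System.out.println(options.scanFallbackSnapshotBranch()); // snapshot
        System.out.println(options.scanFallbackDeltaBranch());    // delta
        System.out.println(options.getChainBranchReadMode());     // DEFAULT
    }
}
```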

paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java

Lines changed: 4 additions & 0 deletions
@@ -48,4 +48,8 @@ public Path getNextExternalDataPath(String fileName) {
         }
         return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
     }
+
+    public ExternalPathProvider withBucketPath(Path relativeBucketPath) {
+        return new ExternalPathProvider(externalTablePaths, relativeBucketPath);
+    }
 }
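A hedged usage sketch of the new `withBucketPath` method (not part of this commit); the constructor shape (a list of external table roots plus a relative bucket path) is inferred from the diff above, so treat the signature and the example paths as assumptions:

```java
import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;

import java.util.Arrays;

// Assumed: the provider wraps a list of external table root paths.
ExternalPathProvider provider =
        new ExternalPathProvider(
                Arrays.asList(new Path("oss://bucket-a/t"), new Path("oss://bucket-b/t")),
                new Path("date=20250810/bucket-0"));

// withBucketPath re-scopes the provider to another partition/bucket
// without rebuilding the list of external roots:
ExternalPathProvider scoped = provider.withBucketPath(new Path("date=20250811/bucket-0"));
Path dataFile = scoped.getNextExternalDataPath("data-0.parquet");
```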

paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java

Lines changed: 10 additions & 0 deletions
@@ -28,6 +28,7 @@
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -142,4 +143,13 @@ public static String partToSimpleString(
         String result = builder.toString();
         return result.substring(0, Math.min(result.length(), maxLength));
     }
+
+    public List<String> generateOrderPartValues(InternalRow in) {
+        LinkedHashMap<String, String> partSpec = generatePartValues(in);
+        List<String> partValues = new ArrayList<>();
+        for (String columnName : partitionColumns) {
+            partValues.add(partSpec.get(columnName));
+        }
+        return partValues;
+    }
 }
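To illustrate the contract of `generateOrderPartValues`, here is a self-contained sketch that mirrors its loop: values come back in the declared partition-column order, regardless of the map's insertion order (the column names and values are made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;

// Stand-in for generatePartValues(in) on a table partitioned by (date, hour).
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
partSpec.put("hour", "09");
partSpec.put("date", "20250811");

List<String> partitionColumns = Arrays.asList("date", "hour");

// Same loop as generateOrderPartValues: project the spec onto the
// partition columns so the values line up with the declared order.
List<String> partValues = new ArrayList<>();
for (String columnName : partitionColumns) {
    partValues.add(partSpec.get(columnName));
}
// partValues == ["20250811", "09"]
```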

paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java

Lines changed: 17 additions & 0 deletions
@@ -19,7 +19,9 @@
 package org.apache.paimon.utils;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -36,4 +38,19 @@ public static <T> T pickRandomly(List<T> list) {
     public static <T> boolean isNullOrEmpty(Collection<T> list) {
         return list == null || list.isEmpty();
     }
+
+    public static Map<String, String> convertListsToMap(List<String> keys, List<String> values) {
+        if (keys.size() != values.size()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "keys and values size must be equal,"
+                                    + " but got keys %s and values %s",
+                            keys, values));
+        }
+        Map<String, String> result = new HashMap<>();
+        for (int i = 0; i < keys.size(); i++) {
+            result.put(keys.get(i), values.get(i));
+        }
+        return result;
+    }
 }
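A small usage example of the new `convertListsToMap` helper; pairing partition column names with ordered values is an assumed use case, the helper itself is generic:

```java
import org.apache.paimon.utils.ListUtils;

import java.util.Arrays;
import java.util.Map;

Map<String, String> partitionSpec =
        ListUtils.convertListsToMap(
                Arrays.asList("date", "hour"),
                Arrays.asList("20250811", "09"));
// contains date=20250811 and hour=09

// Mismatched sizes fail fast:
// ListUtils.convertListsToMap(Arrays.asList("date"), Arrays.asList())
//   -> IllegalArgumentException: keys and values size must be equal, ...
```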

paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java

Lines changed: 37 additions & 0 deletions
@@ -18,11 +18,16 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.types.RowType;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.stream.IntStream;
 
 /** Convert {@link InternalRow} to object array. */
@@ -63,4 +68,36 @@ public Object[] convert(InternalRow rowData) {
         }
         return result;
     }
+
+    public Predicate createEqualPredicate(BinaryRow binaryRow) {
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        List<Predicate> fieldPredicates = new ArrayList<>();
+        Object[] partitionObjects = convert(binaryRow);
+        for (int i = 0; i < getArity(); i++) {
+            Object o = partitionObjects[i];
+            fieldPredicates.add(builder.equal(i, o));
+        }
+        return PredicateBuilder.and(fieldPredicates);
+    }
+
+    public Predicate createLessThanPredicate(BinaryRow binaryRow, boolean includeEqual) {
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        List<Predicate> fieldPredicates = new ArrayList<>();
+        Object[] partitionObjects = convert(binaryRow);
+        for (int i = 0; i < getArity(); i++) {
+            List<Predicate> andConditions = new ArrayList<>();
+            for (int j = 0; j < i; j++) {
+                Object o = partitionObjects[j];
+                andConditions.add(builder.equal(j, o));
+            }
+            Object currentValue = partitionObjects[i];
+            if (includeEqual) {
+                andConditions.add(builder.lessOrEqual(i, currentValue));
+            } else {
+                andConditions.add(builder.lessThan(i, currentValue));
+            }
+            fieldPredicates.add(PredicateBuilder.and(andConditions));
+        }
+        return PredicateBuilder.or(fieldPredicates);
+    }
 }
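For reviewers, here is what the two predicate builders expand to on a two-column partition row, traced directly from the loops above (the column names are illustrative):

```java
// Given a partition row (dt, hr) = ('20250811', '09'):
//
// createEqualPredicate(row)
//   => dt = '20250811' AND hr = '09'
//
// createLessThanPredicate(row, false)   // strict comparison at each level
//   => dt < '20250811'
//      OR (dt = '20250811' AND hr < '09')
//
// createLessThanPredicate(row, true)    // includeEqual: <= at each level
//   => dt <= '20250811'
//      OR (dt = '20250811' AND hr <= '09')
```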
