142 changes: 142 additions & 0 deletions docs/content/primary-key-table/chain-table.md
@@ -0,0 +1,142 @@
---
title: "Chain Table"
weight: 6
type: docs
aliases:
- /primary-key-table/chain-table.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Chain Table

Chain table is a new feature for primary key tables that rethinks how you process incremental data.
Consider the following type of scenario: a full set of data is stored periodically (for example, daily),
but most of it is redundant, with only a small amount of data changing each day. An ODS binlog dump is a
typical example.

Taking a daily binlog dump job as an example, a daily task merges the previous day's full data with the
new incremental data to generate a new full dataset. This approach has two obvious drawbacks:
* Full computation: The merge operation covers all data and involves a shuffle, resulting in poor performance.
* Full storage: A full set of data is stored every day, even though the changed data usually accounts for a very small proportion (e.g., 1%).

Paimon can solve this problem by consuming the changed data directly and merging on read. In this way, the
aforementioned full computation and storage can be optimized into an incremental mode (a write-cycle sketch
follows the figure below):
* Incremental computation: The daily offline ETL job only needs to consume the changed data of the current day and does not need to merge all data.
* Incremental storage: Only the changed data is stored each day, and it is asynchronously compacted periodically (e.g., weekly) to build a global chain table within the lifecycle.
{{< img src="/img/chain-table.png">}}
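As a minimal sketch of this write cycle (the table, branches, and partition values follow the examples later on this page; adapt the statements to your own tables):

```sql
-- Seed the snapshot branch once with a full dataset.
insert overwrite `default`.`t$branch_snapshot` partition (date = '20250810')
values ('1', '1', '1');

-- Afterwards, each day only the changed rows go to the delta branch.
insert overwrite `default`.`t$branch_delta` partition (date = '20250811')
values ('2', '1', '1');
```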

On top of a regular Paimon table, snapshot and delta branches are introduced to hold the full and incremental
data respectively. When writing, you specify the branch to write full or incremental data to; when reading,
the appropriate reading strategy is selected automatically based on the reading mode, such as full,
incremental, or hybrid reading.

To enable a chain table, you must set `chain-table.enabled` to `true` in the table options when creating the
table, and the snapshot and delta branches need to be created as well. Consider an example via Spark SQL:

```sql
CREATE TABLE default.t (
  `t1` string,
  `t2` string,
  `t3` string
) PARTITIONED BY (`date` string)
TBLPROPERTIES (
'chain-table.enabled' = 'true'
);

CALL sys.create_branch('default.t', 'snapshot');

CALL sys.create_branch('default.t', 'delta');

ALTER TABLE default.t SET tblproperties
('scan.fallback-snapshot-branch' = 'snapshot',
'scan.fallback-delta-branch' = 'delta');

ALTER TABLE `default`.`t$branch_snapshot` SET tblproperties
('scan.fallback-snapshot-branch' = 'snapshot',
'scan.fallback-delta-branch' = 'delta');

ALTER TABLE `default`.`t$branch_delta` SET tblproperties
('scan.fallback-snapshot-branch' = 'snapshot',
'scan.fallback-delta-branch' = 'delta');
```
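The three `ALTER TABLE` statements set the same fallback options on the main table and on both branches, so that a query against any of them can locate a missing partition in the snapshot and delta branches.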

Notice that:
- Chain table is only supported for primary key tables, which means you should define `bucket` and `bucket-key` for the table (see the sketch after this list).
- A chain table should ensure that the schema of each branch is consistent.
- Only Spark is supported for now; Flink support will be added later.
- Chain compaction is not supported yet; it will be added later.
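Because the `CREATE TABLE` example above focuses on the chain-specific options, a fuller DDL for a primary key table might look like the following sketch. The `primary-key`, `bucket`, and `bucket-key` values here are illustrative assumptions, not values required by chain table:

```sql
CREATE TABLE default.t_chain (
  `t1` string,
  `t2` string,
  `t3` string
) PARTITIONED BY (`date` string)
TBLPROPERTIES (
  'chain-table.enabled' = 'true',
  'primary-key' = 'date,t1', -- assumed key including the partition field
  'bucket' = '2',            -- assumed bucket count
  'bucket-key' = 't1'        -- assumed bucket key
);
```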

After creating a chain table, you can read and write data in the following ways.

- Full Write: Write data to `t$branch_snapshot`.
```sql
insert overwrite `default`.`t$branch_snapshot` partition (date = '20250810')
values ('1', '1', '1');
```

- Incremental Write: Write data to `t$branch_delta`.
```sql
insert overwrite `default`.`t$branch_delta` partition (date = '20250811')
values ('2', '1', '1');
```

- Full Query: If the snapshot branch contains the full partition, read it directly; otherwise, read in chain merge mode.
```sql
select t1, t2, t3 from default.t where date = '20250811';
```
You will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
```

- Incremental Query: Read the incremental partition from `t$branch_delta`.
```sql
select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811';
```
You will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 2 | 1| 1 |
+---+----+-----+
```

- Hybrid Query: Read both full and incremental data simultaneously.
```sql
select t1, t2, t3 from default.t where date = '20250811'
union all
select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811';
```
You will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
```
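Note that the row ('2', '1', '1') appears twice in the hybrid result: once from the chain-merged full read of `default.t` and once from the direct read of `t$branch_delta`, since the delta partition contributes to both sides of the union.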
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -98,6 +98,12 @@
<td>MemorySize</td>
<td>Memory page size for caching.</td>
</tr>
<tr>
<td><h5>chain-table.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable chain table.</td>
</tr>
<tr>
<td><h5>changelog-file.compression</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -1007,6 +1013,18 @@
<td>String</td>
<td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
</tr>
<tr>
<td><h5>scan.fallback-delta-branch</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When a batch job queries from a chain table, if a partition does not exist in either the main or the snapshot branch, the reader will try to get this partition from the chain snapshot and delta branches together.</td>
</tr>
<tr>
<td><h5>scan.fallback-snapshot-branch</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from the chain snapshot branch.</td>
</tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Binary file added docs/static/img/chain-table.png
67 changes: 67 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -226,6 +226,35 @@ public InlineElement getDescription() {
public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");

public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
key("chain-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Specify chain table enable.");

public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
key("scan.fallback-snapshot-branch")
.stringType()
.noDefaultValue()
.withDescription(
"When a batch job queries from a chain table, if a partition does not exist in the main branch, "
+ "the reader will try to get this partition from chain snapshot branch.");

public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
key("scan.fallback-delta-branch")
.stringType()
.noDefaultValue()
.withDescription(
"When a batch job queries from a chain table, if a partition does not exist in the main and snapshot branch, "
+ "the reader will try to get this partition from chain snapshot and delta branch together.");

@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<ChainBranchReadMode> CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE =
key("chain-table.branch.internal.read.mode")
.enumType(ChainBranchReadMode.class)
.defaultValue(ChainBranchReadMode.DEFAULT)
.withDescription("Chain query type.");

public static final String FILE_FORMAT_ORC = "orc";
public static final String FILE_FORMAT_AVRO = "avro";
public static final String FILE_FORMAT_PARQUET = "parquet";
@@ -3137,6 +3166,22 @@ public int lookupMergeRecordsThreshold() {
return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
}

public boolean isChainTable() {
return options.get(CHAIN_TABLE_ENABLED);
}

public String scanFallbackSnapshotBranch() {
return options.get(SCAN_FALLBACK_SNAPSHOT_BRANCH);
}

public String scanFallbackDeltaBranch() {
return options.get(SCAN_FALLBACK_DELTA_BRANCH);
}

public ChainBranchReadMode getChainBranchReadMode() {
return options.get(CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE);
}

public boolean formatTableImplementationIsPaimon() {
return options.get(FORMAT_TABLE_IMPLEMENTATION) == FormatTableImplementation.PAIMON;
}
@@ -3967,4 +4012,26 @@ public InlineElement getDescription() {
return text(description);
}
}

/** The read mode of chain branch. */
public enum ChainBranchReadMode {
DEFAULT("default", "Read as common table."),
CHAIN_READ("chain_read", "Read as chain table.");

private final String value;
private final String description;

ChainBranchReadMode(String value, String description) {
this.value = value;
this.description = description;
}

public String getValue() {
return value;
}

public InlineElement getDescription() {
return text(description);
}
}
}
@@ -48,4 +48,8 @@ public Path getNextExternalDataPath(String fileName) {
}
return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
}

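/** Returns a new provider bound to the given relative bucket path, keeping the same external table paths. */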
public ExternalPathProvider withBucketPath(Path relativeBucketPath) {
return new ExternalPathProvider(externalTablePaths, relativeBucketPath);
}
}
@@ -28,6 +28,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -142,4 +143,13 @@ public static String partToSimpleString(
String result = builder.toString();
return result.substring(0, Math.min(result.length(), maxLength));
}

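/** Returns the partition values of the given row in the order of the partition columns. */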
public List<String> generateOrderPartValues(InternalRow in) {
Contributor: Why add this? What is the difference? You can add a test for this.

Author: The goal is to obtain the partition values in the order of the partition fields, and then calculate the dependent partitions according to a unit step size. For example, at the hourly level the unit step size of the partition is one hour: starting from 00:00 on 20250811 and ending at 00:00 on 20250812, 24 hourly incremental partitions are obtained. For the specific usage, please refer to org.apache.paimon.partition.InternalRowPartitionUtils#getDeltaPartitions. Unit tests have been added; please refer to org.apache.paimon.utils.InternalRowPartitionComputerTest#testGenerateOrderPartValues.

LinkedHashMap<String, String> partSpec = generatePartValues(in);
List<String> partValues = new ArrayList<>();
for (String columnName : partitionColumns) {
partValues.add(partSpec.get(columnName));
}
return partValues;
}
}
17 changes: 17 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
@@ -19,7 +19,9 @@
package org.apache.paimon.utils;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -36,4 +38,19 @@ public static <T> T pickRandomly(List<T> list) {
public static <T> boolean isNullOrEmpty(Collection<T> list) {
return list == null || list.isEmpty();
}

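/** Builds a map from two equally sized lists, pairing each key with the value at the same index. */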
public static Map<String, String> convertListsToMap(List<String> keys, List<String> values) {
if (keys.size() != values.size()) {
throw new IllegalArgumentException(
String.format(
"keys and values size must be equal,"
+ " but got keys %s and values %s",
keys, values));
}
Map<String, String> result = new HashMap<>();
for (int i = 0; i < keys.size(); i++) {
result.put(keys.get(i), values.get(i));
}
return result;
}
}
@@ -18,11 +18,16 @@

package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

/** Convert {@link InternalRow} to object array. */
@@ -63,4 +68,36 @@ public Object[] convert(InternalRow rowData) {
}
return result;
}

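/** Creates a predicate that matches rows equal to the given row on every field. */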
public Predicate createEqualPredicate(BinaryRow binaryRow) {
PredicateBuilder builder = new PredicateBuilder(rowType);
List<Predicate> fieldPredicates = new ArrayList<>();
Object[] partitionObjects = convert(binaryRow);
for (int i = 0; i < getArity(); i++) {
Object o = partitionObjects[i];
fieldPredicates.add(builder.equal(i, o));
}
return PredicateBuilder.and(fieldPredicates);
}

/**
 * Creates a predicate accepting rows that are lexicographically less than the given row (or
 * less than or equal to it when {@code includeEqual} is true).
 */
public Predicate createLessThanPredicate(BinaryRow binaryRow, boolean includeEqual) {
PredicateBuilder builder = new PredicateBuilder(rowType);
List<Predicate> fieldPredicates = new ArrayList<>();
Object[] partitionObjects = convert(binaryRow);
for (int i = 0; i < getArity(); i++) {
// A row is lexicographically smaller if it matches the target on the first i fields
// and is strictly smaller on field i.
List<Predicate> andConditions = new ArrayList<>();
for (int j = 0; j < i; j++) {
Object o = partitionObjects[j];
andConditions.add(builder.equal(j, o));
}
Object currentValue = partitionObjects[i];
if (includeEqual && i == getArity() - 1) {
// lessOrEqual is only valid on the last field; applying it to earlier fields would
// also accept rows whose remaining fields are greater than the target.
andConditions.add(builder.lessOrEqual(i, currentValue));
} else {
andConditions.add(builder.lessThan(i, currentValue));
}
fieldPredicates.add(PredicateBuilder.and(andConditions));
}
return PredicateBuilder.or(fieldPredicates);
}
}