Skip to content

Commit 49c13e0

Browse files
yangmu.0722Stefanietry
andcommitted
[core] Support chain tbl on batch mode
Co-authored-by: zhoufa <[email protected]>
1 parent e865e19 commit 49c13e0

File tree

47 files changed

+2292
-66
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2292
-66
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@
9898
<td>MemorySize</td>
9999
<td>Memory page size for caching.</td>
100100
</tr>
101+
<tr>
102+
<td><h5>chain-table.enabled</h5></td>
103+
<td style="word-wrap: break-word;">false</td>
104+
<td>Boolean</td>
105+
<td>Specify chain table enable.</td>
106+
</tr>
101107
<tr>
102108
<td><h5>changelog-file.compression</h5></td>
103109
<td style="word-wrap: break-word;">(none)</td>
@@ -1001,6 +1007,18 @@
10011007
<td>String</td>
10021008
<td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
10031009
</tr>
1010+
<tr>
1011+
<td><h5>scan.fallback-delta-branch</h5></td>
1012+
<td style="word-wrap: break-word;">(none)</td>
1013+
<td>String</td>
1014+
<td>When a batch job queries from a chain table, if a partition does not exist in the main and snapshot branch, the reader will try to get this partition from chain snapshot and delta branch together.</td>
1015+
</tr>
1016+
<tr>
1017+
<td><h5>scan.fallback-snapshot-branch</h5></td>
1018+
<td style="word-wrap: break-word;">(none)</td>
1019+
<td>String</td>
1020+
<td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td>
1021+
</tr>
10041022
<tr>
10051023
<td><h5>scan.file-creation-time-millis</h5></td>
10061024
<td style="word-wrap: break-word;">(none)</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,35 @@ public InlineElement getDescription() {
226226
public static final ConfigOption<String> BRANCH =
227227
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");
228228

229+
public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
230+
key("chain-table.enabled")
231+
.booleanType()
232+
.defaultValue(false)
233+
.withDescription("Specify chain table enable.");
234+
235+
public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
236+
key("scan.fallback-snapshot-branch")
237+
.stringType()
238+
.noDefaultValue()
239+
.withDescription(
240+
"When a batch job queries from a chain table, if a partition does not exist in the main branch, "
241+
+ "the reader will try to get this partition from chain snapshot branch.");
242+
243+
public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
244+
key("scan.fallback-delta-branch")
245+
.stringType()
246+
.noDefaultValue()
247+
.withDescription(
248+
"When a batch job queries from a chain table, if a partition does not exist in the main and snapshot branch, "
249+
+ "the reader will try to get this partition from chain snapshot and delta branch together.");
250+
251+
@ExcludeFromDocumentation("Internal use only")
252+
public static final ConfigOption<ChainBranchReadMode> CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE =
253+
key("chain-table.branch.internal.read.mode")
254+
.enumType(ChainBranchReadMode.class)
255+
.defaultValue(ChainBranchReadMode.DEFAULT)
256+
.withDescription("Chain query type.");
257+
229258
public static final String FILE_FORMAT_ORC = "orc";
230259
public static final String FILE_FORMAT_AVRO = "avro";
231260
public static final String FILE_FORMAT_PARQUET = "parquet";
@@ -3131,6 +3160,22 @@ public int lookupMergeRecordsThreshold() {
31313160
return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
31323161
}
31333162

3163+
public boolean isChainTable() {
3164+
return options.get(CHAIN_TABLE_ENABLED);
3165+
}
3166+
3167+
public String scanFallbackSnapshotBranch() {
3168+
return options.get(SCAN_FALLBACK_SNAPSHOT_BRANCH);
3169+
}
3170+
3171+
public String scanFallbackDeltaBranch() {
3172+
return options.get(SCAN_FALLBACK_DELTA_BRANCH);
3173+
}
3174+
3175+
public ChainBranchReadMode getChainBranchReadMode() {
3176+
return options.get(CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE);
3177+
}
3178+
31343179
public boolean formatTableImplementationIsPaimon() {
31353180
return options.get(FORMAT_TABLE_IMPLEMENTATION) == FormatTableImplementation.PAIMON;
31363181
}
@@ -3957,4 +4002,26 @@ public InlineElement getDescription() {
39574002
return text(description);
39584003
}
39594004
}
4005+
4006+
/** The read mode of chain branch. */
4007+
public enum ChainBranchReadMode {
4008+
DEFAULT("default", "Read as common table."),
4009+
CHAIN_READ("chain_read", "Read as chain table.");
4010+
4011+
private final String value;
4012+
private final String description;
4013+
4014+
ChainBranchReadMode(String value, String description) {
4015+
this.value = value;
4016+
this.description = description;
4017+
}
4018+
4019+
public String getValue() {
4020+
return value;
4021+
}
4022+
4023+
public InlineElement getDescription() {
4024+
return text(description);
4025+
}
4026+
}
39604027
}

paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,8 @@ public Path getNextExternalDataPath(String fileName) {
4848
}
4949
return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
5050
}
51+
52+
public ExternalPathProvider withBucketPath(Path relativeBucketPath) {
53+
return new ExternalPathProvider(externalTablePaths, relativeBucketPath);
54+
}
5155
}

paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.types.RowType;
2929
import org.apache.paimon.types.VarCharType;
3030

31+
import java.util.ArrayList;
3132
import java.util.LinkedHashMap;
3233
import java.util.List;
3334
import java.util.Map;
@@ -142,4 +143,13 @@ public static String partToSimpleString(
142143
String result = builder.toString();
143144
return result.substring(0, Math.min(result.length(), maxLength));
144145
}
146+
147+
public List<String> generateOrderPartValues(InternalRow in) {
148+
LinkedHashMap<String, String> partSpec = generatePartValues(in);
149+
List<String> partValues = new ArrayList<>();
150+
for (String columnName : partitionColumns) {
151+
partValues.add(partSpec.get(columnName));
152+
}
153+
return partValues;
154+
}
145155
}

paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.paimon.utils;
2020

2121
import java.util.Collection;
22+
import java.util.HashMap;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.concurrent.ThreadLocalRandom;
2426

2527
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -36,4 +38,19 @@ public static <T> T pickRandomly(List<T> list) {
3638
public static <T> boolean isNullOrEmpty(Collection<T> list) {
3739
return list == null || list.isEmpty();
3840
}
41+
42+
public static Map<String, String> convertListsToMap(List<String> keys, List<String> values) {
43+
if (keys.size() != values.size()) {
44+
throw new IllegalArgumentException(
45+
String.format(
46+
"keys and values size must be equal,"
47+
+ " but got keys %s and values %s",
48+
keys, values));
49+
}
50+
Map<String, String> result = new HashMap<>();
51+
for (int i = 0; i < keys.size(); i++) {
52+
result.put(keys.get(i), values.get(i));
53+
}
54+
return result;
55+
}
3956
}

paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818

1919
package org.apache.paimon.utils;
2020

21+
import org.apache.paimon.data.BinaryRow;
2122
import org.apache.paimon.data.GenericRow;
2223
import org.apache.paimon.data.InternalRow;
24+
import org.apache.paimon.predicate.Predicate;
25+
import org.apache.paimon.predicate.PredicateBuilder;
2326
import org.apache.paimon.types.RowType;
2427

2528
import java.io.Serializable;
29+
import java.util.ArrayList;
30+
import java.util.List;
2631
import java.util.stream.IntStream;
2732

2833
/** Convert {@link InternalRow} to object array. */
@@ -63,4 +68,36 @@ public Object[] convert(InternalRow rowData) {
6368
}
6469
return result;
6570
}
71+
72+
public Predicate createEqualPredicate(BinaryRow binaryRow) {
73+
PredicateBuilder builder = new PredicateBuilder(rowType);
74+
List<Predicate> fieldPredicates = new ArrayList<>();
75+
Object[] partitionObjects = convert(binaryRow);
76+
for (int i = 0; i < getArity(); i++) {
77+
Object o = partitionObjects[i];
78+
fieldPredicates.add(builder.equal(i, o));
79+
}
80+
return PredicateBuilder.and(fieldPredicates);
81+
}
82+
83+
public Predicate createLessThanPredicate(BinaryRow binaryRow, boolean includeEqual) {
84+
PredicateBuilder builder = new PredicateBuilder(rowType);
85+
List<Predicate> fieldPredicates = new ArrayList<>();
86+
Object[] partitionObjects = convert(binaryRow);
87+
for (int i = 0; i < getArity(); i++) {
88+
List<Predicate> andConditions = new ArrayList<>();
89+
for (int j = 0; j < i; j++) {
90+
Object o = partitionObjects[j];
91+
andConditions.add(builder.equal(j, o));
92+
}
93+
Object currentValue = partitionObjects[i];
94+
if (includeEqual) {
95+
andConditions.add(builder.lessOrEqual(i, currentValue));
96+
} else {
97+
andConditions.add(builder.lessThan(i, currentValue));
98+
}
99+
fieldPredicates.add(PredicateBuilder.and(andConditions));
100+
}
101+
return PredicateBuilder.or(fieldPredicates);
102+
}
66103
}

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.paimon.schema.TableSchema;
4040
import org.apache.paimon.table.BucketMode;
4141
import org.apache.paimon.table.CatalogEnvironment;
42+
import org.apache.paimon.table.source.ScanFactory;
4243
import org.apache.paimon.types.RowType;
4344
import org.apache.paimon.utils.KeyComparatorSupplier;
4445
import org.apache.paimon.utils.UserDefinedSeqComparator;
@@ -111,7 +112,7 @@ public BucketMode bucketMode() {
111112

112113
@Override
113114
public MergeFileSplitRead newRead() {
114-
return new MergeFileSplitRead(
115+
return ScanFactory.createMergeFileSplitRead(
115116
options,
116117
schema,
117118
keyType,
@@ -122,14 +123,14 @@ public MergeFileSplitRead newRead() {
122123
}
123124

124125
public RawFileSplitRead newBatchRawFileRead() {
125-
return new RawFileSplitRead(
126+
return ScanFactory.createBatchRawFileRead(
126127
fileIO,
127128
schemaManager,
128129
schema,
129130
valueType,
130131
FileFormatDiscover.of(options),
131132
pathFactory(),
132-
options.fileIndexReadEnabled(),
133+
options,
133134
false);
134135
}
135136

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.paimon.schema.Schema;
3434
import org.apache.paimon.schema.SchemaChange;
3535
import org.apache.paimon.schema.SchemaManager;
36+
import org.apache.paimon.schema.SchemaValidation;
3637
import org.apache.paimon.schema.TableSchema;
3738
import org.apache.paimon.table.FileStoreTable;
3839
import org.apache.paimon.table.FormatTable;
@@ -383,7 +384,10 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
383384
checkNotSystemTable(identifier, "createTable");
384385
validateCreateTable(schema);
385386
validateCustomTablePath(schema.options());
386-
387+
SchemaValidation.validateChainTableOptions(
388+
schema.options(),
389+
schema.primaryKeys() == null ? null : String.join(",", schema.primaryKeys()),
390+
!schema.partitionKeys().isEmpty());
387391
// check db exists
388392
getDatabase(identifier.getDatabaseName());
389393

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.io;
20+
21+
import org.apache.paimon.fs.ExternalPathProvider;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.manifest.FileEntry;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.util.Map;
28+
29+
/** Factory which produces read {@link Path}s data files for chain tbl. */
30+
public class ChainReadDataFilePathFactory extends DataFilePathFactory {
31+
32+
private final Map<String, String> fileBucketPathMapping;
33+
34+
public ChainReadDataFilePathFactory(
35+
Path parent,
36+
String formatIdentifier,
37+
String dataFilePrefix,
38+
String changelogFilePrefix,
39+
boolean fileSuffixIncludeCompression,
40+
String fileCompression,
41+
@Nullable ExternalPathProvider externalPathProvider,
42+
Map<String, String> fileBucketPathMapping) {
43+
super(
44+
parent,
45+
formatIdentifier,
46+
dataFilePrefix,
47+
changelogFilePrefix,
48+
fileSuffixIncludeCompression,
49+
fileCompression,
50+
externalPathProvider);
51+
this.fileBucketPathMapping = fileBucketPathMapping;
52+
}
53+
54+
@Override
55+
public Path newPath(String prefix) {
56+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
57+
}
58+
59+
@Override
60+
public Path toPath(DataFileMeta file) {
61+
return file.externalPath()
62+
.map(Path::new)
63+
.orElse(new Path(fileBucketPathMapping.get(file.fileName()), file.fileName()));
64+
}
65+
66+
@Override
67+
public Path toPath(FileEntry file) {
68+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
69+
}
70+
71+
@Override
72+
public Path toAlignedPath(String fileName, DataFileMeta aligned) {
73+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
74+
}
75+
76+
@Override
77+
public Path toAlignedPath(String fileName, FileEntry aligned) {
78+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
79+
}
80+
}

0 commit comments

Comments
 (0)