Skip to content

Commit e9c9096

Browse files
yangmu.0722Stefanietry
andcommitted
[core] Support chain tbl on batch mode
Co-authored-by: zhoufa <[email protected]>
1 parent e35962b commit e9c9096

File tree

46 files changed

+2291
-67
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2291
-67
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@
9292
<td>MemorySize</td>
9393
<td>Memory page size for caching.</td>
9494
</tr>
95+
<tr>
96+
<td><h5>chain-table.enabled</h5></td>
97+
<td style="word-wrap: break-word;">false</td>
98+
<td>Boolean</td>
99+
<td>Specify chain table enable.</td>
100+
</tr>
95101
<tr>
96102
<td><h5>changelog-file.compression</h5></td>
97103
<td style="word-wrap: break-word;">(none)</td>
@@ -977,6 +983,18 @@
977983
<td>String</td>
978984
<td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
979985
</tr>
986+
<tr>
987+
<td><h5>scan.fallback-delta-branch</h5></td>
988+
<td style="word-wrap: break-word;">(none)</td>
989+
<td>String</td>
990+
<td>When a batch job queries from a chain table, if a partition does not exist in the main and snapshot branch, the reader will try to get this partition from chain snapshot branch and delta branch together.</td>
991+
</tr>
992+
<tr>
993+
<td><h5>scan.fallback-snapshot-branch</h5></td>
994+
<td style="word-wrap: break-word;">(none)</td>
995+
<td>String</td>
996+
<td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td>
997+
</tr>
980998
<tr>
981999
<td><h5>scan.file-creation-time-millis</h5></td>
9821000
<td style="word-wrap: break-word;">(none)</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,35 @@ public InlineElement getDescription() {
226226
public static final ConfigOption<String> BRANCH =
227227
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");
228228

229+
public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
230+
key("chain-table.enabled")
231+
.booleanType()
232+
.defaultValue(false)
233+
.withDescription("Specify chain table enable.");
234+
235+
public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
236+
key("scan.fallback-snapshot-branch")
237+
.stringType()
238+
.noDefaultValue()
239+
.withDescription(
240+
"When a batch job queries from a chain table, if a partition does not exist in the main branch, "
241+
+ "the reader will try to get this partition from chain snapshot branch.");
242+
243+
public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
244+
key("scan.fallback-delta-branch")
245+
.stringType()
246+
.noDefaultValue()
247+
.withDescription(
248+
"When a batch job queries from a chain table, if a partition does not exist in the main and snapshot branch, "
249+
+ "the reader will try to get this partition from chain snapshot and delta branch together.");
250+
251+
@ExcludeFromDocumentation("Internal use only")
252+
public static final ConfigOption<ChainBranchReadMode> CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE =
253+
key("chain-table.branch.internal.read.mode")
254+
.enumType(ChainBranchReadMode.class)
255+
.defaultValue(ChainBranchReadMode.DEFAULT)
256+
.withDescription("Chain query type.");
257+
229258
public static final String FILE_FORMAT_ORC = "orc";
230259
public static final String FILE_FORMAT_AVRO = "avro";
231260
public static final String FILE_FORMAT_PARQUET = "parquet";
@@ -3081,6 +3110,22 @@ public int lookupMergeRecordsThreshold() {
30813110
return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
30823111
}
30833112

3113+
public boolean isChainTable() {
3114+
return options.get(CHAIN_TABLE_ENABLED);
3115+
}
3116+
3117+
public String scanFallbackSnapshotBranch() {
3118+
return options.get(SCAN_FALLBACK_SNAPSHOT_BRANCH);
3119+
}
3120+
3121+
public String scanFallbackDeltaBranch() {
3122+
return options.get(SCAN_FALLBACK_DELTA_BRANCH);
3123+
}
3124+
3125+
public ChainBranchReadMode getChainBranchReadMode() {
3126+
return options.get(CHAIN_TABLE_BRANCH_INTERNAL_READ_MODE);
3127+
}
3128+
30843129
public boolean formatTableImplementationIsPaimon() {
30853130
return options.get(FORMAT_TABLE_IMPLEMENTATION) == FormatTableImplementation.PAIMON;
30863131
}
@@ -3907,4 +3952,26 @@ public InlineElement getDescription() {
39073952
return text(description);
39083953
}
39093954
}
3955+
3956+
/** The read mode of chain branch. */
3957+
public enum ChainBranchReadMode {
3958+
DEFAULT("default", "Read as common table."),
3959+
CHAIN_READ("chain_read", "Read as chain table.");
3960+
3961+
private final String value;
3962+
private final String description;
3963+
3964+
ChainBranchReadMode(String value, String description) {
3965+
this.value = value;
3966+
this.description = description;
3967+
}
3968+
3969+
public String getValue() {
3970+
return value;
3971+
}
3972+
3973+
public InlineElement getDescription() {
3974+
return text(description);
3975+
}
3976+
}
39103977
}

paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,8 @@ public Path getNextExternalDataPath(String fileName) {
4848
}
4949
return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
5050
}
51+
52+
public ExternalPathProvider withBucketPath(Path relativeBucketPath) {
53+
return new ExternalPathProvider(externalTablePaths, relativeBucketPath);
54+
}
5155
}

paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.types.RowType;
2929
import org.apache.paimon.types.VarCharType;
3030

31+
import java.util.ArrayList;
3132
import java.util.LinkedHashMap;
3233
import java.util.List;
3334
import java.util.Map;
@@ -142,4 +143,13 @@ public static String partToSimpleString(
142143
String result = builder.toString();
143144
return result.substring(0, Math.min(result.length(), maxLength));
144145
}
146+
147+
public List<String> generateOrderPartValues(InternalRow in) {
148+
LinkedHashMap<String, String> partSpec = generatePartValues(in);
149+
List<String> partValues = new ArrayList<>();
150+
for (String columnName : partitionColumns) {
151+
partValues.add(partSpec.get(columnName));
152+
}
153+
return partValues;
154+
}
145155
}

paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.paimon.utils;
2020

2121
import java.util.Collection;
22+
import java.util.HashMap;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.concurrent.ThreadLocalRandom;
2426

2527
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -36,4 +38,19 @@ public static <T> T pickRandomly(List<T> list) {
3638
public static <T> boolean isNullOrEmpty(Collection<T> list) {
3739
return list == null || list.isEmpty();
3840
}
41+
42+
public static Map<String, String> convertListsToMap(List<String> keys, List<String> values) {
43+
if (keys.size() != values.size()) {
44+
throw new IllegalArgumentException(
45+
String.format(
46+
"keys and values size must be equal,"
47+
+ " but got keys %s and values %s",
48+
keys, values));
49+
}
50+
Map<String, String> result = new HashMap<>();
51+
for (int i = 0; i < keys.size(); i++) {
52+
result.put(keys.get(i), values.get(i));
53+
}
54+
return result;
55+
}
3956
}

paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818

1919
package org.apache.paimon.utils;
2020

21+
import org.apache.paimon.data.BinaryRow;
2122
import org.apache.paimon.data.GenericRow;
2223
import org.apache.paimon.data.InternalRow;
24+
import org.apache.paimon.predicate.Predicate;
25+
import org.apache.paimon.predicate.PredicateBuilder;
2326
import org.apache.paimon.types.RowType;
2427

2528
import java.io.Serializable;
29+
import java.util.ArrayList;
30+
import java.util.List;
2631
import java.util.stream.IntStream;
2732

2833
/** Convert {@link InternalRow} to object array. */
@@ -63,4 +68,36 @@ public Object[] convert(InternalRow rowData) {
6368
}
6469
return result;
6570
}
71+
72+
public Predicate createEqualPredicate(BinaryRow binaryRow) {
73+
PredicateBuilder builder = new PredicateBuilder(rowType);
74+
List<Predicate> fieldPredicates = new ArrayList<>();
75+
Object[] partitionObjects = convert(binaryRow);
76+
for (int i = 0; i < getArity(); i++) {
77+
Object o = partitionObjects[i];
78+
fieldPredicates.add(builder.equal(i, o));
79+
}
80+
return PredicateBuilder.and(fieldPredicates);
81+
}
82+
83+
public Predicate createLessThanPredicate(BinaryRow binaryRow, boolean includeEqual) {
84+
PredicateBuilder builder = new PredicateBuilder(rowType);
85+
List<Predicate> fieldPredicates = new ArrayList<>();
86+
Object[] partitionObjects = convert(binaryRow);
87+
for (int i = 0; i < getArity(); i++) {
88+
List<Predicate> andConditions = new ArrayList<>();
89+
for (int j = 0; j < i; j++) {
90+
Object o = partitionObjects[j];
91+
andConditions.add(builder.equal(j, o));
92+
}
93+
Object currentValue = partitionObjects[i];
94+
if (includeEqual) {
95+
andConditions.add(builder.lessOrEqual(i, currentValue));
96+
} else {
97+
andConditions.add(builder.lessThan(i, currentValue));
98+
}
99+
fieldPredicates.add(PredicateBuilder.and(andConditions));
100+
}
101+
return PredicateBuilder.or(fieldPredicates);
102+
}
66103
}

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.paimon.schema.TableSchema;
4040
import org.apache.paimon.table.BucketMode;
4141
import org.apache.paimon.table.CatalogEnvironment;
42+
import org.apache.paimon.table.source.ScanFactory;
4243
import org.apache.paimon.types.RowType;
4344
import org.apache.paimon.utils.KeyComparatorSupplier;
4445
import org.apache.paimon.utils.UserDefinedSeqComparator;
@@ -111,7 +112,7 @@ public BucketMode bucketMode() {
111112

112113
@Override
113114
public MergeFileSplitRead newRead() {
114-
return new MergeFileSplitRead(
115+
return ScanFactory.createMergeFileSplitRead(
115116
options,
116117
schema,
117118
keyType,
@@ -122,14 +123,14 @@ public MergeFileSplitRead newRead() {
122123
}
123124

124125
public RawFileSplitRead newBatchRawFileRead() {
125-
return new RawFileSplitRead(
126+
return ScanFactory.createBatchRawFileRead(
126127
fileIO,
127128
schemaManager,
128129
schema,
129130
valueType,
130131
FileFormatDiscover.of(options),
131132
pathFactory(),
132-
options.fileIndexReadEnabled(),
133+
options,
133134
false);
134135
}
135136

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.paimon.schema.Schema;
3434
import org.apache.paimon.schema.SchemaChange;
3535
import org.apache.paimon.schema.SchemaManager;
36+
import org.apache.paimon.schema.SchemaValidation;
3637
import org.apache.paimon.schema.TableSchema;
3738
import org.apache.paimon.table.FileStoreTable;
3839
import org.apache.paimon.table.FormatTable;
@@ -383,7 +384,10 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
383384
checkNotSystemTable(identifier, "createTable");
384385
validateCreateTable(schema);
385386
validateCustomTablePath(schema.options());
386-
387+
SchemaValidation.validateChainTableOptions(
388+
schema.options(),
389+
schema.primaryKeys() == null ? null : String.join(",", schema.primaryKeys()),
390+
!schema.partitionKeys().isEmpty());
387391
// check db exists
388392
getDatabase(identifier.getDatabaseName());
389393

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.io;
20+
21+
import org.apache.paimon.fs.ExternalPathProvider;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.manifest.FileEntry;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.util.Map;
28+
29+
/** Factory which produces read {@link Path}s data files for chain tbl. */
30+
public class ChainReadDataFilePathFactory extends DataFilePathFactory {
31+
32+
private final Map<String, String> fileBucketPathMapping;
33+
34+
public ChainReadDataFilePathFactory(
35+
Path parent,
36+
String formatIdentifier,
37+
String dataFilePrefix,
38+
String changelogFilePrefix,
39+
boolean fileSuffixIncludeCompression,
40+
String fileCompression,
41+
@Nullable ExternalPathProvider externalPathProvider,
42+
Map<String, String> fileBucketPathMapping) {
43+
super(
44+
parent,
45+
formatIdentifier,
46+
dataFilePrefix,
47+
changelogFilePrefix,
48+
fileSuffixIncludeCompression,
49+
fileCompression,
50+
externalPathProvider);
51+
this.fileBucketPathMapping = fileBucketPathMapping;
52+
}
53+
54+
@Override
55+
public Path newPath(String prefix) {
56+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
57+
}
58+
59+
@Override
60+
public Path toPath(DataFileMeta file) {
61+
return file.externalPath()
62+
.map(Path::new)
63+
.orElse(new Path(fileBucketPathMapping.get(file.fileName()), file.fileName()));
64+
}
65+
66+
@Override
67+
public Path toPath(FileEntry file) {
68+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
69+
}
70+
71+
@Override
72+
public Path toAlignedPath(String fileName, DataFileMeta aligned) {
73+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
74+
}
75+
76+
@Override
77+
public Path toAlignedPath(String fileName, FileEntry aligned) {
78+
throw new UnsupportedOperationException("Please use api with relativeBucketPath");
79+
}
80+
}

0 commit comments

Comments
 (0)