-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[core] Support chain tbl on batch mode #6394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[core] Support chain tbl on batch mode #6394
Conversation
7efbc2f to
817cf2a
Compare
| /** Utils for table. */ | ||
| public class ChainTableUtils { | ||
|
|
||
| public static void checkChainTableOptions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add this in SchemaValidation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SchemaValidation
Done
817cf2a to
84ee5b3
Compare
| public static void validateChainTableOptions( | ||
| Map<String, String> tableOptions, @Nullable String primaryKeys, boolean partitionTbl) { | ||
| if (Boolean.parseBoolean(tableOptions.get(CoreOptions.CHAIN_TABLE_ENABLED.key()))) { | ||
| Options options = Options.fromMap(tableOptions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also check that the changelog-producer != ("lookup", "full-compaction")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| public static void validateChainTableOptions( | ||
| Map<String, String> tableOptions, @Nullable String primaryKeys, boolean partitionTbl) { | ||
| if (Boolean.parseBoolean(tableOptions.get(CoreOptions.CHAIN_TABLE_ENABLED.key()))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change to options.get(CoreOptions.CHAIN_TABLE_ENABLED)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| .withDescription("Specify chain table enable."); | ||
|
|
||
| public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH = | ||
| key("scan.fallback-snapshot-branch") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it doesn't have the default value, we have to check this when the user enable the chain table
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This config depends on the actual created branch and operates in sequence with the table creation, so it is designed to be aligned with scan.fallback-branch
| this.dvFactory = dvFactory; | ||
| } | ||
|
|
||
| public KeyValueFileReaderFactory(KeyValueFileReaderFactory factory) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we introduce a copy method to replace this construct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used for inheritance scenarios for the member variable initialization, and the member is declared as final
|
|
||
| @Nullable private final RecordLevelExpire recordLevelExpire; | ||
|
|
||
| private final boolean isChainTbl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to forceKeepDelete?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
Looks good to me +1. Please take another look cc @JingsongLi |
e9c9096 to
90ac914
Compare
b9000a9 to
49c13e0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add documentation.
| return result.substring(0, Math.min(result.length(), maxLength)); | ||
| } | ||
|
|
||
| public List<String> generateOrderPartValues(InternalRow in) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this? What is difference? You can add a test for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is hoped to obtain the partition values in the order of the partition fields, and then calculate the dependent partitions according to the unit step size. For example, taking the hourly level as an example, the unit step size of the partition is hours. If starting from 0:00 on 20250811 and ending at 0:00 on 20250812, 24 incremental partitions in hours can be obtained, for the specific usage, please refer to org.apache.paimon.partition.InternalRowPartitionUtils#getDeltaPartitions. Unit tests have been added, please refer to org.apache.paimon.utils.InternalRowPartitionComputerTest#testGenerateOrderPartValues
| return newPathFromName(newFileName(prefix)); | ||
| } | ||
|
|
||
| public Path newPath(String prefix, Path relativeBucketPath) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For chain table, one logical partition may correspond to multiple physical partitions. Therefore, when creating a new path, on the basis of the original api about newPath, both partitions and buckets need to be specified simultaneously. The current interface is not in use for now. It is only for compatibility with the function of creating new path
49c13e0 to
91e9697
Compare
Co-authored-by: zhoufa <[email protected]>
91e9697 to
3aa1526
Compare
Purpose
Related to PIP: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table
Linked issue
#6313
Tests
see: org.apache.paimon.spark.SparkCatalogWithHiveTest#testChainTable
API and Format
Related to PIP: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table
Documentation
Related to PIP: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table