Skip to content

Commit 2f68c6e

Browse files
committed
Paimon initial support
1 parent 19cbda4 commit 2f68c6e

File tree

14 files changed

+1016
-59
lines changed

14 files changed

+1016
-59
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ hs_err_pid*
2626
# Ignore java-version and idea files.
2727
.java-version
2828
.idea
29+
.vscode
2930

3031
# Ignore Gradle project-specific cache directory
3132
.gradle

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
<spark.version.prefix>3.4</spark.version.prefix>
8888
<iceberg.version>1.4.2</iceberg.version>
8989
<delta.version>2.4.0</delta.version>
90+
<paimon.version>1.2.0</paimon.version>
9091
<jackson.version>2.18.2</jackson.version>
9192
<spotless.version>2.43.0</spotless.version>
9293
<apache.rat.version>0.16.1</apache.rat.version>
@@ -332,6 +333,13 @@
332333
<version>${delta.hive.version}</version>
333334
</dependency>
334335

336+
<!-- Paimon -->
337+
<dependency>
338+
<groupId>org.apache.paimon</groupId>
339+
<artifactId>paimon-bundle</artifactId>
340+
<version>${paimon.version}</version>
341+
</dependency>
342+
335343
<!-- Spark -->
336344
<dependency>
337345
<groupId>org.apache.spark</groupId>

xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
public class TableFormat {
  public static final String HUDI = "HUDI";
  public static final String ICEBERG = "ICEBERG";
  public static final String DELTA = "DELTA";
  public static final String PAIMON = "PAIMON";

  /**
   * Returns the identifiers of all supported table formats.
   *
   * @return a new array containing every supported format name
   */
  public static String[] values() {
    // Reference the constants instead of repeating the string literals so the
    // list cannot drift out of sync when a format is added or renamed.
    return new String[] {HUDI, ICEBERG, DELTA, PAIMON};
  }
}

xtable-core/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,18 @@
110110
<scope>test</scope>
111111
</dependency>
112112

113+
<!-- Paimon dependencies -->
114+
<dependency>
115+
<groupId>org.apache.paimon</groupId>
116+
<artifactId>paimon-bundle</artifactId>
117+
</dependency>
118+
<dependency>
119+
<groupId>org.apache.paimon</groupId>
120+
<artifactId>paimon-spark-${spark.version.prefix}</artifactId>
121+
<version>${paimon.version}</version>
122+
<scope>test</scope>
123+
</dependency>
124+
113125
<!-- Hadoop dependencies -->
114126
<dependency>
115127
<groupId>org.apache.hadoop</groupId>
@@ -206,6 +218,14 @@
206218
<skip>false</skip>
207219
</configuration>
208220
</plugin>
221+
<plugin>
222+
<groupId>org.apache.maven.plugins</groupId>
223+
<artifactId>maven-compiler-plugin</artifactId>
224+
<configuration>
225+
<source>11</source>
226+
<target>11</target>
227+
</configuration>
228+
</plugin>
209229
</plugins>
210230
</build>
211231
</project>
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.paimon;
20+
21+
import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION;
22+
23+
import java.io.IOException;
24+
import java.time.Instant;
25+
import java.util.List;
26+
27+
import lombok.extern.log4j.Log4j2;
28+
29+
import org.apache.paimon.Snapshot;
30+
import org.apache.paimon.schema.SchemaManager;
31+
import org.apache.paimon.schema.TableSchema;
32+
import org.apache.paimon.table.FileStoreTable;
33+
import org.apache.paimon.utils.SnapshotManager;
34+
35+
import org.apache.xtable.exception.ReadException;
36+
import org.apache.xtable.model.*;
37+
import org.apache.xtable.model.schema.InternalPartitionField;
38+
import org.apache.xtable.model.schema.InternalSchema;
39+
import org.apache.xtable.model.storage.DataLayoutStrategy;
40+
import org.apache.xtable.model.storage.InternalDataFile;
41+
import org.apache.xtable.model.storage.PartitionFileGroup;
42+
import org.apache.xtable.model.storage.TableFormat;
43+
import org.apache.xtable.spi.extractor.ConversionSource;
44+
45+
@Log4j2
46+
public class PaimonConversionSource implements ConversionSource<Snapshot> {
47+
48+
private final FileStoreTable paimonTable;
49+
private final SchemaManager schemaManager;
50+
private final SnapshotManager snapshotManager;
51+
52+
private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
53+
private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
54+
private final PaimonPartitionExtractor partitionSpecExtractor =
55+
PaimonPartitionExtractor.getInstance();
56+
57+
public PaimonConversionSource(FileStoreTable paimonTable) {
58+
this.paimonTable = paimonTable;
59+
this.schemaManager = paimonTable.schemaManager();
60+
this.snapshotManager = paimonTable.snapshotManager();
61+
}
62+
63+
@Override
64+
public InternalTable getTable(Snapshot snapshot) {
65+
TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
66+
InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
67+
68+
List<String> partitionKeys = paimonTable.partitionKeys();
69+
List<InternalPartitionField> partitioningFields =
70+
partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
71+
72+
DataLayoutStrategy dataLayoutStrategy =
73+
partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION;
74+
75+
return InternalTable.builder()
76+
.name(paimonTable.name())
77+
.tableFormat(TableFormat.PAIMON)
78+
.readSchema(internalSchema)
79+
.layoutStrategy(dataLayoutStrategy)
80+
.basePath(paimonTable.location().toString())
81+
.partitioningFields(partitioningFields)
82+
.latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
83+
.latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
84+
.build();
85+
}
86+
87+
@Override
88+
public InternalTable getCurrentTable() {
89+
SnapshotManager snapshotManager = paimonTable.snapshotManager();
90+
Snapshot snapshot = snapshotManager.latestSnapshot();
91+
if (snapshot == null) {
92+
throw new ReadException("No snapshots found for table " + paimonTable.name());
93+
}
94+
return getTable(snapshot);
95+
}
96+
97+
@Override
98+
public InternalSnapshot getCurrentSnapshot() {
99+
SnapshotManager snapshotManager = paimonTable.snapshotManager();
100+
Snapshot snapshot = snapshotManager.latestSnapshot();
101+
if (snapshot == null) {
102+
throw new ReadException("No snapshots found for table " + paimonTable.name());
103+
}
104+
105+
InternalTable internalTable = getTable(snapshot);
106+
List<InternalDataFile> internalDataFiles =
107+
dataFileExtractor.toInternalDataFiles(paimonTable, snapshot);
108+
109+
return InternalSnapshot.builder()
110+
.table(internalTable)
111+
.version(Long.toString(snapshot.timeMillis()))
112+
.partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
113+
// TODO : Implement pending commits extraction, required for incremental sync
114+
.sourceIdentifier(getCommitIdentifier(snapshot))
115+
.build();
116+
}
117+
118+
@Override
119+
public TableChange getTableChangeForCommit(Snapshot snapshot) {
120+
throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
121+
}
122+
123+
@Override
124+
public CommitsBacklog<Snapshot> getCommitsBacklog(
125+
InstantsForIncrementalSync instantsForIncrementalSync) {
126+
throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
127+
}
128+
129+
@Override
130+
public boolean isIncrementalSyncSafeFrom(Instant instant) {
131+
return false; // Incremental sync is not supported yet
132+
}
133+
134+
@Override
135+
public String getCommitIdentifier(Snapshot snapshot) {
136+
return Long.toString(snapshot.commitIdentifier());
137+
}
138+
139+
@Override
140+
public void close() throws IOException {}
141+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.paimon;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.paimon.Snapshot;
24+
import org.apache.paimon.catalog.CatalogContext;
25+
import org.apache.paimon.fs.FileIO;
26+
import org.apache.paimon.fs.Path;
27+
import org.apache.paimon.options.Options;
28+
import org.apache.paimon.table.FileStoreTable;
29+
import org.apache.paimon.table.FileStoreTableFactory;
30+
31+
import org.apache.xtable.conversion.ConversionSourceProvider;
32+
import org.apache.xtable.conversion.SourceTable;
33+
import org.apache.xtable.spi.extractor.ConversionSource;
34+
35+
public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
36+
@Override
37+
public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
38+
try {
39+
Options catalogOptions = new Options();
40+
CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
41+
42+
Path path = new Path(sourceTableConfig.getDataPath());
43+
FileIO fileIO = FileIO.get(path, context);
44+
FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);
45+
46+
return new PaimonConversionSource(paimonTable);
47+
} catch (IOException e) {
48+
throw new RuntimeException(e);
49+
}
50+
}
51+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.paimon;
20+
21+
import java.util.*;
22+
23+
import org.apache.paimon.Snapshot;
24+
import org.apache.paimon.io.DataFileMeta;
25+
import org.apache.paimon.manifest.ManifestEntry;
26+
import org.apache.paimon.table.FileStoreTable;
27+
import org.apache.paimon.table.source.snapshot.SnapshotReader;
28+
29+
import org.apache.xtable.model.stat.ColumnStat;
30+
import org.apache.xtable.model.storage.InternalDataFile;
31+
32+
public class PaimonDataFileExtractor {
33+
34+
private final PaimonPartitionExtractor partitionExtractor =
35+
PaimonPartitionExtractor.getInstance();
36+
37+
private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
38+
39+
public static PaimonDataFileExtractor getInstance() {
40+
return INSTANCE;
41+
}
42+
43+
public List<InternalDataFile> toInternalDataFiles(FileStoreTable table, Snapshot snapshot) {
44+
List<InternalDataFile> result = new ArrayList<>();
45+
Iterator<ManifestEntry> iterator = newSnapshotReader(table, snapshot).readFileIterator();
46+
while (iterator.hasNext()) {
47+
result.add(toInternalDataFile(table, iterator.next()));
48+
}
49+
return result;
50+
}
51+
52+
private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) {
53+
return InternalDataFile.builder()
54+
.physicalPath(toFullPhysicalPath(table, entry))
55+
.fileSizeBytes(entry.file().fileSize())
56+
.lastModified(entry.file().creationTimeEpochMillis())
57+
.recordCount(entry.file().rowCount())
58+
.partitionValues(partitionExtractor.toPartitionValues(table, entry.partition()))
59+
.columnStats(toColumnStats(entry.file()))
60+
.build();
61+
}
62+
63+
private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
64+
String basePath = table.location().toString();
65+
String bucketPath = "bucket-" + entry.bucket();
66+
String filePath = entry.file().fileName();
67+
68+
Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
69+
if (partitionPath.isPresent()) {
70+
return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
71+
} else {
72+
return String.join("/", basePath, bucketPath, filePath);
73+
}
74+
}
75+
76+
private List<ColumnStat> toColumnStats(DataFileMeta file) {
77+
// TODO: Implement logic to extract column stats from the file meta
78+
return Collections.emptyList();
79+
}
80+
81+
private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
82+
// If the table has primary keys, we read only the top level files
83+
// which means we can only consider fully compacted files.
84+
if (!table.schema().primaryKeys().isEmpty()) {
85+
return table
86+
.newSnapshotReader()
87+
.withLevel(table.coreOptions().numLevels() - 1)
88+
.withSnapshot(snapshot);
89+
} else {
90+
return table.newSnapshotReader().withSnapshot(snapshot);
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)