diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index e02098c0dd80..9c61a0d346d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -309,6 +309,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
                 options.commitMaxRetryWait(),
                 options.commitStrictModeLastSafeSnapshot().orElse(null),
                 options.rowTrackingEnabled(),
+                options.dataEvolutionEnabled(),
                 options.commitDiscardDuplicateFiles(),
                 conflictDetection);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a078bd90acec..b732b51e1f62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -149,6 +149,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
     private final boolean rowTrackingEnabled;
+    private final boolean dataEvolutionEnabled;
     private final boolean discardDuplicateFiles;
     private final ConflictDetection conflictDetection;
@@ -186,6 +187,7 @@ public FileStoreCommitImpl(
             long commitMaxRetryWait,
             @Nullable Long strictModeLastSafeSnapshot,
             boolean rowTrackingEnabled,
+            boolean dataEvolutionEnabled,
             boolean discardDuplicateFiles,
             ConflictDetection conflictDetection) {
         this.snapshotCommit = snapshotCommit;
@@ -230,6 +232,7 @@ public FileStoreCommitImpl(
         this.statsFileHandler = statsFileHandler;
         this.bucketMode = bucketMode;
         this.rowTrackingEnabled = rowTrackingEnabled;
+        this.dataEvolutionEnabled = dataEvolutionEnabled;
         this.discardDuplicateFiles = discardDuplicateFiles;
         this.conflictDetection = conflictDetection;
     }
@@ -1120,8 +1123,16 @@ CommitResult tryCommitOnce(
         }

         // the delta record count is the added records minus the deleted records
-        long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
-        long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
+        long deltaRecordCount;
+        long totalRecordCount;
+
+        if (dataEvolutionEnabled) {
+            deltaRecordCount = nextRowIdStart - firstRowIdStart;
+            totalRecordCount = nextRowIdStart;
+        } else {
+            deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
+            totalRecordCount = previousTotalRecordCount + deltaRecordCount;
+        }

         // write new delta files into manifest files
         deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index d28a2638c659..01cc09a64c8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -59,13 +59,18 @@ public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
                 .map(
                         f -> {
                             boolean rawConvertible = f.stream().allMatch(file -> file.size() == 1);
+                            long rowCount =
+                                    f.stream()
+                                            .mapToLong(entries -> entries.get(0).rowCount())
+                                            .sum();
                             List<DataFileMeta> groupFiles =
                                     f.stream()
                                             .flatMap(Collection::stream)
                                             .collect(Collectors.toList());
                             return rawConvertible
-                                    ? SplitGroup.rawConvertibleGroup(groupFiles)
-                                    : SplitGroup.nonRawConvertibleGroup(groupFiles);
+                                    ? SplitGroup.rawConvertibleGroup(groupFiles).rowCount(rowCount)
+                                    : SplitGroup.nonRawConvertibleGroup(groupFiles)
+                                            .rowCount(rowCount);
                         })
                 .collect(Collectors.toList());
     }
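Both counting changes above follow the same rule: with data evolution enabled, several field files can describe the same logical rows, so rows must be counted once per row-id range rather than once per file. The commit path does this at snapshot granularity (nextRowIdStart - firstRowIdStart spans exactly the row ids assigned by this commit), while the split generator counts the first file of each row-id bunch. A minimal, self-contained sketch of the difference, using an invented FileMeta stand-in rather than Paimon's DataFileMeta:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class RowIdCountSketch {
        // Stand-in for DataFileMeta; only the row count matters here.
        static class FileMeta {
            final long rowCount;

            FileMeta(long rowCount) {
                this.rowCount = rowCount;
            }
        }

        public static void main(String[] args) {
            // One row-id bunch: a base file plus a column-evolution file,
            // both covering the SAME 100 logical rows (row ids 0..99).
            List<FileMeta> bunch = Arrays.asList(new FileMeta(100), new FileMeta(100));
            List<List<FileMeta>> split = Collections.singletonList(bunch);

            // Naive: summing every file double-counts the shared rows -> 200.
            long naive = split.stream().flatMap(List::stream).mapToLong(f -> f.rowCount).sum();

            // The patch's rule: count each bunch once via its first file -> 100,
            // mirroring f.stream().mapToLong(entries -> entries.get(0).rowCount()).sum().
            long perBunch = split.stream().mapToLong(files -> files.get(0).rowCount).sum();

            System.out.println(naive + " vs " + perBunch); // 200 vs 100
        }
    }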
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 3a4c112a95ee..5710f37a65c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -62,7 +62,7 @@ public class DataSplit implements Split {
     private static final long serialVersionUID = 7L;
     private static final long MAGIC = -2394839472490812314L;
-    private static final int VERSION = 8;
+    private static final int VERSION = 9;

     private long snapshotId = 0;
     private BinaryRow partition;
@@ -78,6 +78,7 @@ public class DataSplit implements Split {

     private boolean isStreaming = false;
     private boolean rawConvertible;
+    private long rowCount = -1L;

     public DataSplit() {}
@@ -136,6 +137,9 @@ public OptionalLong earliestFileCreationEpochMillis() {

     @Override
     public long rowCount() {
+        if (this.rowCount > 0L) {
+            return this.rowCount;
+        }
         long rowCount = 0;
         for (DataFileMeta file : dataFiles) {
             rowCount += file.rowCount();
@@ -309,7 +313,8 @@ public boolean equals(Object o) {
                 && Objects.equals(beforeFiles, dataSplit.beforeFiles)
                 && Objects.equals(beforeDeletionFiles, dataSplit.beforeDeletionFiles)
                 && Objects.equals(dataFiles, dataSplit.dataFiles)
-                && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles);
+                && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles)
+                && Objects.equals(rowCount, dataSplit.rowCount);
     }

     @Override
@@ -325,7 +330,8 @@ public int hashCode() {
                 dataFiles,
                 dataDeletionFiles,
                 isStreaming,
-                rawConvertible);
+                rawConvertible,
+                rowCount);
     }

     @Override
@@ -364,6 +370,7 @@ protected void assign(DataSplit other) {
         this.dataDeletionFiles = other.dataDeletionFiles;
         this.isStreaming = other.isStreaming;
         this.rawConvertible = other.rawConvertible;
+        this.rowCount = other.rowCount;
     }

     public void serialize(DataOutputView out) throws IOException {
@@ -396,8 +403,8 @@ public void serialize(DataOutputView out) throws IOException {
         DeletionFile.serializeList(out, dataDeletionFiles);

         out.writeBoolean(isStreaming);
-
         out.writeBoolean(rawConvertible);
+        out.writeLong(rowCount);
     }

     public static DataSplit deserialize(DataInputView in) throws IOException {
@@ -433,6 +440,10 @@ public static DataSplit deserialize(DataInputView in) throws IOException {

         boolean isStreaming = in.readBoolean();
         boolean rawConvertible = in.readBoolean();
+        long rowCount = -1L;
+        if (version >= 9) {
+            rowCount = in.readLong();
+        }

         DataSplit.Builder builder =
                 builder()
@@ -444,7 +455,8 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
                         .withBeforeFiles(beforeFiles)
                         .withDataFiles(dataFiles)
                         .isStreaming(isStreaming)
-                        .rawConvertible(rawConvertible);
+                        .rawConvertible(rawConvertible)
+                        .withRowCount(rowCount);

         if (beforeDeletionFiles != null) {
             builder.withBeforeDeletionFiles(beforeDeletionFiles);
@@ -473,7 +485,7 @@ private static FunctionWithIOException<DataInputView, DataFileMeta> getFileMetaS
             DataFileMetaFirstRowIdLegacySerializer serializer =
                     new DataFileMetaFirstRowIdLegacySerializer();
             return serializer::deserialize;
-        } else if (version == 8) {
+        } else if (version == 8 || version == 9) {
             DataFileMetaSerializer serializer = new DataFileMetaSerializer();
             return serializer::deserialize;
         } else {
@@ -556,6 +568,11 @@ public Builder rawConvertible(boolean rawConvertible) {
             return this;
         }

+        public Builder withRowCount(long rowCount) {
+            this.split.rowCount = rowCount;
+            return this;
+        }
+
         public DataSplit build() {
             checkArgument(split.partition != null);
             checkArgument(split.bucket != -1);
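Bumping VERSION to 9 and guarding the read with version >= 9 is the usual append-only wire evolution: new fields are written at the tail of the stream, and a reader of an older stream falls back to a sentinel instead of reading past the end. A hedged round-trip illustration in plain java.io (Paimon's actual DataOutputView/DataInputView API is not used here):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VersionGateSketch {
        public static void main(String[] args) throws IOException {
            // Writer side: version 9 appends rowCount after the existing fields.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bos)) {
                out.writeInt(9);        // stream version
                out.writeBoolean(true); // rawConvertible, present in older versions too
                out.writeLong(1024L);   // rowCount, new in version 9
            }

            // Reader side: older streams simply stop earlier, so gate on the version.
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                int version = in.readInt();
                boolean rawConvertible = in.readBoolean();
                long rowCount = -1L; // sentinel: unknown, recompute from file metadata
                if (version >= 9) {
                    rowCount = in.readLong();
                }
                System.out.println(rawConvertible + ", " + rowCount); // true, 1024
            }
        }
    }

The -1 sentinel is what lets the amended rowCount() getter distinguish a value stamped at planning time from one that must be derived by summing file metadata.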
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
index baf4515baf32..3a0b476134d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
@@ -36,6 +36,7 @@ class SplitGroup {

         public final List<DataFileMeta> files;
         public final boolean rawConvertible;
+        public long rowCount = -1L;

         private SplitGroup(List<DataFileMeta> files, boolean rawConvertible) {
             this.files = files;
@@ -49,5 +50,10 @@ public static SplitGroup rawConvertibleGroup(List<DataFileMeta> files) {
         public static SplitGroup nonRawConvertibleGroup(List<DataFileMeta> files) {
             return new SplitGroup(files, false);
         }
+
+        public SplitGroup rowCount(long rowCount) {
+            this.rowCount = rowCount;
+            return this;
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index df3435589d35..06649c4d3fcb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -404,6 +404,7 @@ private List<DataSplit> generateSplits(
             String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
             builder.withDataFiles(dataFiles)
                     .rawConvertible(splitGroup.rawConvertible)
+                    .withRowCount(splitGroup.rowCount > 0 ? splitGroup.rowCount : -1)
                     .withBucketPath(bucketPath);
             if (deletionVectors && deletionFilesMap != null) {
                 builder.withDataDeletionFiles(
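SplitGroup takes the row count as a mutable field with a fluent setter instead of widening both factory methods; the pattern, reduced to its essentials (a stand-in class, not the real SplitGroup):

    // Fluent, chainable setter: the factories stay unchanged and callers opt in,
    // e.g. SplitGroup.rawConvertibleGroup(files).rowCount(n) in the generator.
    class Group {
        long rowCount = -1L; // -1 = unknown; consumers fall back to file metadata

        Group rowCount(long rowCount) {
            this.rowCount = rowCount;
            return this;
        }
    }

On the consuming side, SnapshotReaderImpl's ternary normalizes any non-positive count back to the -1 sentinel, so only trusted planning-time values are ever stamped onto a DataSplit.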
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index ab37159648ef..00acee2452e8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -33,6 +33,7 @@
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;

 import org.junit.jupiter.api.Test;
@@ -179,6 +180,19 @@ public InternalRow next() {
         assertThat(i.get()).isEqualTo(1);
     }

+    @Test
+    public void testSnapshotRowCount() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(100, 1));
+        long rowCount =
+                getTableDefault().newScan().plan().splits().stream()
+                        .mapToLong(Split::rowCount)
+                        .sum();
+        assertThat(rowCount).isEqualTo(100);
+        assertThat(getTableDefault().snapshotManager().latestSnapshot().totalRecordCount())
+                .isEqualTo(100);
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 65cd919e63e5..e363b02c0ded 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -600,6 +600,36 @@ public void testWithRowIds() throws Exception {
         assertThat(i.get()).isEqualTo(2);
     }

+    @Test
+    public void testSplit() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write.prepareCommit();
+            commit.commit(commitables);
+        }
+
+        RowType writeType1 = schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("c")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+
+        long rowCount =
+                readBuilder.newScan().plan().splits().stream().mapToLong(Split::rowCount).sum();
+        assertThat(rowCount).isEqualTo(1L);
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
index cfe9086fa041..6765dbb58783 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
@@ -765,6 +765,78 @@ public void testSerializerCompatibleV8() throws Exception {
         assertThat(actual).isEqualTo(split);
     }

+    @Test
+    public void testSerializerCompatibleV9() throws Exception {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+
+        DataFileMeta dataFile =
+                DataFileMeta.create(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs:///path/to/warehouse",
+                        12L,
+                        Arrays.asList("a", "b", "c", "f"));
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
+        List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withTotalBuckets(32)
+                        .withDataFiles(dataFiles)
+                        .withDataDeletionFiles(deletionFiles)
+                        .withBucketPath("my path")
+                        .withRowCount(1024L)
+                        .build();
+
+        assertThat(InstantiationUtil.clone(InstantiationUtil.clone(split))).isEqualTo(split);
+
+        byte[] v9Bytes =
+                IOUtils.readFully(
+                        DataSplitCompatibleTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/datasplit-v9"),
+                        true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v9Bytes, DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+        assertThat(actual.rowCount()).isEqualTo(1024L);
+    }
+
     private DataFileMeta newDataFile(long rowCount) {
         return newDataFile(rowCount, null, null);
     }
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v9 b/paimon-core/src/test/resources/compatibility/datasplit-v9
new file mode 100644
index 000000000000..5b7e432e3b6b
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/datasplit-v9 differ
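testSerializerCompatibleV9 pins the new wire format with a golden file: bytes written once by the current serializer are committed as the datasplit-v9 resource above and replayed against every future reader. A minimal sketch of that mechanism with a stand-in Serializable payload (neither the real DataSplit nor the actual generator used to produce the fixture):

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class GoldenFixtureSketch {
        // Stand-in for DataSplit: any Serializable payload works the same way.
        static class Payload implements Serializable {
            final long rowCount = 1024L;
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException {
            Path golden = Files.createTempFile("datasplit-v9-sketch", ".bin");

            // Step 1, run once on the writing version: freeze the bytes.
            try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(golden))) {
                out.writeObject(new Payload());
            }

            // Step 2, the regression test on every later version: replay the
            // frozen bytes and assert the decoded object still matches.
            try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(golden))) {
                Payload decoded = (Payload) in.readObject();
                System.out.println(decoded.rowCount == 1024L); // true
            }
        }
    }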
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index b3c18a7bb630..6f75c06906eb 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -435,13 +435,23 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
         packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(
             split_by_row_id, weight_func, self.target_split_size)
+        row_counts: List[int] = [
+            sum(m[0].row_count for m in bunch)
+            for bunch in packed_files
+        ]
+
         # Flatten the packed files and build splits
         flatten_packed_files: List[List[DataFileMeta]] = [
             [file for sub_pack in pack for file in sub_pack]
             for pack in packed_files
         ]
-        splits += self._build_split_from_pack(flatten_packed_files, file_entries, False)
+        splits_temp = self._build_split_from_pack(flatten_packed_files, file_entries, False)
+
+        for split, row_count in zip(splits_temp, row_counts):
+            split.set_row_count(row_count)
+
+        splits += splits_temp

         if self.idx_of_this_subtask is not None:
             self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
diff --git a/paimon-python/pypaimon/read/split.py b/paimon-python/pypaimon/read/split.py
index f1ab5f3a5bf1..eaf5e7a6d07f 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -47,3 +47,6 @@ def file_size(self) -> int:
     @property
     def file_paths(self) -> List[str]:
         return self._file_paths
+
+    def set_row_count(self, row_count: int) -> None:
+        self._row_count = row_count
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 67813ca5c3e3..44ddc0812ec3 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1334,7 +1334,10 @@ def test_blob_write_read_large_data_end_to_end_with_rolling(self):
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
         table_read = read_builder.new_read()
-        result = table_read.to_arrow(table_scan.plan().splits())
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        self.assertEqual(sum(s._row_count for s in splits), 40)

         # Verify the data
         self.assertEqual(result.num_rows, 40, "Should have 40 rows")