2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/LocalFile.md
@@ -419,7 +419,7 @@ File modification time filter. The connector will filter some files based on the

### enable_file_split [string]

- Turn on the file splitting function, the default is false。It can be selected when the file type is csv, text, json and non-compressed format.
+ Turn on the file splitting function. The default is false. It can be selected when the file type is csv, text, json, or parquet and the format is non-compressed.
Review comment (Contributor): Please use English punctuation marks
### file_split_size [long]

2 changes: 1 addition & 1 deletion docs/zh/connector-v2/source/LocalFile.md
@@ -419,7 +419,7 @@ null_format defines which strings can be represented as null.

### enable_file_split [boolean]

- Enable the file splitting function; the default is false. Selectable when the file type is csv, text, or json and the format is non-compressed.
+ Enable the file splitting function; the default is false. Selectable when the file type is csv, text, json, or parquet and the format is non-compressed.

### file_split_size [long]

@@ -185,6 +185,13 @@
<artifactId>flexmark-all</artifactId>
<version>${flexmark-all.version}</version>
</dependency>

+ <dependency>
+     <groupId>org.mockito</groupId>
+     <artifactId>mockito-junit-jupiter</artifactId>
+     <version>${mockito.version}</version>
+     <scope>test</scope>
+ </dependency>
</dependencies>

<build>
@@ -28,7 +28,9 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
FILE_READ_FAILED("FILE-08", "File read failed"),
- BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order abnormality");
+ BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order abnormality"),
+ FILE_SPLIT_SIZE_ILLEGAL("FILE-10", "SplitSizeBytes must be greater than 0"),
+ FILE_SPLIT_FAIL("FILE-11", "File split failed");

private final String code;
private final String description;
@@ -33,6 +33,7 @@
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+ import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
@@ -89,6 +90,14 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
+ this.read(new FileSourceSplit(path), output);
+ }

+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+         throws IOException, FileConnectorException {
+     String tableId = split.getTableId();
+     String path = split.getFilePath();
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
String.format(
@@ -107,11 +116,18 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
dataModel.addLogicalTypeConversion(new TimeConversions.DateConversion());
dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+ final boolean useSplitRange =
+         enableSplitFile && split.getStart() >= 0 && split.getLength() > 0;
GenericRecord record;
- try (ParquetReader<GenericData.Record> reader =
+ AvroParquetReader.Builder<GenericData.Record> builder =
AvroParquetReader.<GenericData.Record>builder(hadoopInputFile)
-         .withDataModel(dataModel)
-         .build()) {
+         .withDataModel(dataModel);
+ if (useSplitRange) {
+     long start = split.getStart();
+     long end = start + split.getLength();
+     builder.withFileRange(start, end);
+ }
+ try (ParquetReader<GenericData.Record> reader = builder.build()) {
while ((record = reader.read()) != null) {
Object[] fields;
if (isMergePartition) {
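A note on why whole-row-group byte ranges are safe here: as I understand parquet-mr, ParquetReader.Builder#withFileRange(start, end) keeps only the row groups whose byte midpoint falls in [start, end), so ranges built from complete row groups assign each row group to exactly one split — no duplicated or skipped rows. A minimal sketch of that midpoint rule (all positions are illustrative, not taken from a real file):

// Sketch: with splits built from complete row groups, each row group's
// midpoint lands in exactly one [start, start + length) range.
public class RowGroupRangeDemo {
    public static void main(String[] args) {
        long[][] rowGroups = {{0, 100}, {100, 150}, {250, 200}}; // {startPos, compressedSize}
        long[][] splits = {{0, 250}, {250, 200}};                // {start, length}
        for (long[] rg : rowGroups) {
            long midpoint = rg[0] + rg[1] / 2;
            for (int i = 0; i < splits.length; i++) {
                if (midpoint >= splits[i][0] && midpoint < splits[i][0] + splits[i][1]) {
                    System.out.printf("row group at %d -> split %d%n", rg[0], i);
                }
            }
        }
    }
}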
@@ -19,6 +19,18 @@
import java.io.Serializable;
import java.util.List;

+ /**
+  * {@link FileSplitStrategy} defines the contract for splitting a file into one or more {@link
+  * FileSourceSplit}s that can be processed in parallel by file-based sources.
+  *
+  * <p>The split strategy determines how a file is logically divided, such as by byte ranges, record
+  * boundaries, or format-specific physical units. Implementations are responsible for ensuring that
+  * each generated split is readable and does not violate the semantics of the underlying file
+  * format.
+  *
+  * <p>The resulting {@link FileSourceSplit}s describe the portion of the file to be read, while the
+  * actual data parsing and decoding are handled by the corresponding reader implementation.
+  */
public interface FileSplitStrategy extends Serializable {

List<FileSourceSplit> split(String tableId, String filePath);
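To make the contract concrete, here is a minimal whole-file implementation sketch. It is not the repository's DefaultFileSplitStrategy, and it assumes a FileSourceSplit(String tableId, String filePath) constructor that leaves the byte range unset:

import java.util.Collections;
import java.util.List;

// Sketch only: one split per file. Readers treat an unset byte range
// (see useSplitRange in ParquetReadStrategy) as "read the whole file".
public class WholeFileSplitStrategy implements FileSplitStrategy {
    @Override
    public List<FileSourceSplit> split(String tableId, String filePath) {
        // Assumed constructor: FileSourceSplit(String tableId, String filePath)
        return Collections.singletonList(new FileSourceSplit(tableId, filePath));
    }
}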
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.source.split;

import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* {@link ParquetFileSplitStrategy} defines a split strategy for Parquet files based on Parquet
* physical storage units (RowGroups).
*
* <p>This strategy uses {@code RowGroup} as the minimum indivisible split unit and generates {@link
* FileSourceSplit}s by merging one or more contiguous RowGroups according to the configured split
* size. A split will never break a RowGroup, ensuring correctness and compatibility with Parquet
* readers.
*
* <p>The generated split range ({@code start}, {@code length}) represents a byte range covering
* complete RowGroups. The actual row-level reading and decoding are delegated to the Parquet reader
* implementation.
*
* <p>This design enables efficient parallel reading of Parquet files while preserving Parquet
* format semantics and avoiding invalid byte-level splits.
*/
public class ParquetFileSplitStrategy implements FileSplitStrategy {

private final long splitSizeBytes;

public ParquetFileSplitStrategy(long splitSizeBytes) {
if (splitSizeBytes <= 0) {
throw new SeaTunnelRuntimeException(
FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
"SplitSizeBytes must be greater than 0");
}
this.splitSizeBytes = splitSizeBytes;
}

@Override
public List<FileSourceSplit> split(String tableId, String filePath) {
try {
return splitByRowGroups(tableId, filePath, readRowGroups(filePath));
} catch (IOException e) {
throw new SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_SPLIT_FAIL, e);
}
}

/**
* Core split logic based on row group metadata. This method is IO-free and unit-test friendly.
*/
List<FileSourceSplit> splitByRowGroups(
String tableId, String filePath, List<BlockMetaData> rowGroups) {
List<FileSourceSplit> splits = new ArrayList<>();
if (rowGroups == null || rowGroups.isEmpty()) {
return splits;
}
long currentStart = 0;
long currentLength = 0;
boolean hasOpenSplit = false;
for (BlockMetaData block : rowGroups) {
long rgStart = block.getStartingPos();
long rgSize = block.getCompressedSize();
// start a new split
if (!hasOpenSplit) {
currentStart = rgStart;
currentLength = rgSize;
hasOpenSplit = true;
continue;
}
// exceeds threshold, close current split
if (currentLength + rgSize > splitSizeBytes) {
splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
// start next split
currentStart = rgStart;
currentLength = rgSize;
} else {
currentLength += rgSize;
}
}
// last split
if (hasOpenSplit && currentLength > 0) {
splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
}
return splits;
}

private List<BlockMetaData> readRowGroups(String filePath) throws IOException {
Path path = new Path(filePath);
Configuration conf = new Configuration();
try (ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
return reader.getFooter().getBlocks();
}
}
}
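Construction and use are straightforward; a quick sketch follows (the 128 MiB threshold and the table/path names are illustrative, not connector defaults):

import java.util.List;

// Sketch: split one Parquet file into row-group-aligned byte ranges.
public class ParquetSplitDemo {
    public static void main(String[] args) {
        FileSplitStrategy strategy = new ParquetFileSplitStrategy(128L * 1024 * 1024); // ~128 MiB per split
        List<FileSourceSplit> splits = strategy.split("db.events", "/data/events.parquet");
        for (FileSourceSplit split : splits) {
            System.out.printf("start=%d length=%d%n", split.getStart(), split.getLength());
        }
    }
}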
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.file.source.split;

import org.apache.parquet.hadoop.metadata.BlockMetaData;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.when;

public class ParquetFileSplitStrategyTest {

private static final String TABLE_ID = "test.test_table";
private static final String FILE_PATH = "/tmp/test.parquet";

@Test
void testSplitByRowGroupsEmpty() {
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(100);
List<FileSourceSplit> splits =
strategy.splitByRowGroups(TABLE_ID, FILE_PATH, Collections.emptyList());
Assertions.assertTrue(splits.isEmpty());
}

@Test
void testSplitByRowGroupsSingleRowGroup() {
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(1000);
List<BlockMetaData> blocks = new ArrayList<>();
blocks.add(mockBlock(0, 200));
List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, FILE_PATH, blocks);
Assertions.assertEquals(1, splits.size());
FileSourceSplit split = splits.get(0);
Assertions.assertEquals(0, split.getStart());
Assertions.assertEquals(200, split.getLength());
}

@Test
void testSplitByRowGroupsMergeRowGroups() {
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(500);
List<BlockMetaData> blocks = new ArrayList<>();
blocks.add(mockBlock(0, 100));
blocks.add(mockBlock(100, 150));
blocks.add(mockBlock(250, 200));
List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, FILE_PATH, blocks);
// 100 + 150 + 200 = 450 < 500
Assertions.assertEquals(1, splits.size());
FileSourceSplit split = splits.get(0);
Assertions.assertEquals(0, split.getStart());
Assertions.assertEquals(450, split.getLength());
}

@Test
void testSplitByRowGroupsSplitWhenExceedsThreshold() {
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(300);
List<BlockMetaData> blocks = new ArrayList<>();
blocks.add(mockBlock(0, 100));
blocks.add(mockBlock(100, 150));
blocks.add(mockBlock(250, 200));
List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, FILE_PATH, blocks);
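// 100 + 150 = 250 <= 300; adding the 200-byte group would reach 450 > 300,
// so the first split closes at 250 and a new split opens at offset 250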
Assertions.assertEquals(2, splits.size());
FileSourceSplit first = splits.get(0);
Assertions.assertEquals(0, first.getStart());
Assertions.assertEquals(250, first.getLength());
FileSourceSplit second = splits.get(1);
Assertions.assertEquals(250, second.getStart());
Assertions.assertEquals(200, second.getLength());
}

private BlockMetaData mockBlock(long start, long compressedSize) {
BlockMetaData block = Mockito.mock(BlockMetaData.class);
when(block.getStartingPos()).thenReturn(start);
when(block.getCompressedSize()).thenReturn(compressedSize);
return block;
}
}
@@ -19,12 +19,14 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+ import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+ import org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;

import static org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;

@@ -42,9 +44,13 @@ public String getPluginName() {
}

private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig readonlyConfig) {
- if (readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
+ if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
return new DefaultFileSplitStrategy();
}
+ long fileSplitSize = readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
+ if (FileFormat.PARQUET == readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
+     return new ParquetFileSplitStrategy(fileSplitSize);
+ }
String rowDelimiter =
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
? DEFAULT_ROW_DELIMITER
@@ -54,8 +60,7 @@ private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig readonlyConfig) {
? 1L
: readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
String encodingName = readonlyConfig.get(FileBaseSourceOptions.ENCODING);
- long splitSize = readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
return new LocalFileAccordingToSplitSizeSplitStrategy(
- rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
+ rowDelimiter, skipHeaderRowNumber, encodingName, fileSplitSize);
}
}
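Read as a decision table, the factory method above dispatches like this — a simplified restatement only, with the delimiter/header/encoding handling collapsed into placeholder arguments:

// Sketch of the dispatch in initFileSplitStrategy (simplified):
static FileSplitStrategy chooseStrategy(
        boolean enableSplit, FileFormat format, long splitSizeBytes) {
    if (!enableSplit) {
        return new DefaultFileSplitStrategy();               // one split per file
    }
    if (format == FileFormat.PARQUET) {
        return new ParquetFileSplitStrategy(splitSizeBytes); // row-group aligned
    }
    // text/json/csv: delimiter-based splitting; placeholder arguments below
    return new LocalFileAccordingToSplitSizeSplitStrategy("\n", 0L, "UTF-8", splitSizeBytes);
}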
@@ -86,7 +86,11 @@ public OptionRule optionRule() {
FileBaseSourceOptions.ENCODING)
.conditional(
FileBaseSourceOptions.FILE_FORMAT_TYPE,
- Arrays.asList(FileFormat.TEXT, FileFormat.JSON, FileFormat.CSV),
+ Arrays.asList(
+         FileFormat.TEXT,
+         FileFormat.JSON,
+         FileFormat.CSV,
+         FileFormat.PARQUET),
FileBaseSourceOptions.ENABLE_FILE_SPLIT)
.conditional(
FileBaseSourceOptions.ENABLE_FILE_SPLIT,
@@ -370,6 +370,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
helper.execute("/parquet/fake_to_local_file_parquet.conf");
// test read local parquet file
helper.execute("/parquet/local_file_parquet_to_assert.conf");
helper.execute("/parquet/local_file_parquet_enable_split_to_assert.conf");
// test read local parquet file with projection
helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
// test read filtered local file