Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT>
*/
void addSplitsBack(List<SplitT> splits, int subtaskId);

int currentUnassignedSplitSize();

void handleSplitRequest(int subtaskId);

void registerReader(int subtaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ public void addSplitsBack(List<AmazonDynamoDBSourceSplit> splits, int subtaskId)
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void handleSplitRequest(int subtaskId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
splitAssigner.addSplits(splits);
}

@Override
public int currentUnassignedSplitSize() {
return 0;
}

@Override
public void registerReader(int subtaskId) {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,6 @@ public void addSplitsBack(List<TiDBSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,6 @@ public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) {
LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size());
}

@Override
public int currentUnassignedSplitSize() {
return this.pendingSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new ClickhouseConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ protected void assignSplit() {
}
}

@Override
public int currentUnassignedSplitSize() {
return 0;
}

@Override
public void handleSplitRequest(int subtaskId) {
// nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,6 @@ public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return this.pendingSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new DorisConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ public void dorisSourceSplitEnumeratorTest() {
Assertions.assertEquals(
allocateFiles(i, PARALLELISM, PARTITION_NUMS), splitAllValues.get(i).size());
}

// check no duplicate file assigned
Assertions.assertEquals(0, dorisSourceSplitEnumerator.currentUnassignedSplitSize());
}

private List<PartitionDefinition> buildPartitionDefinitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ public void addSplitsBack(List<EasysearchSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new EasysearchConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,6 @@ public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId)
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new ElasticsearchConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
addSplitChangeToPendingAssignments(splits);
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void handleSplitRequest(int subtaskId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,6 @@ private static int getSplitOwner(int assignCount, int numReaders) {
return assignCount % numReaders;
}

@Override
public int currentUnassignedSplitSize() {
return allSplit.size() - assignedSplit.size();
}

@Override
public void registerReader(int subtaskId) {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
assignSplit(subtaskId);
}

@Override
public int currentUnassignedSplitSize() {
return allSplit.size() - assignedSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ public EventListener getEventListener() {

fileSourceSplitEnumerator.run();

// check all files are assigned
Assertions.assertEquals(fileSourceSplitEnumerator.currentUnassignedSplitSize(), 0);

Set<FileSourceSplit> valueSet =
assignSplitMap.values().stream().flatMap(List::stream).collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ void assignSplitTest() throws Exception {
context, baseMultipleTableFileSourceConfig);

enumerator.open();
Assertions.assertEquals(50, enumerator.currentUnassignedSplitSize());
IntStream.range(0, parallelism).forEach(enumerator::registerReader);
enumerator.run();

Expand All @@ -99,9 +98,6 @@ void assignSplitTest() throws Exception {
Assertions.assertEquals(
allocateFiles(i, parallelism, fileSize), splitAllValues.get(i).size());
}

// check no duplicate file assigned
Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ public void addSplitsBack(List<HbaseSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void registerReader(int subtaskId) {
pendingSplit = getTableSplits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
assignSplit(subtaskId);
}

@Override
public int currentUnassignedSplitSize() {
return allSplit.size() - assignedSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ void assignSplitRoundTest() throws Exception {
new MultipleTableHiveSourceSplitEnumerator(context, mockConfig);

enumerator.open();
Assertions.assertEquals(50, enumerator.currentUnassignedSplitSize());
IntStream.range(0, parallelism).forEach(enumerator::registerReader);
enumerator.run();

Expand All @@ -100,9 +99,6 @@ void assignSplitRoundTest() throws Exception {
Assertions.assertEquals(
allocateFiles(i, parallelism, fileSize), splitAllValues.get(i).size());
}

// check no duplicate file assigned
Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,6 @@ public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int subtaskId)
log.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size());
}

@Override
public int currentUnassignedSplitSize() {
if (!pendingTables.isEmpty()) {
return pendingTables.size();
}
if (!pendingSplits.isEmpty()) {
return pendingSplits.values().stream().mapToInt(List::size).sum();
}
return 0;
}

@Override
public void handleSplitRequest(int subtaskId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ public void addSplitsBack(List splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to InfluxDBSourceSplitEnumerator.", subtaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,6 @@ public void addSplitsBack(List<IoTDBSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to IoTDBSourceSplitEnumerator.", subtaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@ public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size());
}

@Override
public int currentUnassignedSplitSize() {
return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1;
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new JdbcConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,6 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
// Do nothing because Kafka source push split.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ private int getSplitOwner(String splitId, int numReaders) {
return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new KuduConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ public void addSplitsBack(List<MaxcomputeSourceSplit> splits, int subtaskId) {
addSplitChangeToPendingAssignments(splits);
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void registerReader(int subtaskId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,6 @@ private void addPendingSplit(Collection<MilvusSourceSplit> splits, int ownerRead
pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
}

@Override
public int currentUnassignedSplitSize() {
return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1;
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new MilvusConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ public void addSplitsBack(List<MongoSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new MongodbConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,6 @@ public void addSplitsBack(List<PaimonSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void registerReader(int subtaskId) {
readersAwaitingSplit.add(subtaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,6 @@ public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingPartitionSplits.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
// Do nothing because Pulsar source push split.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ public void addSplitsBack(List splits, int subtaskId) {
// do nothing
}

@Override
public int currentUnassignedSplitSize() {
return 0;
}

@Override
public void handleSplitRequest(int subtaskId) {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ public void addSplitsBack(List<RocketMqSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}

@Override
public void handleSplitRequest(int subtaskId) {
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,6 @@ public void addSplitsBack(List<SlsSourceSplit> splits, int subtaskId) {
}
}

@Override
public int currentUnassignedSplitSize() {
return 0;
}

@Override
public void handleSplitRequest(int subtaskId) {}

Expand Down
Loading