Skip to content

Commit ae2c6b3

Browse files
committed
[Fix][Flink] Supports multiple parallelisms and coordinator state persistence
1 parent 5f4e944 commit ae2c6b3

File tree

18 files changed

+918
-802
lines changed

18 files changed

+918
-802
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.seatunnel.api.sink;
1919

20-
import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
21-
import org.apache.seatunnel.api.table.schema.event.FlushEvent;
2220
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
2321

2422
import java.io.IOException;
@@ -32,48 +30,4 @@ public interface SupportSchemaEvolutionSinkWriter {
3230
* @throws IOException
3331
*/
3432
void applySchemaChange(SchemaChangeEvent event) throws IOException;
35-
36-
/**
37-
* handle FlushEvent propagated from upstream
38-
*
39-
* @param event
40-
* @throws IOException
41-
*/
42-
default void handleFlushEvent(FlushEvent event) throws IOException {
43-
flushData();
44-
sendFlushSuccessful(event);
45-
}
46-
47-
/**
48-
* send success event to coordinator upon successful flash
49-
*
50-
* @param event
51-
* @throws IOException
52-
*/
53-
default void sendFlushSuccessful(FlushEvent event) throws IOException {
54-
SchemaCoordinator coordinator = getSchemaCoordinator();
55-
if (coordinator == null && event != null && event.getJobId() != null) {
56-
coordinator = SchemaCoordinator.getOrCreateInstance(event.getJobId());
57-
}
58-
59-
if (coordinator != null) {
60-
coordinator.notifyFlushSuccessful(event.getJobId(), event.tableIdentifier());
61-
}
62-
}
63-
64-
/**
65-
* Get the schema coordinator instance for reporting flush completion
66-
*
67-
* @return the schema coordinator instance, or null if not available
68-
*/
69-
default SchemaCoordinator getSchemaCoordinator() {
70-
return null;
71-
}
72-
73-
/**
74-
* flush data to other system
75-
*
76-
* @throws IOException
77-
*/
78-
default void flushData() throws IOException {}
7933
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.seatunnel.api.sink.SinkWriter;
2222
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
2323
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
24-
import org.apache.seatunnel.api.table.schema.event.FlushEvent;
2524
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
2625
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2726
import org.apache.seatunnel.api.tracing.MDCTracer;
@@ -176,26 +175,6 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
176175
}
177176
}
178177

179-
@Override
180-
public void handleFlushEvent(FlushEvent event) throws IOException {
181-
subSinkErrorCheck();
182-
String targetTableId = event.tableIdentifier().toTablePath().getFullName();
183-
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
184-
for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry :
185-
sinkWritersWithIndex.get(i).entrySet()) {
186-
if (sinkWriterEntry.getKey().getTableIdentifier().equals(targetTableId)) {
187-
synchronized (runnable.get(i)) {
188-
SinkWriter<SeaTunnelRow, ?, ?> sink = sinkWriterEntry.getValue();
189-
if (sink instanceof SupportSchemaEvolutionSinkWriter) {
190-
((SupportSchemaEvolutionSinkWriter) sink).handleFlushEvent(event);
191-
}
192-
}
193-
return;
194-
}
195-
}
196-
}
197-
}
198-
199178
@Override
200179
public void write(SeaTunnelRow element) throws IOException {
201180
if (element != null && element.getOptions() != null) {

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ public void run() {
5050
if (row == null) {
5151
continue;
5252
}
53+
// control rows used for schema evolution / coordination
54+
// are represented as SeaTunnelRow with zero fields (arity == 0)
55+
if (row.getArity() == 0) {
56+
log.debug(
57+
"Skip control SeaTunnelRow with zero arity in MultiTableWriterRunnable: {}",
58+
row);
59+
continue;
60+
}
5361
SinkWriter<SeaTunnelRow, ?, ?> writer = tableIdWriterMap.get(row.getTableId());
5462
if (writer == null) {
5563
if (tableIdWriterMap.size() == 1) {

0 commit comments

Comments
 (0)