Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
cd26913
[improve][start] version1.0
Jul 3, 2025
9dba128
[improve][start] version2.0
Jul 8, 2025
e020b82
[improve][start] version3.0
Jul 11, 2025
f870cef
[improve][start] version4.0
Jul 11, 2025
bef73ee
[improve][start] version5.0
Jul 15, 2025
777d513
[improve][start] support 1.20.1 demo
Jul 15, 2025
bc46786
[improve][start] refactor the code
Jul 16, 2025
87b0319
Merge branch 'dev' into BDPL-support1.20.1-test
Jul 16, 2025
4229fc1
[improve][start] Remove unnecessary classes and comment content.
Jul 19, 2025
943c501
[improve][start] Remove unnecessary classes and comment content.
Jul 19, 2025
0b6ad1f
[improve][start] Remove unnecessary classes.
Jul 19, 2025
e3afc1d
[improve][start] Fix some details
Jul 19, 2025
4bc665d
[improve][start] Fix some details in the flink translation module
Jul 19, 2025
2a6c2ef
[improve][start] removed content consistent with seatunnel-translatio…
Aug 2, 2025
96d9ebc
[improve][start] Optimized the details
Aug 2, 2025
baf00f6
[improve][start] add Flink20Container
Aug 3, 2025
87c2718
[improve][start] fix details
Aug 3, 2025
f835c3e
[improve][start] modify the docker image
Aug 6, 2025
f12e8c9
[improve][start] delete getFlinkProperties
Aug 6, 2025
68d058e
[improve][start] delete getFlinkProperties
Aug 6, 2025
930ca3c
[improve][start] change 1.20.1
Aug 6, 2025
ccc5301
[improve][start] fix yaml
Aug 7, 2025
014c247
[improve][start] fix Flink20Container
Aug 7, 2025
683dcdd
[improve][start] fix Flink20Container
Aug 7, 2025
0fe4198
[improve][start] test Flink20Container
Aug 7, 2025
f3781e9
[improve][start] test Flink20Container
Aug 8, 2025
f8bce98
[improve][start] test Flink20Container
Aug 8, 2025
66e1cdd
[improve][start] test Flink20Container
Aug 8, 2025
d6ad07a
[improve][start] test Flink20Container
Aug 9, 2025
56bf1b1
[improve][start] add log
Aug 9, 2025
32992a9
[improve][start] add log
Aug 9, 2025
b3f023e
[improve][start] add starup
Aug 9, 2025
496f3e3
[improve][start] fix starup
Aug 9, 2025
d1632c9
[improve][start] Add permissions
Aug 9, 2025
11498d2
[improve][start] fix bug
Aug 10, 2025
4afb75f
[improve][start] fix bug
Aug 10, 2025
7b85130
[improve][start] fix ci bug
Aug 11, 2025
eef9566
[improve][start] fix ci bug
Aug 11, 2025
1bbbac9
[improve][start] fix ci bug
Aug 11, 2025
c7d4832
[improve][start] test flink 1.20
Aug 12, 2025
4da9fba
[improve][start] add sink feature
Aug 12, 2025
4b0d9de
[improve][start] add createWrite
Aug 12, 2025
a6e3d81
[improve][start] add createWrite
Aug 12, 2025
980bdfc
[improve][start] add createWrite
Aug 12, 2025
e16a8fd
[improve][start] revert 0811
Aug 13, 2025
f4ea71d
[improve][start] ADD prepareCommit
Aug 13, 2025
daeed56
[improve][start] ADD prepareCommit test
Aug 13, 2025
46d0251
[improve][start] test
Aug 13, 2025
a4db0fa
[improve][start] test1
Aug 13, 2025
020116e
[improve][start] test2
Aug 14, 2025
dba04e0
[improve][start] test3
Aug 15, 2025
eed5ed2
[improve][start] test4
Aug 16, 2025
7d4b098
[improve][start] test5
Aug 16, 2025
3ace392
[improve][start] test6
Aug 16, 2025
c686cb8
[improve][start] test7
Aug 17, 2025
dfa45c1
[improve][start] test8
Aug 17, 2025
5111f9d
[improve][start] test9
Aug 18, 2025
80c0c3e
[improve][start] test10
Aug 18, 2025
a20f02e
[improve][start] test11
Aug 19, 2025
e60ab2c
[improve][start] test12 Flink 1.20 feature [FLINK-33817]
Aug 19, 2025
7991b6b
[improve][start] add log
Aug 19, 2025
b868934
[improve][start] add log
Aug 19, 2025
ef5c71e
[improve][start] add log1
Aug 20, 2025
58fe9c8
[improve][start] fix
Aug 20, 2025
52eb0bd
Merge branch 'BDPL-support1.20.1-test-demo3' into BDPL-support1.20.1-…
Aug 21, 2025
7c30f85
[improve][start] remove logs
Aug 21, 2025
b50842e
[improve][start] fix <dependency>
Aug 21, 2025
23ef178
[feature] fix
Aug 22, 2025
2adb82c
[feature] fix 0823
Aug 23, 2025
60dee85
[feature] fix 0824
Aug 24, 2025
21b1a0e
[feature] fix 0824
Aug 24, 2025
0c8d237
[feature] Simplify the code and remove redundant logs
Aug 25, 2025
fddd0d4
Merge branch 'dev' into BDPL-support1.20.1-test
Aug 25, 2025
1bc8ccc
[feature] rebase dev and remove log
Aug 25, 2025
17ae141
[feature] fix details 0825
Aug 25, 2025
7b1843b
[feature] fix details(MOUNTS_PATH) 0825
Aug 25, 2025
a99e080
[feature] restore file change
Aug 26, 2025
3d906df
[feature] refactor code
Aug 30, 2025
d604726
[feature] fix refactor code
Aug 30, 2025
bb73acd
Merge branch 'dev' into BDPL-support1.20.1-test
Aug 31, 2025
f386103
[feature] fix ci build
Aug 31, 2025
c799532
[feature] unified to prioritize LoggerFactory and remove tryGetFromWr…
Sep 16, 2025
6624ba3
Merge branch 'dev' into BDPL-support1.20.1-test
Sep 16, 2025
f2dd36b
[feature] restore TestEmbeddingIT
Sep 23, 2025
d257efa
Merge branch 'dev' into BDPL-support1.20.1-test
yzeng1618 Sep 26, 2025
35cceda
[feature] apply reviewer feedback
Sep 28, 2025
d46e2c2
Merge branch 'apache:dev' into BDPL-support1.20.1-test
yzeng1618 Sep 28, 2025
197324e
Merge remote-tracking branch 'origin/BDPL-support1.20.1-test' into BD…
Sep 28, 2025
4ff372f
[Feature][hive] increase timeout
Sep 28, 2025
994dfd6
[Feature] fix cdc bug
Sep 29, 2025
d051387
[Feature][flink1.20] remove code
Sep 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 120
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -874,7 +874,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 150
timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -1163,7 +1163,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 120
timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -1238,7 +1238,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 140
timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -1314,7 +1314,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 120
timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand All @@ -1339,7 +1339,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 150
timeout-minutes: 180
steps:
- name: Checkout repository
uses: actions/checkout@v2
Expand Down Expand Up @@ -1419,7 +1419,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 180
timeout-minutes: 210
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand All @@ -1444,7 +1444,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 120
timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
<commons-logging.version>1.2</commons-logging.version>
<flink.1.13.6.version>1.13.6</flink.1.13.6.version>
<flink.1.15.3.version>1.15.3</flink.1.15.3.version>
<flink.1.20.1.version>1.20.1</flink.1.20.1.version>
<spark.2.4.0.version>2.4.0</spark.2.4.0.version>
<spark.3.3.0.version>3.3.0</spark.3.3.0.version>
<spark.binary.2.4.version>2.4</spark.binary.2.4.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum EngineType {
SPARK3("spark", "seatunnel-spark-3-starter.jar", "start-seatunnel-spark-3-connector-v2.sh"),
FLINK13("flink", "seatunnel-flink-13-starter.jar", "start-seatunnel-flink-13-connector-v2.sh"),
FLINK15("flink", "seatunnel-flink-15-starter.jar", "start-seatunnel-flink-15-connector-v2.sh"),
FLINK20("flink", "seatunnel-flink-20-starter.jar", "start-seatunnel-flink-20-connector-v2.sh"),
SEATUNNEL("seatunnel", "seatunnel-starter.jar", "seatunnel.sh");

private final String engine;
Expand Down
1 change: 1 addition & 0 deletions seatunnel-core/seatunnel-flink-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<modules>
<module>seatunnel-flink-13-starter</module>
<module>seatunnel-flink-15-starter</module>
<module>seatunnel-flink-20-starter</module>
<module>seatunnel-flink-starter-common</module>
</modules>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,97 +17,20 @@

package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** The SeaTunnel flink starter, used to generate the final flink job execute command. */
public class FlinkStarter implements Starter {
private static final String APP_NAME = SeaTunnelFlink.class.getName();
/**
* The SeaTunnel flink starter for Flink 1.13, used to generate the final flink job execute command.
*/
public class FlinkStarter extends AbstractFlinkStarter {
public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName();
public static final String SHELL_NAME = EngineType.FLINK13.getStarterShellName();
public static final String RUNTIME_FILE = "runtime.tar.gz";
private final FlinkCommandArgs flinkCommandArgs;
private final String appJar;

FlinkStarter(String[] args) {
this.flinkCommandArgs =
CommandLineUtils.parse(args, new FlinkCommandArgs(), SHELL_NAME, true);
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
super(args, EngineType.FLINK13);
}

public static void main(String[] args) {
FlinkStarter flinkStarter = new FlinkStarter(args);
System.out.println(String.join(" ", flinkStarter.buildCommands()));
}

@Override
public List<String> buildCommands() {
List<String> command = new ArrayList<>();
// set start command
command.add("${FLINK_HOME}/bin/flink");
// set deploy mode, run or run-application
command.add(flinkCommandArgs.getDeployMode().getDeployMode());
// set submitted target master
if (flinkCommandArgs.getMasterType() != null) {
command.add("--target");
command.add(flinkCommandArgs.getMasterType().getMaster());
}
// set yarn application mode parameters
if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION) {
command.add(
String.format("-Dyarn.ship-files=\"%s\"", flinkCommandArgs.getConfigFile()));
command.add(String.format("-Dyarn.ship-archives=%s", RUNTIME_FILE));
}
// set yarn application name
if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION
|| flinkCommandArgs.getMasterType() == MasterType.YARN_PER_JOB
|| flinkCommandArgs.getMasterType() == MasterType.YARN_SESSION) {
command.add(String.format("-Dyarn.application.name=%s", flinkCommandArgs.getJobName()));
}
// set flink original parameters
command.addAll(flinkCommandArgs.getOriginalParameters());
// set main class name
command.add("-c");
command.add(APP_NAME);
// set main jar name
command.add(appJar);
// set config file path
command.add("--config");
command.add(flinkCommandArgs.getConfigFile());
// set check config flag
if (flinkCommandArgs.isCheckConfig()) {
command.add("--check");
}
// set job name
command.add("--name");
command.add(flinkCommandArgs.getJobName());
// set encryption
if (flinkCommandArgs.isEncrypt()) {
command.add("--encrypt");
}
// set decryption
if (flinkCommandArgs.isDecrypt()) {
command.add("--decrypt");
}
// set deploy mode
command.add("--deploy-mode");
command.add(flinkCommandArgs.getDeployMode().getDeployMode());
// set extra system properties
flinkCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
.map(String::trim)
.forEach(variable -> command.add("-i " + variable));
return command;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,11 @@
package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

public class SeaTunnelFlink {
/** SeaTunnel Flink 1.13 main entry point. */
public class SeaTunnelFlink extends AbstractSeaTunnelFlink {
public static void main(String[] args) throws CommandException {
FlinkCommandArgs flinkCommandArgs =
CommandLineUtils.parse(
args,
new FlinkCommandArgs(),
EngineType.FLINK13.getStarterShellName(),
true);
SeaTunnel.run(flinkCommandArgs.buildCommand());
runSeaTunnel(args, EngineType.FLINK13);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FlinkRuntimeEnvironment extends AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironment {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@

import org.apache.flink.streaming.api.datastream.DataStreamSink;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.util.HashMap;
Expand All @@ -61,10 +62,11 @@
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;

@SuppressWarnings({"unchecked", "rawtypes"})
@Slf4j
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

private static final Logger LOGGER = LoggerFactory.getLogger(SinkExecuteProcessor.class);

protected SinkExecuteProcessor(
List<URL> jarPaths,
Config envConfig,
Expand Down Expand Up @@ -167,7 +169,7 @@ public SeaTunnelSink tryGenerateMultiTableSink(
ReadonlyConfig sinkConfig,
ClassLoader classLoader) {
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) {
log.info("Unsupported multi table sink api, rollback to sink template");
LOGGER.info("Unsupported multi table sink api, rollback to sink template");
// choose the first sink
return sinks.values().iterator().next();
}
Expand Down
Loading