Skip to content
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6cb4abf
init
Aug 19, 2025
e7f0632
init
Aug 20, 2025
1e48dc4
init
Aug 20, 2025
0868921
init
Aug 20, 2025
25cbac9
init
Aug 20, 2025
41c06cf
init
Aug 20, 2025
f780aa6
init
Aug 20, 2025
02ec69b
init
dyp12 Aug 21, 2025
4574f83
Merge branch 'apache:dev' into dev
dyp12 Aug 23, 2025
24d6f1a
init
dyp12 Aug 25, 2025
51ad4e9
code format
dyp12 Aug 25, 2025
72199d7
Merge branch 'apache:dev' into dev
dyp12 Aug 28, 2025
979fb5e
init
Aug 29, 2025
4157583
init
Aug 29, 2025
f098cac
init
Aug 29, 2025
529dc6c
init
Aug 29, 2025
0e7ee4c
init
Aug 29, 2025
73bec44
init
Aug 29, 2025
b263e7a
init
Aug 30, 2025
bf3e6ec
Merge branch 'apache:dev' into dev
dyp12 Aug 30, 2025
660a21e
init
Aug 30, 2025
1add2d1
init
Aug 30, 2025
d4bfa25
init
Aug 31, 2025
16d2a33
feat:update doc
Sep 4, 2025
76647a5
feat:update
Sep 5, 2025
79e9efe
feat:update
Sep 5, 2025
5ec634d
Merge branch 'apache:dev' into dev
dyp12 Sep 5, 2025
e78c133
Merge branch 'apache:dev' into dev
dyp12 Sep 23, 2025
db0cd84
Merge branch 'apache:dev' into dev
dyp12 Nov 24, 2025
6edb142
fead:add fluss sink
Nov 26, 2025
c1594f5
fead:add fluss sink
Nov 26, 2025
6741580
fead:add fluss sink
Nov 26, 2025
63cd19f
fead:add fluss sink doc
Nov 27, 2025
3bf8dbf
fead:add fluss sink doc
Nov 27, 2025
f9616f2
fead:add fluss sink doc
Nov 27, 2025
d72dd86
fead:add createWriter
Nov 27, 2025
5c6afb7
fead:add createWriter
Nov 27, 2025
39f9c0d
fead:add createWriter
Nov 27, 2025
914385b
fead:add createWriter
Nov 27, 2025
935ee4c
Merge branch 'dev' into dev
dyp12 Nov 27, 2025
4b0aacd
fead:add createWriter
Nov 27, 2025
e4a0fc5
fead:add createWriter
Nov 28, 2025
644c179
fead:add createWriter
Nov 28, 2025
570e69f
Update AbstractSchemaChangeBaseIT.java
dyp12 Nov 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ seatunnel.source.GraphQL = connector-graphql
seatunnel.sink.GraphQL = connector-graphql
seatunnel.sink.Aerospike = connector-aerospike
seatunnel.sink.SensorsData = connector-sensorsdata
seatunnel.sink.Fluss = connector-fluss

# For custom transforms, make sure to use the seatunnel.transform.[PluginIdentifier]=[JarPerfix] naming convention. For example:
# seatunnel.transform.Sql = seatunnel-transforms-v2
51 changes: 51 additions & 0 deletions seatunnel-connectors-v2/connector-fluss/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>

<artifactId>connector-fluss</artifactId>
<name>SeaTunnel : Connectors V2 : Fluss</name>

<properties>
<fluss.client.version>0.7.0</fluss.client.version>
<connector.name>connector.fluss</connector.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-client</artifactId>
<version>${fluss.client.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.fluss.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.io.Serializable;
import java.util.Map;

public class FlussBaseOptions implements Serializable {
public static final String CONNECTOR_IDENTITY = "Fluss";
public static final Option<String> BOOTSTRAP_SERVERS =
Options.key("bootstrap.servers")
.stringType()
.noDefaultValue()
.withDescription("Fluss cluster address");
public static final Option<String> DATABASE =
Options.key("database")
.stringType()
.noDefaultValue()
.withDescription("The name of Fluss database");

public static final Option<String> TABLE =
Options.key("table")
.stringType()
.noDefaultValue()
.withDescription("The name of Fluss table");

public static final Option<Map<String, String>> CLIENT_CONFIG =
Options.key("client.config")
.mapType()
.noDefaultValue()
.withDescription("The parameter of Fluss client add to Connection ");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.fluss.config;

public class FlussSinkOptions extends FlussBaseOptions {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.fluss.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FlussSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {

private final ReadonlyConfig pluginConfig;
private final CatalogTable catalogTable;

public FlussSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.pluginConfig = pluginConfig;
this.catalogTable = catalogTable;
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
return new FlussSinkWriter(context, catalogTable, pluginConfig);
}

@Override
public String getPluginName() {
return FlussSinkOptions.CONNECTOR_IDENTITY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.fluss.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class FlussSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
return FlussSinkOptions.CONNECTOR_IDENTITY;
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(FlussSinkOptions.BOOTSTRAP_SERVERS)
.optional(FlussSinkOptions.DATABASE)
.optional(FlussSinkOptions.TABLE)
.optional(FlussSinkOptions.CLIENT_CONFIG)
.build();
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
return () -> new FlussSink(context.getOptions(), context.getCatalogTable());
}
}
Loading
Loading