diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/CatalogExceptionUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/CatalogExceptionUtils.java
similarity index 97%
rename from fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/CatalogExceptionUtils.java
rename to fluss-common/src/main/java/com/alibaba/fluss/utils/CatalogExceptionUtils.java
index f033ead9..67d8e40e 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/CatalogExceptionUtils.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/CatalogExceptionUtils.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.alibaba.fluss.connector.flink.utils;
+package com.alibaba.fluss.utils;
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java
index ddbc8a5f..9b6401cf 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java
@@ -22,12 +22,12 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.lakehouse.LakeCatalog;
-import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.utils.CatalogExceptionUtils;
import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.IOUtils;
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-3.3/pom.xml b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-3.3/pom.xml
new file mode 100644
index 00000000..47b82379
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-3.3/pom.xml
@@ -0,0 +1,83 @@
+
+
+
+
+ 4.0.0
+
+ com.alibaba.fluss
+ fluss-connector-spark
+ 0.6-SNAPSHOT
+
+
+ fluss-connector-spark-3.3
+
+ Fluss : Connector : Spark : 3.3
+
+
+ 3.3.3
+
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+
+ com.alibaba.fluss
+ fluss-connector-spark-common
+ ${project.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade-fluss
+ package
+
+ shade
+
+
+
+
+ com.alibaba.fluss:fluss-connector-spark-common
+ com.alibaba.fluss:fluss-client
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/pom.xml b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/pom.xml
new file mode 100644
index 00000000..c192ebea
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/pom.xml
@@ -0,0 +1,99 @@
+
+
+
+
+ 4.0.0
+
+ com.alibaba.fluss
+ fluss-connector-spark
+ 0.6-SNAPSHOT
+
+
+ fluss-connector-spark-common
+
+ Fluss : Connector : Spark : Common
+
+
+ 2.12.15
+ 3.5.3
+
+
+
+
+
+ com.alibaba.fluss
+ fluss-client
+ ${project.version}
+
+
+
+ org.scala-lang
+ scala-library
+ ${scala.common.version}
+
+
+
+ org.scala-lang
+ scala-compiler
+ ${scala.common.version}
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${spark.common.version}
+ provided
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.common.version}
+ provided
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+
+
+
+
+
+ org.apache.curator
+ curator-test
+ ${curator.version}
+ test
+
+
+
+ com.alibaba.fluss
+ fluss-server
+ ${project.version}
+ test
+ test-jar
+
+
+
+ com.alibaba.fluss
+ fluss-test-utils
+ ${project.version}
+ test
+
+
+
\ No newline at end of file
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/SparkConnectorOptions.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/SparkConnectorOptions.java
new file mode 100644
index 00000000..84972eaa
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/SparkConnectorOptions.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark;
+
+import com.alibaba.fluss.config.ConfigOption;
+import com.alibaba.fluss.config.FlussConfigUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.alibaba.fluss.config.ConfigBuilder.key;
+
+/** Options for spark connector. */
+public class SparkConnectorOptions {
+
+ public static final ConfigOption BUCKET_NUMBER =
+ key("bucket.num")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The number of buckets of a Fluss table.");
+
+ public static final ConfigOption BUCKET_KEY =
+ key("bucket.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specific the distribution policy of the Fluss table. "
+ + "Data will be distributed to each bucket according to the hash value of bucket-key. "
+ + "If you specify multiple fields, delimiter is ','. "
+ + "If the table is with primary key, you can't specific bucket key currently. "
+ + "The bucket keys will always be the primary key. "
+ + "If the table is not with primary key, you can specific bucket key, and when the bucket key is not specified, "
+ + "the data will be distributed to each bucket randomly.");
+
+ public static final ConfigOption BOOTSTRAP_SERVERS =
+ key("bootstrap.servers")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "A list of host/port pairs to use for establishing the initial connection to the Fluss cluster. "
+ + "The list should be in the form host1:port1,host2:port2,....");
+
+ public static final ConfigOption PRIMARY_KEY =
+ key("primary.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the primary key of fluss table, such as key1,key2,...");
+
+ // --------------------------------------------------------------------------------------------
+ // Lookup specific options
+ // --------------------------------------------------------------------------------------------
+
+ public static final ConfigOption LOOKUP_ASYNC =
+ key("lookup.async")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to set async lookup. Default is true.");
+
+ // --------------------------------------------------------------------------------------------
+ // Scan specific options
+ // --------------------------------------------------------------------------------------------
+
+ public static final ConfigOption SCAN_STARTUP_MODE =
+ key("scan.startup.mode")
+ .enumType(ScanStartupMode.class)
+ .defaultValue(ScanStartupMode.INITIAL)
+ .withDescription(
+ "Optional startup mode for Fluss source. Default is 'initial'.");
+
+ public static final ConfigOption SCAN_STARTUP_TIMESTAMP =
+ key("scan.startup.timestamp")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional timestamp for Fluss source in case of startup mode is timestamp. "
+ + "The format is 'timestamp' or 'yyyy-MM-dd HH:mm:ss'. "
+ + "Like '1678883047356' or '2023-12-09 23:09:12'.");
+
+ public static final ConfigOption SCAN_PARTITION_DISCOVERY_INTERVAL =
+ key("scan.partition.discovery.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription(
+ "The interval in milliseconds for the Fluss source to discover "
+ + "the new partitions for partitioned table while scanning."
+ + " A non-positive value disables the partition discovery.");
+
+ // --------------------------------------------------------------------------------------------
+ // table storage specific options
+ // --------------------------------------------------------------------------------------------
+
+ public static final List> TABLE_OPTIONS =
+ new ArrayList<>(FlussConfigUtils.TABLE_OPTIONS.values());
+
+ // --------------------------------------------------------------------------------------------
+ // client specific options
+ // --------------------------------------------------------------------------------------------
+
+ public static final List> CLIENT_OPTIONS =
+ new ArrayList<>(FlussConfigUtils.CLIENT_OPTIONS.values());
+
+ // ------------------------------------------------------------------------------------------
+
+ /** Startup mode for the fluss scanner, see {@link #SCAN_STARTUP_MODE}. */
+ public enum ScanStartupMode {
+ INITIAL(
+ "initial",
+ "Performs an initial snapshot n the table upon first startup, "
+ + "ans continue to read the latest changelog with exactly once guarantee. "
+ + "If the table to read is a log table, the initial snapshot means "
+ + "reading from earliest log offset. If the table to read is a primary key table, "
+ + "the initial snapshot means reading a latest snapshot which "
+ + "materializes all changes on the table."),
+ EARLIEST("earliest", "Start reading logs from the earliest offset."),
+ LATEST("latest", "Start reading logs from the latest offset."),
+ TIMESTAMP("timestamp", "Start reading logs from user-supplied timestamp.");
+
+ private final String value;
+ private final String description;
+
+ ScanStartupMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalog.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalog.java
new file mode 100644
index 00000000..b3ece08c
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalog.java
@@ -0,0 +1,366 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark.catalog;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.connector.spark.SparkConnectorOptions;
+import com.alibaba.fluss.connector.spark.exception.CatalogException;
+import com.alibaba.fluss.connector.spark.table.SparkTable;
+import com.alibaba.fluss.connector.spark.utils.SparkConversions;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.utils.CatalogExceptionUtils;
+import com.alibaba.fluss.utils.ExceptionUtils;
+import com.alibaba.fluss.utils.IOUtils;
+import com.alibaba.fluss.utils.Preconditions;
+
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** A Spark Catalog for Fluss. */
+public class SparkCatalog
+ implements StagingTableCatalog,
+ SupportsNamespaces,
+ FunctionCatalog,
+ TableCatalog,
+ Closeable {
+
+ private static final String[] DEFAULT_NAMESPACE = new String[] {"fluss"};
+
+ private String bootstrapServers;
+ private String catalogName;
+ private Connection connection;
+ private Admin admin;
+
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ this.catalogName = name;
+ this.bootstrapServers = options.get(SparkConnectorOptions.BOOTSTRAP_SERVERS.key());
+ Configuration flussConfigs = new Configuration();
+ flussConfigs.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
+ connection = ConnectionFactory.createConnection(flussConfigs);
+ admin = connection.getAdmin();
+ }
+
+ @Override
+ public String[] defaultNamespace() {
+ return DEFAULT_NAMESPACE;
+ }
+
+ @Override
+ public String name() {
+ return this.catalogName;
+ }
+
+ @Override
+ public boolean namespaceExists(String[] namespace) {
+ isValidateNamespace(namespace);
+ try {
+ return admin.databaseExists(namespace[0]).get();
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed to check if database %s exists in %s", namespace, name()),
+ e);
+ }
+ }
+
+ @Override
+ public String[][] listNamespaces() {
+ try {
+ List databases = admin.listDatabases().get();
+ String[][] namespaces = new String[databases.size()][];
+
+ for (int i = 0; i < databases.size(); ++i) {
+ namespaces[i] = new String[] {databases.get(i)};
+ }
+
+ return namespaces;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed to list all databases in %s", name()), e);
+ }
+ }
+
+ @Override
+ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+ if (namespace.length == 0) {
+ return listNamespaces();
+ } else {
+ isValidateNamespace(namespace);
+ if (namespaceExists(namespace)) {
+ return new String[0][];
+ }
+ throw new NoSuchNamespaceException(namespace);
+ }
+ }
+
+ @Override
+ public Map loadNamespaceMetadata(String[] namespace)
+ throws NoSuchNamespaceException {
+ isValidateNamespace(namespace);
+ if (namespaceExists(namespace)) {
+ return Collections.emptyMap();
+ }
+ throw new NoSuchNamespaceException(namespace);
+ }
+
+ @Override
+ public void createNamespace(String[] namespace, Map metadata)
+ throws NamespaceAlreadyExistsException {
+ isValidateNamespace(namespace);
+ try {
+ admin.createDatabase(namespace[0], false).get();
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isDatabaseAlreadyExist(t)) {
+ throw new NamespaceAlreadyExistsException(namespace);
+ } else {
+ throw new CatalogException(
+ String.format("Failed to create database %s in %s", namespace, name()), t);
+ }
+ }
+ }
+
+ @Override
+ public void alterNamespace(String[] namespace, NamespaceChange... changes)
+ throws NoSuchNamespaceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean dropNamespace(String[] namespace, boolean cascade)
+ throws NoSuchNamespaceException, NonEmptyNamespaceException {
+ isValidateNamespace(namespace);
+ try {
+ admin.deleteDatabase(namespace[0], false, cascade).get();
+ return true;
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
+ throw new NoSuchNamespaceException(namespace);
+ } else if (CatalogExceptionUtils.isDatabaseNotEmpty(t)) {
+ throw new NonEmptyNamespaceException(namespace);
+ } else {
+ throw new CatalogException(
+ String.format("Failed to drop database %s in %s", namespace, name()), t);
+ }
+ }
+ }
+
+ @Override
+ public StagedTable stageCreate(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map properties)
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StagedTable stageReplace(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map properties)
+ throws NoSuchNamespaceException, NoSuchTableException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StagedTable stageCreateOrReplace(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map properties)
+ throws NoSuchNamespaceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void invalidateTable(Identifier ident) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean tableExists(Identifier ident) {
+ try {
+ return admin.tableExists(toTablePath(ident)).get();
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed to check if table %s exists in %s", ident, name()),
+ ExceptionUtils.stripExecutionException(e));
+ }
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+ isValidateNamespace(namespace);
+ try {
+ List tables = admin.listTables(namespace[0]).get();
+ Identifier[] identifiers = new Identifier[tables.size()];
+ for (int i = 0; i < tables.size(); i++) {
+ identifiers[i] = Identifier.of(namespace, tables.get(i));
+ }
+ return identifiers;
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
+ throw new NoSuchNamespaceException(namespace);
+ }
+ throw new CatalogException(
+ String.format(
+ "Failed to list all tables in database %s in %s", namespace, name()),
+ t);
+ }
+ }
+
+ @Override
+ public Table loadTable(Identifier ident) throws NoSuchTableException {
+ try {
+ TableInfo tableInfo = admin.getTable(toTablePath(ident)).get();
+ return new SparkTable(
+ tableInfo.getTablePath(), tableInfo.getTableDescriptor(), bootstrapServers);
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isTableNotExist(t)) {
+ throw new NoSuchTableException(ident);
+ } else {
+ throw new CatalogException(
+ String.format("Failed to get table %s in %s", ident, name()), t);
+ }
+ }
+ }
+
+ @Override
+ public Table createTable(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map properties)
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ try {
+ TableDescriptor tableDescriptor =
+ SparkConversions.toFlussTable(schema, partitions, properties);
+ TablePath tablePath = toTablePath(ident);
+ admin.createTable(tablePath, tableDescriptor, false).get();
+ return new SparkTable(tablePath, tableDescriptor, bootstrapServers);
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
+ throw new NoSuchNamespaceException(ident.namespace());
+ } else if (CatalogExceptionUtils.isTableAlreadyExist(t)) {
+ throw new TableAlreadyExistsException(ident);
+ } else {
+ throw new CatalogException(
+ String.format("Failed to create table %s in %s", ident, name()), t);
+ }
+ }
+ }
+
+ @Override
+ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ try {
+ admin.deleteTable(toTablePath(ident), false).get();
+ return true;
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ throw new CatalogException(
+ String.format("Failed to drop table %s in %s", ident, name()), t);
+ }
+ }
+
+ @Override
+ public void renameTable(Identifier oldIdent, Identifier newIdent)
+ throws NoSuchTableException, TableAlreadyExistsException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(admin, "fluss-admin");
+ IOUtils.closeQuietly(connection, "fluss-connection");
+ }
+
+ private void isValidateNamespace(String[] namespace) {
+ Preconditions.checkArgument(
+ namespace.length == 1, "Namespace %s is not valid", Arrays.toString(namespace));
+ }
+
+ private TablePath toTablePath(Identifier ident) {
+ isValidateNamespace(ident.namespace());
+ return TablePath.of(ident.namespace()[0], ident.name());
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/exception/CatalogException.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/exception/CatalogException.java
new file mode 100644
index 00000000..c30725d6
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/exception/CatalogException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark.exception;
+
+/** The exception which was throw when spark catalog process failed. */
+public class CatalogException extends RuntimeException {
+
+ public CatalogException(String message) {
+ super(message);
+ }
+
+ public CatalogException(Throwable cause) {
+ super(cause);
+ }
+
+ public CatalogException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/table/SparkTable.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/table/SparkTable.java
new file mode 100644
index 00000000..c2de8bd9
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/table/SparkTable.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark.table;
+
+import com.alibaba.fluss.connector.spark.utils.SparkConversions;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/** spark table. */
+public class SparkTable implements Table {
+
+ private final TablePath tablePath;
+ private final TableDescriptor tableDescriptor;
+ private final StructType sparkSchema;
+ private final Transform[] transforms;
+ private final Map properties;
+ private final String bootstrapServers;
+
+ public SparkTable(
+ TablePath tablePath, TableDescriptor tableDescriptor, String bootstrapServers) {
+ this.tablePath = tablePath;
+ this.tableDescriptor = tableDescriptor;
+ this.sparkSchema = SparkConversions.toSparkSchema(tableDescriptor.getSchema());
+ this.transforms = SparkConversions.toSparkTransforms(tableDescriptor.getPartitionKeys());
+ this.properties = new HashMap<>();
+ this.properties.putAll(tableDescriptor.getProperties());
+ this.properties.putAll(tableDescriptor.getCustomProperties());
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ @Override
+ public Transform[] partitioning() {
+ return transforms;
+ }
+
+ @Override
+ public Map properties() {
+ return properties;
+ }
+
+ @Override
+ public String name() {
+ return tablePath.getTableName();
+ }
+
+ @Override
+ public StructType schema() {
+ return sparkSchema;
+ }
+
+ @Override
+ public Set capabilities() {
+ return Collections.emptySet();
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/FlussTypeToSparkType.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/FlussTypeToSparkType.java
new file mode 100644
index 00000000..bb821949
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/FlussTypeToSparkType.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark.utils;
+
+import com.alibaba.fluss.types.ArrayType;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.BytesType;
+import com.alibaba.fluss.types.CharType;
+import com.alibaba.fluss.types.DataTypeVisitor;
+import com.alibaba.fluss.types.DateType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.MapType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimeType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+
+/** Convert Fluss's {@link com.alibaba.fluss.types.DataType} to Spark's {@link DataType}. */
+public class FlussTypeToSparkType implements DataTypeVisitor {
+
+ static final FlussTypeToSparkType INSTANCE = new FlussTypeToSparkType();
+
+ @Override
+ public DataType visit(CharType charType) {
+ return new org.apache.spark.sql.types.CharType(charType.getLength());
+ }
+
+ @Override
+ public DataType visit(StringType stringType) {
+ return DataTypes.StringType;
+ }
+
+ @Override
+ public DataType visit(BooleanType booleanType) {
+ return DataTypes.BooleanType;
+ }
+
+ @Override
+ public DataType visit(BinaryType binaryType) {
+ return DataTypes.BinaryType;
+ }
+
+ @Override
+ public DataType visit(BytesType bytesType) {
+ return DataTypes.BinaryType;
+ }
+
+ @Override
+ public DataType visit(DecimalType decimalType) {
+ return DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale());
+ }
+
+ @Override
+ public DataType visit(TinyIntType tinyIntType) {
+ return DataTypes.ByteType;
+ }
+
+ @Override
+ public DataType visit(SmallIntType smallIntType) {
+ return DataTypes.ShortType;
+ }
+
+ @Override
+ public DataType visit(IntType intType) {
+ return DataTypes.IntegerType;
+ }
+
+ @Override
+ public DataType visit(BigIntType bigIntType) {
+ return DataTypes.LongType;
+ }
+
+ @Override
+ public DataType visit(FloatType floatType) {
+ return DataTypes.FloatType;
+ }
+
+ @Override
+ public DataType visit(DoubleType doubleType) {
+ return DataTypes.DoubleType;
+ }
+
+ @Override
+ public DataType visit(DateType dateType) {
+ return DataTypes.DateType;
+ }
+
+ @Override
+ public DataType visit(TimeType timeType) {
+ // spark 3.3 does not support Time type, use long to represent it
+ return DataTypes.LongType;
+ }
+
+ @Override
+ public DataType visit(TimestampType timestampType) {
+ // spark 3.3 does not support Timestamp without time zone type, use long to represent it
+ return DataTypes.LongType;
+ }
+
+ @Override
+ public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
+ // with local time zone (spark only support microsecond)
+ return DataTypes.TimestampType;
+ }
+
+ @Override
+ public DataType visit(ArrayType arrayType) {
+ // TODO: support ArrayType
+ throw new UnsupportedOperationException("UnSupport ArrayType now");
+ }
+
+ @Override
+ public DataType visit(MapType mapType) {
+ // TODO: support MapType
+ throw new UnsupportedOperationException("UnSupport MapType now");
+ }
+
+ @Override
+ public DataType visit(RowType rowType) {
+ // TODO: support RowType
+ throw new UnsupportedOperationException("UnSupport RowType now");
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/SparkConversions.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/SparkConversions.java
new file mode 100644
index 00000000..d3fd1084
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/SparkConversions.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark.utils;
+
+import com.alibaba.fluss.config.ConfigOption;
+import com.alibaba.fluss.connector.spark.SparkConnectorOptions;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.BytesType;
+import com.alibaba.fluss.types.CharType;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DateType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TinyIntType;
+
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.connector.spark.SparkConnectorOptions.BUCKET_KEY;
+import static com.alibaba.fluss.connector.spark.SparkConnectorOptions.BUCKET_NUMBER;
+import static com.alibaba.fluss.connector.spark.SparkConnectorOptions.PRIMARY_KEY;
+
+/** Utils for conversion between Spark and Fluss. */
+public class SparkConversions {
+
+ /** Convert Spark's table to Fluss's table. */
+ public static TableDescriptor toFlussTable(
+ StructType sparkSchema, Transform[] partitions, Map properties) {
+ // schema
+ Schema.Builder schemBuilder = Schema.newBuilder();
+
+ if (properties.containsKey(PRIMARY_KEY.key())) {
+ List primaryKey =
+ Arrays.stream(properties.get(PRIMARY_KEY.key()).split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ schemBuilder.primaryKey(primaryKey);
+ }
+
+ Schema schema =
+ schemBuilder
+ .fromColumns(
+ Arrays.stream(sparkSchema.fields())
+ .map(
+ field ->
+ new Schema.Column(
+ field.name(),
+ SparkConversions.toFlussType(field),
+ field.getComment()
+ .getOrElse(() -> null)))
+ .collect(Collectors.toList()))
+ .build();
+
+ // partition keys
+ List partitionKeys =
+ Arrays.stream(partitions)
+ .map(partition -> partition.references()[0].describe())
+ .collect(Collectors.toList());
+
+ // bucket keys
+ List bucketKey;
+ if (properties.containsKey(BUCKET_KEY.key())) {
+ bucketKey =
+ Arrays.stream(properties.get(BUCKET_KEY.key()).split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ } else {
+ // use primary keys - partition keys
+ bucketKey =
+ schema.getPrimaryKey()
+ .map(
+ pk -> {
+ List bucketKeys =
+ new ArrayList<>(pk.getColumnNames());
+ bucketKeys.removeAll(partitionKeys);
+ return bucketKeys;
+ })
+ .orElse(Collections.emptyList());
+ }
+ Integer bucketNum = null;
+ if (properties.containsKey(BUCKET_NUMBER.key())) {
+ bucketNum = Integer.parseInt(properties.get(BUCKET_NUMBER.key()));
+ }
+
+ // process properties
+ Map flussTableProperties =
+ convertSparkOptionsToFlussTableProperties(properties);
+
+ // comment
+ String comment = properties.get("comment");
+
+ // TODO: process watermark
+ return TableDescriptor.builder()
+ .schema(schema)
+ .partitionedBy(partitionKeys)
+ .distributedBy(bucketNum, bucketKey)
+ .comment(comment)
+ .properties(flussTableProperties)
+ .customProperties(properties)
+ .build();
+ }
+
+ /** Convert Fluss's schema to Spark's schema. */
+ public static StructType toSparkSchema(Schema flussSchema) {
+ StructField[] fields = new StructField[flussSchema.getColumns().size()];
+ for (int i = 0; i < flussSchema.getColumns().size(); i++) {
+ fields[i] = toSparkStructField(flussSchema.getColumns().get(i));
+ }
+ return new StructType(fields);
+ }
+
+ /** Convert Fluss's partition keys to Spark's transforms. */
+ public static Transform[] toSparkTransforms(List partitionKeys) {
+ if (partitionKeys == null || partitionKeys.isEmpty()) {
+ return new Transform[0];
+ }
+ Transform[] transforms = new Transform[partitionKeys.size()];
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ transforms[i] = Expressions.identity(partitionKeys.get(i));
+ }
+ return transforms;
+ }
+
+ /** Convert Fluss's column to Spark's field. */
+ public static StructField toSparkStructField(Schema.Column flussColumn) {
+ StructField field =
+ new StructField(
+ flussColumn.getName(),
+ toSparkType(flussColumn.getDataType()),
+ flussColumn.getDataType().isNullable(),
+ Metadata.empty());
+ return flussColumn.getComment().isPresent()
+ ? field.withComment(flussColumn.getComment().get())
+ : field;
+ }
+
+ /** Convert Fluss's type to Spark's type. */
+ public static org.apache.spark.sql.types.DataType toSparkType(DataType flussDataType) {
+ return flussDataType.accept(FlussTypeToSparkType.INSTANCE);
+ }
+
+ /** Convert Spark's type to Fluss's type. */
+ public static DataType toFlussType(StructField sparkField) {
+ org.apache.spark.sql.types.DataType sparkType = sparkField.dataType();
+ boolean isNullable = sparkField.nullable();
+ if (sparkType instanceof org.apache.spark.sql.types.CharType) {
+ return new CharType(
+ isNullable, ((org.apache.spark.sql.types.CharType) sparkType).length());
+ } else if (sparkType instanceof org.apache.spark.sql.types.StringType) {
+ return new StringType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.BooleanType) {
+ return new BooleanType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.BinaryType) {
+ return new BytesType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.DecimalType) {
+ return new DecimalType(
+ isNullable,
+ ((org.apache.spark.sql.types.DecimalType) sparkType).precision(),
+ ((org.apache.spark.sql.types.DecimalType) sparkType).scale());
+ } else if (sparkType instanceof org.apache.spark.sql.types.ByteType) {
+ return new TinyIntType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.ShortType) {
+ return new SmallIntType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.IntegerType) {
+ return new IntType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.LongType) {
+ return new BigIntType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.FloatType) {
+ return new FloatType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.DoubleType) {
+ return new DoubleType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.DateType) {
+ return new DateType(isNullable);
+ } else if (sparkType instanceof org.apache.spark.sql.types.TimestampType) {
+ // spark only support 6 digits of precision
+ return new LocalZonedTimestampType(isNullable, 6);
+ } else {
+ // TODO: support more data type
+ throw new UnsupportedOperationException("Unsupported data type: " + sparkType);
+ }
+ }
+
+ private static Map convertSparkOptionsToFlussTableProperties(
+ Map options) {
+ Map properties = new HashMap<>();
+ for (ConfigOption> option : SparkConnectorOptions.TABLE_OPTIONS) {
+ if (options.containsKey(option.key())) {
+ properties.put(option.key(), options.get(option.key()));
+ }
+ }
+ return properties;
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalogITCase.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalogITCase.java
new file mode 100644
index 00000000..260cea7d
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalogITCase.java
@@ -0,0 +1,411 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark.catalog;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.BytesType;
+import com.alibaba.fluss.types.CharType;
+import com.alibaba.fluss.types.DateType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TinyIntType;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for {@link com.alibaba.fluss.connector.spark.catalog.SparkCatalog}. */
+public class SparkCatalogITCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkCatalogITCase.class);
+
+ private static final String DB = "my_db";
+ private static final String TABLE = "my_table";
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder().setNumOfTabletServers(1).build();
+
+ private static SparkSession spark;
+ private static Admin admin;
+
+ @BeforeAll
+ public static void beforeAll() {
+ Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+ Map configs = getSparkConfigs(flussConf);
+ SparkConf sparkConf =
+ new SparkConf().setAppName("bss-spark-unit-tests").setMaster("local[*]");
+ configs.forEach(sparkConf::set);
+ spark = SparkSession.builder().config(sparkConf).getOrCreate();
+ spark.sparkContext().setLogLevel("WARN");
+
+ Connection connection = ConnectionFactory.createConnection(flussConf);
+ admin = connection.getAdmin();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ try {
+ spark.close();
+ admin.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ @AfterEach
+ public void afterEach() {
+ sql("DROP TABLE IF EXISTS fluss_catalog." + DB + "." + TABLE);
+ sql("DROP DATABASE IF EXISTS fluss_catalog." + DB);
+ }
+
+ @Test
+ public void createDatabaseTest() {
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ assertThatThrownBy(() -> sql("CREATE DATABASE fluss_catalog." + DB))
+ .isInstanceOf(NamespaceAlreadyExistsException.class)
+ .hasMessageContaining(
+ "[SCHEMA_ALREADY_EXISTS] Cannot create schema `my_db` because it already exists.");
+ sql("CREATE DATABASE IF NOT EXISTS fluss_catalog." + DB);
+ List databases =
+ sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream()
+ .map(row -> row.getString(0))
+ .collect(Collectors.toList());
+ assertThat(databases.size()).isEqualTo(2);
+ assertThat("fluss").isIn(databases);
+ assertThat(DB).isIn(databases);
+ }
+
+ @Test
+ public void dropDatabaseTest() {
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ sql("DROP DATABASE fluss_catalog." + DB);
+ assertThatThrownBy(() -> sql("DROP DATABASE fluss_catalog." + DB))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("[SCHEMA_NOT_FOUND] The schema `my_db` cannot be found.");
+ sql("DROP DATABASE IF EXISTS fluss_catalog." + DB);
+ List databases =
+ sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream()
+ .map(row -> row.getString(0))
+ .collect(Collectors.toList());
+ assertThat(databases.size()).isEqualTo(1);
+ assertThat(databases.get(0)).isEqualTo("fluss");
+ }
+
+ @Test
+ public void dropDatabaseWithCascadeTest() {
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ sql("CREATE TABLE fluss_catalog." + DB + "." + TABLE + " (id INT, name STRING)");
+ assertThatThrownBy(() -> sql("DROP DATABASE fluss_catalog." + DB))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "[SCHEMA_NOT_EMPTY] Cannot drop a schema `my_db` because it contains objects.");
+ List tables =
+ sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+ .map(row -> row.getString(1))
+ .collect(Collectors.toList());
+ assertThat(tables.size()).isEqualTo(1);
+ assertThat(tables.get(0)).isEqualTo(TABLE);
+ List databases =
+ sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream()
+ .map(row -> row.getString(0))
+ .collect(Collectors.toList());
+ assertThat(databases.size()).isEqualTo(2);
+
+ sql("DROP DATABASE fluss_catalog." + DB + " CASCADE");
+ databases =
+ sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream()
+ .map(row -> row.getString(0))
+ .collect(Collectors.toList());
+ assertThat(databases.size()).isEqualTo(1);
+ assertThat(databases.get(0)).isEqualTo("fluss");
+ }
+
+ @Test
+ public void createTableTest() {
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE fluss_catalog."
+ + DB
+ + "."
+ + TABLE
+ + " (id INT, name STRING)"))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("[SCHEMA_NOT_FOUND] The schema `my_db` cannot be found.");
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ sql("CREATE TABLE fluss_catalog." + DB + "." + TABLE + " (id INT, name STRING)");
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE fluss_catalog."
+ + DB
+ + "."
+ + TABLE
+ + " (id INT, name STRING)"))
+ .isInstanceOf(TableAlreadyExistsException.class)
+ .hasMessageContaining(
+ "[TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `my_db`.`my_table` because it already exists.");
+ sql(
+ "CREATE TABLE IF NOT EXISTS fluss_catalog."
+ + DB
+ + "."
+ + TABLE
+ + " (id INT, name STRING)");
+ List tables =
+ sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+ .map(row -> row.getString(1))
+ .collect(Collectors.toList());
+ assertThat(tables.size()).isEqualTo(1);
+ assertThat(tables.get(0)).isEqualTo(TABLE);
+ }
+
+ @Test
+ public void dropTableTest() {
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ sql("CREATE TABLE fluss_catalog." + DB + "." + TABLE + " (id INT, name STRING)");
+ List tables =
+ sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+ .map(row -> row.getString(1))
+ .collect(Collectors.toList());
+ assertThat(tables.size()).isEqualTo(1);
+ assertThat(tables.get(0)).isEqualTo(TABLE);
+ sql("DROP TABLE fluss_catalog." + DB + "." + TABLE);
+ assertThatThrownBy(() -> sql("DROP TABLE fluss_catalog." + DB + "." + TABLE))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "[TABLE_OR_VIEW_NOT_FOUND] The table or view `fluss_catalog`.`my_db`.`my_table` cannot be found.");
+ sql("DROP TABLE IF EXISTS fluss_catalog." + DB + "." + TABLE);
+ tables =
+ sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+ .map(row -> row.getString(1))
+ .collect(Collectors.toList());
+ assertThat(tables).isEmpty();
+ }
+
+ @Test
+ public void fieldTypeTest() {
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ sql(
+ "CREATE TABLE fluss_catalog."
+ + DB
+ + "."
+ + TABLE
+ + " ("
+ + " int_field INT,"
+ + " short_field SHORT,"
+ + " byte_field BYTE,"
+ + " string_field STRING,"
+ + " boolean_field BOOLEAN,"
+ + " long_field LONG,"
+ + " float_field FLOAT,"
+ + " double_field DOUBLE,"
+ + " char_field CHAR(3),"
+ + " binary_field BINARY,"
+ + " date_field DATE,"
+ + " timestamp_field TIMESTAMP,"
+ + " decimal_field DECIMAL(10, 5)"
+ + ")");
+ List tables =
+ sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+ .map(row -> row.getString(1))
+ .collect(Collectors.toList());
+ assertThat(tables.size()).isEqualTo(1);
+ assertThat(tables.get(0)).isEqualTo(TABLE);
+
+ // check spark datatype
+ sql("DESCRIBE TABLE fluss_catalog." + DB + "." + TABLE);
+
+ // check fluss datatype
+ TableInfo tableInfo = admin.getTable(TablePath.of(DB, TABLE)).join();
+ assertThat(tableInfo.getTableDescriptor().hasPrimaryKey()).isFalse();
+ assertThat(tableInfo.getTableDescriptor().getPartitionKeys()).isEmpty();
+ List columns = tableInfo.getTableDescriptor().getSchema().getColumns();
+ assertThat(columns.size()).isEqualTo(13);
+
+ assertThat(columns.get(0).getName()).isEqualTo("int_field");
+ assertThat(columns.get(0).getDataType()).isInstanceOf(IntType.class);
+ assertThat(columns.get(1).getName()).isEqualTo("short_field");
+ assertThat(columns.get(1).getDataType()).isInstanceOf(SmallIntType.class);
+ assertThat(columns.get(2).getName()).isEqualTo("byte_field");
+ assertThat(columns.get(2).getDataType()).isInstanceOf(TinyIntType.class);
+ assertThat(columns.get(3).getName()).isEqualTo("string_field");
+ assertThat(columns.get(3).getDataType()).isInstanceOf(StringType.class);
+ assertThat(columns.get(4).getName()).isEqualTo("boolean_field");
+ assertThat(columns.get(4).getDataType()).isInstanceOf(BooleanType.class);
+ assertThat(columns.get(5).getName()).isEqualTo("long_field");
+ assertThat(columns.get(5).getDataType()).isInstanceOf(BigIntType.class);
+ assertThat(columns.get(6).getName()).isEqualTo("float_field");
+ assertThat(columns.get(6).getDataType()).isInstanceOf(FloatType.class);
+ assertThat(columns.get(7).getName()).isEqualTo("double_field");
+ assertThat(columns.get(7).getDataType()).isInstanceOf(DoubleType.class);
+ assertThat(columns.get(8).getName()).isEqualTo("char_field");
+ assertThat(columns.get(8).getDataType()).isInstanceOf(CharType.class);
+ assertThat(((CharType) columns.get(8).getDataType()).getLength()).isEqualTo(3);
+ assertThat(columns.get(9).getName()).isEqualTo("binary_field");
+ assertThat(columns.get(9).getDataType()).isInstanceOf(BytesType.class);
+ assertThat(columns.get(10).getName()).isEqualTo("date_field");
+ assertThat(columns.get(10).getDataType()).isInstanceOf(DateType.class);
+ assertThat(columns.get(11).getName()).isEqualTo("timestamp_field");
+ assertThat(columns.get(11).getDataType()).isInstanceOf(LocalZonedTimestampType.class);
+ assertThat(((LocalZonedTimestampType) columns.get(11).getDataType()).getPrecision())
+ .isEqualTo(6);
+ assertThat(columns.get(12).getName()).isEqualTo("decimal_field");
+ assertThat(columns.get(12).getDataType()).isInstanceOf(DecimalType.class);
+ assertThat(((DecimalType) columns.get(12).getDataType()).getPrecision()).isEqualTo(10);
+ assertThat(((DecimalType) columns.get(12).getDataType()).getScale()).isEqualTo(5);
+ }
+
+ @Test
+ public void primaryKeyAndPartitionKeyTest() {
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ sql(
+ "CREATE TABLE fluss_catalog."
+ + DB
+ + "."
+ + TABLE
+ + " ("
+ + " int_field INT,"
+ + " short_field SHORT,"
+ + " byte_field BYTE,"
+ + " string_field STRING,"
+ + " boolean_field BOOLEAN,"
+ + " long_field LONG,"
+ + " float_field FLOAT,"
+ + " double_field DOUBLE,"
+ + " char_field CHAR(3),"
+ + " binary_field BINARY,"
+ + " date_field DATE,"
+ + " timestamp_field TIMESTAMP,"
+ + " decimal_field DECIMAL(10, 2)"
+ + ") PARTITIONED BY (string_field) OPTIONS ("
+ + " 'primary.key' = 'int_field, string_field',"
+ + " 'table.auto-partition.enabled' = 'true',"
+ + " 'table.auto-partition.time-unit' = 'HOUR'"
+ + ")");
+ List tables =
+ sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+ .map(row -> row.getString(1))
+ .collect(Collectors.toList());
+ assertThat(tables.size()).isEqualTo(1);
+ assertThat(tables.get(0)).isEqualTo(TABLE);
+
+ sql("DESCRIBE TABLE fluss_catalog." + DB + "." + TABLE);
+
+ TableInfo tableInfo = admin.getTable(TablePath.of(DB, TABLE)).join();
+ // check primary key
+ assertThat(tableInfo.getTableDescriptor().hasPrimaryKey()).isTrue();
+ List primaryKey =
+ tableInfo.getTableDescriptor().getSchema().getPrimaryKey().get().getColumnNames();
+ assertThat(primaryKey.size()).isEqualTo(2);
+ assertThat(primaryKey.get(0)).isEqualTo("int_field");
+ assertThat(primaryKey.get(1)).isEqualTo("string_field");
+ // check partition key
+ List partitionKeys = tableInfo.getTableDescriptor().getPartitionKeys();
+ assertThat(partitionKeys.size()).isEqualTo(1);
+ assertThat(partitionKeys.get(0)).isEqualTo("string_field");
+ List columns = tableInfo.getTableDescriptor().getSchema().getColumns();
+ assertThat(columns.size()).isEqualTo(13);
+ }
+
+ @Test
+ public void commentTest() {
+ sql("CREATE DATABASE fluss_catalog." + DB);
+ sql(
+ "CREATE TABLE fluss_catalog."
+ + DB
+ + "."
+ + TABLE
+ + " ("
+ + " id INT COMMENT 'id comment test',"
+ + " first_name STRING COMMENT 'first name comment test',"
+ + " last_name STRING"
+ + ") COMMENT 'table comment test'");
+
+ List tables =
+ sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+ .map(row -> row.getString(1))
+ .collect(Collectors.toList());
+ assertThat(tables.size()).isEqualTo(1);
+ assertThat(tables.get(0)).isEqualTo(TABLE);
+
+ List comments =
+ sql("DESCRIBE TABLE fluss_catalog." + DB + "." + TABLE).collectAsList().stream()
+ .map(row -> row.getString(2))
+ .collect(Collectors.toList());
+ assertThat(comments.size()).isEqualTo(3);
+ assertThat(comments.get(0)).isEqualTo("id comment test");
+ assertThat(comments.get(1)).isEqualTo("first name comment test");
+ assertThat(comments.get(2)).isNull();
+ }
+
+ private static Map getSparkConfigs(Configuration flussConf) {
+ Map configs = new HashMap<>();
+ configs.put("spark.sql.catalog.fluss_catalog", SparkCatalog.class.getName());
+ configs.put(
+ "spark.sql.catalog.fluss_catalog.bootstrap.servers",
+ String.join(",", flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)));
+ return configs;
+ }
+
+ public static Dataset sql(String sqlText) {
+ Dataset ds = spark.sql(sqlText);
+ if (ds.columns().length == 0) {
+ LOG.info("+----------------+");
+ LOG.info("| Empty Result |");
+ LOG.info("+----------------+");
+ } else {
+ ds.show(20, 50);
+ }
+ return ds;
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/utils/SparkConversionsTest.java b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/utils/SparkConversionsTest.java
new file mode 100644
index 00000000..0f01ca33
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/utils/SparkConversionsTest.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2024 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.spark.utils;
+
+import com.alibaba.fluss.connector.spark.SparkConnectorOptions;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SparkConversions}. */
+public class SparkConversionsTest {
+
+ @Test
+ void testTypeConversion() {
+ // fluss columns
+ List flussColumns =
+ Arrays.asList(
+ new Schema.Column("a", DataTypes.BOOLEAN().copy(false), null),
+ new Schema.Column("b", DataTypes.TINYINT().copy(false), null),
+ new Schema.Column("c", DataTypes.SMALLINT(), "comment1"),
+ new Schema.Column("d", DataTypes.INT(), "comment2"),
+ new Schema.Column("e", DataTypes.BIGINT(), null),
+ new Schema.Column("f", DataTypes.FLOAT(), null),
+ new Schema.Column("g", DataTypes.DOUBLE(), null),
+ new Schema.Column("h", DataTypes.CHAR(1), null),
+ new Schema.Column("i", DataTypes.STRING(), null),
+ new Schema.Column("j", DataTypes.DECIMAL(10, 2), null),
+ new Schema.Column("k", DataTypes.BYTES(), null),
+ new Schema.Column("l", DataTypes.DATE(), null),
+ new Schema.Column("m", DataTypes.TIMESTAMP_LTZ(6), null));
+
+ // spark columns
+ List sparkColumns =
+ Arrays.asList(
+ new StructField(
+ "a",
+ org.apache.spark.sql.types.DataTypes.BooleanType,
+ false,
+ Metadata.empty()),
+ new StructField(
+ "b",
+ org.apache.spark.sql.types.DataTypes.ByteType,
+ false,
+ Metadata.empty()),
+ new StructField(
+ "c",
+ org.apache.spark.sql.types.DataTypes.ShortType,
+ true,
+ Metadata.empty())
+ .withComment("comment1"),
+ new StructField(
+ "d",
+ org.apache.spark.sql.types.DataTypes.IntegerType,
+ true,
+ Metadata.empty())
+ .withComment("comment2"),
+ new StructField(
+ "e",
+ org.apache.spark.sql.types.DataTypes.LongType,
+ true,
+ Metadata.empty()),
+ new StructField(
+ "f",
+ org.apache.spark.sql.types.DataTypes.FloatType,
+ true,
+ Metadata.empty()),
+ new StructField(
+ "g",
+ org.apache.spark.sql.types.DataTypes.DoubleType,
+ true,
+ Metadata.empty()),
+ new StructField(
+ "h",
+ new org.apache.spark.sql.types.CharType(1),
+ true,
+ Metadata.empty()),
+ new StructField(
+ "i",
+ org.apache.spark.sql.types.DataTypes.StringType,
+ true,
+ Metadata.empty()),
+ new StructField(
+ "j",
+ org.apache.spark.sql.types.DataTypes.createDecimalType(10, 2),
+ true,
+ Metadata.empty()),
+ new StructField(
+ "k",
+ org.apache.spark.sql.types.DataTypes.BinaryType,
+ true,
+ Metadata.empty()),
+ new StructField(
+ "l",
+ org.apache.spark.sql.types.DataTypes.DateType,
+ true,
+ Metadata.empty()),
+ new StructField(
+ "m",
+ org.apache.spark.sql.types.DataTypes.TimestampType,
+ true,
+ Metadata.empty()));
+
+ // test from fluss columns to spark columns
+ List actualSparkColumns = new ArrayList<>();
+ for (Schema.Column flussColumn : flussColumns) {
+ actualSparkColumns.add(SparkConversions.toSparkStructField(flussColumn));
+ }
+ assertThat(actualSparkColumns).isEqualTo(sparkColumns);
+
+ // test from spark columns to fluss columns
+ List actualFlussColumns = new ArrayList<>();
+ for (StructField sparkColumn : sparkColumns) {
+ actualFlussColumns.add(
+ new Schema.Column(
+ sparkColumn.name(),
+ SparkConversions.toFlussType(sparkColumn),
+ sparkColumn.getComment().getOrElse(() -> null)));
+ }
+ assertThat(actualFlussColumns).isEqualTo(flussColumns);
+
+ // test TIME and TIMESTAMP type
+ assertThat(SparkConversions.toSparkType(DataTypes.TIME()))
+ .isEqualTo(org.apache.spark.sql.types.DataTypes.LongType);
+ assertThat(SparkConversions.toSparkType(DataTypes.TIMESTAMP()))
+ .isEqualTo(org.apache.spark.sql.types.DataTypes.LongType);
+ }
+
+ @Test
+ void testTableConversion() {
+ StructField[] sparkColumns =
+ new StructField[] {
+ new StructField(
+ "order_id",
+ org.apache.spark.sql.types.DataTypes.LongType,
+ false,
+ Metadata.empty()),
+ new StructField(
+ "order_name",
+ org.apache.spark.sql.types.DataTypes.StringType,
+ true,
+ Metadata.empty())
+ };
+
+ // test convert spark table to fluss table
+ StructType structType = new StructType(sparkColumns);
+ Transform[] transforms = new Transform[0];
+ Map properties = new HashMap<>();
+ properties.put(SparkConnectorOptions.PRIMARY_KEY.key(), "order_id");
+ properties.put("comment", "test comment");
+ properties.put("k1", "v1");
+ properties.put("k2", "v2");
+ TableDescriptor flussTable =
+ SparkConversions.toFlussTable(structType, transforms, properties);
+
+ String expectFlussTableString =
+ "TableDescriptor{schema=("
+ + "order_id BIGINT NOT NULL,"
+ + "order_name STRING,"
+ + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)"
+ + "), comment='test comment', partitionKeys=[], "
+ + "tableDistribution={bucketKeys=[order_id] bucketCount=null}, "
+ + "properties={}, "
+ + "customProperties={comment=test comment, primary.key=order_id, k1=v1, k2=v2}"
+ + "}";
+ assertThat(flussTable.toString()).isEqualTo(expectFlussTableString);
+
+ // test convert fluss table to spark table
+ StructType convertedSparkSchema = SparkConversions.toSparkSchema(flussTable.getSchema());
+ Transform[] convertedTransforms =
+ SparkConversions.toSparkTransforms(flussTable.getPartitionKeys());
+ assertThat(convertedSparkSchema.fields()).isEqualTo(sparkColumns);
+ assertThat(convertedTransforms).isEqualTo(transforms);
+ }
+}
diff --git a/fluss-connectors/fluss-connector-spark/pom.xml b/fluss-connectors/fluss-connector-spark/pom.xml
new file mode 100644
index 00000000..6c606174
--- /dev/null
+++ b/fluss-connectors/fluss-connector-spark/pom.xml
@@ -0,0 +1,41 @@
+
+
+
+
+ 4.0.0
+
+ com.alibaba.fluss
+ fluss-connectors
+ 0.6-SNAPSHOT
+
+
+ fluss-connector-spark
+ pom
+
+ Fluss : Connector : Spark
+
+
+ 2.12
+
+
+
+ fluss-connector-spark-common
+ fluss-connector-spark-3.3
+
+
\ No newline at end of file
diff --git a/fluss-connectors/pom.xml b/fluss-connectors/pom.xml
index f732ddbc..5e87dc49 100644
--- a/fluss-connectors/pom.xml
+++ b/fluss-connectors/pom.xml
@@ -31,6 +31,7 @@
fluss-connector-flink
+ fluss-connector-spark