diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 2dce054716..5788e1d80a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.lakestorage; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.metadata.TableChange; @@ -25,6 +26,7 @@ import org.apache.fluss.metadata.TablePath; import java.util.List; +import java.util.Map; /** * A catalog interface to modify metadata in external datalake. @@ -34,6 +36,14 @@ @PublicEvolving public interface LakeCatalog extends AutoCloseable { + /** + * Get a table in lake. + * + * @param tablePath path of the table to be created + * @throws TableNotExistException if the table not exists + */ + TableDescriptor getTable(TablePath tablePath) throws TableNotExistException; + /** * Create a new table in lake. * @@ -54,6 +64,26 @@ void createTable(TablePath tablePath, TableDescriptor tableDescriptor) void alterTable(TablePath tablePath, List tableChanges) throws TableNotExistException; + /** + * Check whether the properties of the existing lake table are compatible with the properties of + * the table want to be created. + * + * @param properties The properties to be validated + */ + default void validateTablePropertyCompatibility( + Map existingProperties, Map properties) { + if (!existingProperties.equals(properties)) { + throw new InvalidConfigException( + "The properties of the existing lake table are not compatible with " + + "the properties of the table want to be created. " + + "Properties of existing lake table: " + + existingProperties + + ", " + + "properties of lake table to be created: " + + properties); + } + } + @Override default void close() throws Exception { // default do nothing diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java index 9c75d36094..eb67e86233 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java @@ -29,6 +29,7 @@ import org.apache.fluss.utils.WrappingProxy; import java.util.List; +import java.util.Map; /** * A wrapper around {@link LakeStoragePlugin} that ensures the plugin classloader is used for all @@ -74,6 +75,13 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader this.loader = loader; } + @Override + public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException { + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { + return inner.getTable(tablePath); + } + } + @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) throws TableAlreadyExistException { @@ -90,6 +98,14 @@ public void alterTable(TablePath tablePath, List tableChanges) } } + @Override + public void validateTablePropertyCompatibility( + Map existingProperties, Map properties) { + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { + inner.validateTablePropertyCompatibility(existingProperties, properties); + } + } + @Override public void close() throws Exception { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataType.java b/fluss-common/src/main/java/org/apache/fluss/types/DataType.java index 91bacd308f..1dc2919898 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataType.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataType.java @@ -139,6 +139,14 @@ public String asSummaryString() { public abstract R accept(DataTypeVisitor visitor); + public DataType notNull() { + return copy(false); + } + + public DataType nullable() { + return copy(true); + } + @Override public String toString() { return asSummaryString(); diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java index 5812cc3cad..7a4f2cec6f 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java @@ -145,6 +145,11 @@ public LakeSource createLakeSource(TablePath tablePath) { private static class TestPaimonLakeCatalog implements LakeCatalog { + @Override + public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException { + return null; + } + @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) throws TableAlreadyExistException {} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 84a85deade..ee98f54184 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -308,13 +308,8 @@ void testCreateAlreadyExistsLakeTable() throws Exception { assertThat(catalog.tableExists(lakeTablePath)).isTrue(); // drop fluss table catalog.dropTable(lakeTablePath, false); - // create the table again, should throw exception with ignore if exist = false - assertThatThrownBy(() -> catalog.createTable(lakeTablePath, table, false)) - .isInstanceOf(CatalogException.class) - .hasMessage( - String.format( - "The table %s already exists in %s catalog, please first drop the table in %s catalog or use a new table name.", - lakeTablePath, "paimon", "paimon")); + // create the table again, should be ok if lake table schema match the fluss table schema + catalog.createTable(lakeTablePath, table, false); } @Test diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java index 3b1e4bd0d3..69ed5b9c4a 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java @@ -83,6 +83,12 @@ protected Catalog getIcebergCatalog() { return icebergCatalog; } + @Override + public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException { + throw new UnsupportedOperationException( + "Get table is not supported for Iceberg at the moment"); + } + @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) throws TableAlreadyExistException { diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java index 2a55fc46ab..2985210b32 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java @@ -42,6 +42,12 @@ public LanceLakeCatalog(Configuration config) { this.options = config; } + @Override + public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException { + throw new UnsupportedOperationException( + "Get table is not supported for Lance at the moment"); + } + @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) { // currently, we don't support primary key table for lance diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonDataTypeToFlussDataType.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonDataTypeToFlussDataType.java new file mode 100644 index 0000000000..0e99c40bf7 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonDataTypeToFlussDataType.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon; + +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; + +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; + +/** Convert from Paimon's data type to Fluss's data type. */ +public class PaimonDataTypeToFlussDataType implements DataTypeVisitor { + + public static final PaimonDataTypeToFlussDataType INSTANCE = + new PaimonDataTypeToFlussDataType(); + + @Override + public DataType visit(CharType charType) { + return withNullability(DataTypes.CHAR(charType.getLength()), charType.isNullable()); + } + + @Override + public DataType visit(VarCharType varCharType) { + return withNullability(DataTypes.STRING(), varCharType.isNullable()); + } + + @Override + public DataType visit(BooleanType booleanType) { + return withNullability(DataTypes.BOOLEAN(), booleanType.isNullable()); + } + + @Override + public DataType visit(BinaryType binaryType) { + return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType.isNullable()); + } + + @Override + public DataType visit(VarBinaryType varBinaryType) { + return withNullability(DataTypes.BYTES(), varBinaryType.isNullable()); + } + + @Override + public DataType visit(DecimalType decimalType) { + return withNullability( + DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()), + decimalType.isNullable()); + } + + @Override + public DataType visit(TinyIntType tinyIntType) { + return withNullability(DataTypes.TINYINT(), tinyIntType.isNullable()); + } + + @Override + public DataType visit(SmallIntType smallIntType) { + return withNullability(DataTypes.SMALLINT(), smallIntType.isNullable()); + } + + @Override + public DataType visit(IntType intType) { + return withNullability(DataTypes.INT(), intType.isNullable()); + } + + @Override + public DataType visit(BigIntType bigIntType) { + return withNullability(DataTypes.BIGINT(), bigIntType.isNullable()); + } + + @Override + public DataType visit(FloatType floatType) { + return withNullability(DataTypes.FLOAT(), floatType.isNullable()); + } + + @Override + public DataType visit(DoubleType doubleType) { + return withNullability(DataTypes.DOUBLE(), doubleType.isNullable()); + } + + @Override + public DataType visit(DateType dateType) { + return withNullability(DataTypes.DATE(), dateType.isNullable()); + } + + @Override + public DataType visit(TimeType timeType) { + return withNullability(DataTypes.TIME(), timeType.isNullable()); + } + + @Override + public DataType visit(TimestampType timestampType) { + return withNullability( + DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType.isNullable()); + } + + @Override + public DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return withNullability( + DataTypes.TIMESTAMP_LTZ(localZonedTimestampType.getPrecision()), + localZonedTimestampType.isNullable()); + } + + @Override + public DataType visit(VariantType variantType) { + throw new UnsupportedOperationException("VariantType is not supported for Fluss."); + } + + @Override + public DataType visit(ArrayType arrayType) { + return withNullability( + DataTypes.ARRAY(arrayType.getElementType().accept(this)), arrayType.isNullable()); + } + + @Override + public DataType visit(MultisetType multisetType) { + throw new UnsupportedOperationException("MultisetType is not supported for Fluss."); + } + + @Override + public DataType visit(MapType mapType) { + return withNullability( + DataTypes.MAP( + mapType.getKeyType().accept(this), mapType.getValueType().accept(this)), + mapType.isNullable()); + } + + @Override + public DataType visit(RowType rowType) { + org.apache.fluss.types.RowType.Builder rowTypeBuilder = + org.apache.fluss.types.RowType.builder(); + for (DataField field : rowType.getFields()) { + rowTypeBuilder.field(field.name(), field.type().accept(this), field.description()); + } + return withNullability(rowTypeBuilder.build(), rowType.isNullable()); + } + + private DataType withNullability(DataType fluss, boolean nullable) { + if (fluss.isNullable() != nullable) { + return nullable ? fluss.nullable() : fluss.notNull(); + } + return fluss; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index b11d5adf2f..ffbd875157 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -34,15 +34,19 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.Table; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussTableDescriptor; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.validatePaimonTableOptions; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; @@ -76,6 +80,17 @@ protected Catalog getPaimonCatalog() { return paimonCatalog; } + @Override + public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException { + try { + Identifier paimonPath = toPaimon(tablePath); + Table table = paimonCatalog.getTable(paimonPath); + return toFlussTableDescriptor(table); + } catch (Catalog.TableNotExistException e) { + throw new TableNotExistException("Table " + tablePath + " not exists."); + } + } + @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) throws TableAlreadyExistException { @@ -114,6 +129,12 @@ public void alterTable(TablePath tablePath, List tableChanges) } } + @Override + public void validateTablePropertyCompatibility( + Map existingProperties, Map properties) { + validatePaimonTableOptions(existingProperties, properties); + } + private void createTable(Identifier tablePath, Schema schema) throws Catalog.DatabaseNotExistException { try { diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index 2c42c471ee..97678f1dbf 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -18,9 +18,11 @@ package org.apache.fluss.lake.paimon.utils; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.FlussConfigUtils; import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType; +import org.apache.fluss.lake.paimon.PaimonDataTypeToFlussDataType; import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; @@ -34,11 +36,16 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -60,6 +67,7 @@ public class PaimonConversions { static { PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key()); PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key()); + PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key()); PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PARTITION_GENERATE_LEGCY_NAME.key()); } @@ -199,6 +207,99 @@ public static Schema toPaimonSchema(TableDescriptor tableDescriptor) { return schemaBuilder.build(); } + public static TableDescriptor toFlussTableDescriptor(Table table) { + FileStoreTable paimonTable = (FileStoreTable) table; + Map options = table.options(); + TableSchema paimonSchema = paimonTable.schema(); + + TableDescriptor.Builder builder = TableDescriptor.builder(); + + // extract bucket num and bucket keys + int paimonBuckets = paimonSchema.numBuckets(); + if (paimonBuckets != -1) { + builder.distributedBy(paimonBuckets, paimonSchema.bucketKeys()); + } + // if paimonBuckets == -1, we keep TableDistribution as null here. + // Because when create tables by Java API, we don't know the bucket num + + // build schema + org.apache.fluss.metadata.Schema.Builder schemaBuilder = + org.apache.fluss.metadata.Schema.newBuilder(); + for (DataField field : paimonSchema.fields()) { + if (SYSTEM_COLUMNS.containsKey(field.name())) { + continue; + } + schemaBuilder + .column( + field.name(), + field.type().accept(PaimonDataTypeToFlussDataType.INSTANCE)) + .withComment(field.description()); + } + + // set pk + if (!paimonSchema.primaryKeys().isEmpty()) { + schemaBuilder.primaryKey(paimonSchema.primaryKeys()); + } + builder.schema(schemaBuilder.build()); + + // set partition keys + builder.partitionedBy(paimonSchema.partitionKeys()); + + // set properties to fluss table descriptor + Map properties = new HashMap<>(); + Map customProperties = new HashMap<>(); + options.forEach( + (k, v) -> { + if (!PAIMON_UNSETTABLE_OPTIONS.contains(k)) { + String flussKey = convertPaimonPropertyKeyToFluss(k); + if (FlussConfigUtils.isTableStorageConfig(flussKey)) { + properties.put(flussKey, v); + } + customProperties.put(flussKey, v); + } + }); + builder.properties(properties); + builder.customProperties(customProperties); + + return builder.build(); + } + + public static void validatePaimonTableOptions( + Map existingOptions, Map options) { + // check new options + Map newOptions = new HashMap<>(options); + existingOptions.forEach( + (k, v) -> { + if (v.equals(options.get(k))) { + newOptions.remove(k); + } + }); + // currently, we don't support update options + if (!newOptions.isEmpty()) { + throw new InvalidConfigException( + "The options of the existing Paimon table are not compatible with the new one. " + + "New options: " + + newOptions); + } + + // check remove options + Map removeOptions = new HashMap<>(existingOptions); + options.forEach((k, v) -> removeOptions.remove(k)); + String changelogProducerKey = + convertPaimonPropertyKeyToFluss(CoreOptions.CHANGELOG_PRODUCER.key()); + if (CoreOptions.ChangelogProducer.INPUT + .toString() + .equalsIgnoreCase(removeOptions.get(changelogProducerKey))) { + removeOptions.remove(changelogProducerKey); + } + if (!removeOptions.isEmpty()) { + throw new InvalidConfigException( + "The options of the existing Paimon table are not compatible with the new one. " + + "Remove options: " + + removeOptions); + } + } + private static void validatePaimonOptions(Map properties) { properties.forEach( (k, v) -> { @@ -237,4 +338,12 @@ private static String convertFlussPropertyKeyToPaimon(String key) { return FLUSS_CONF_PREFIX + key; } } + + private static String convertPaimonPropertyKeyToFluss(String key) { + if (key.startsWith(FLUSS_CONF_PREFIX)) { + return key.substring(FLUSS_CONF_PREFIX.length()); + } else { + return PAIMON_CONF_PREFIX + key; + } + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 37033ea74f..baa545a4ca 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -401,6 +401,200 @@ void testCreateLakeEnableTableWithUnsettablePaimonOptions() { } } + @Test + void testCreateLakeEnableTableWithExistLakeTable() throws Exception { + Map customProperties = new HashMap<>(); + customProperties.put("k1", "v1"); + customProperties.put("paimon.file.format", "parquet"); + + // create log table without bucket keys + TableDescriptor logTableWithoutBucketKeys = + createTableDescriptor( + 2, + BUCKET_NUM, + Collections.emptyList(), + Collections.emptyList(), + customProperties, + false); + TablePath logTablePathWithoutBucketkeys = + TablePath.of(DATABASE, "log_table_without_bucket_keys"); + admin.createTable(logTablePathWithoutBucketkeys, logTableWithoutBucketKeys, false).get(); + // drop fluss table, lake table should still exist + admin.dropTable(logTablePathWithoutBucketkeys, false).get(); + // create the same fluss table again should be ok + admin.createTable(logTablePathWithoutBucketkeys, logTableWithoutBucketKeys, false).get(); + admin.dropTable(logTablePathWithoutBucketkeys, false).get(); + + // paimon table use dynamic bucket for fluss log table without bucket keys + // so it should be ok to create the same fluss table with a new bucket num + logTableWithoutBucketKeys = logTableWithoutBucketKeys.withBucketCount(BUCKET_NUM + 1); + admin.createTable(logTablePathWithoutBucketkeys, logTableWithoutBucketKeys, false).get(); + admin.dropTable(logTablePathWithoutBucketkeys, false).get(); + + // create log table with bucket keys will throw exception + TableDescriptor logTableWithoutBucketKeys1 = + createTableDescriptor( + 2, + BUCKET_NUM, + Arrays.asList("c1", "c2"), + Collections.emptyList(), + customProperties, + false); + assertThatThrownBy( + () -> + admin.createTable( + logTablePathWithoutBucketkeys, + logTableWithoutBucketKeys1, + false) + .get()) + .cause() + .isInstanceOf(LakeTableAlreadyExistException.class) + .hasMessage( + "The table fluss.log_table_without_bucket_keys already exists in paimon catalog, but the bucket keys doesn't match, new table bucket keys [c1, c2], exist lake table doesn't have bucket keys. Please first drop the table in paimon catalog or use a new table name."); + + // create log table with bucket keys will throw exception + TableDescriptor logTableWithoutBucketKeys2 = + createTableDescriptor( + 3, + BUCKET_NUM, + Collections.emptyList(), + Collections.emptyList(), + customProperties, + false); + assertThatThrownBy( + () -> + admin.createTable( + logTablePathWithoutBucketkeys, + logTableWithoutBucketKeys2, + false) + .get()) + .cause() + .isInstanceOf(LakeTableAlreadyExistException.class) + .hasMessage( + "The table fluss.log_table_without_bucket_keys already exists in paimon catalog, but the schema doesn't match, new table schema (c1 STRING,c2 INT,c3 STRING), exist lake table schema (c1 STRING,c2 INT). Please first drop the table in paimon catalog or use a new table name."); + + // create table with primary keys will throw exception + TableDescriptor logTableWithoutBucketKeys3 = + createTableDescriptor( + 2, + BUCKET_NUM, + Collections.emptyList(), + Collections.emptyList(), + customProperties, + true); + assertThatThrownBy( + () -> + admin.createTable( + logTablePathWithoutBucketkeys, + logTableWithoutBucketKeys3, + false) + .get()) + .cause() + .isInstanceOf(LakeTableAlreadyExistException.class) + .hasMessage( + "The table fluss.log_table_without_bucket_keys already exists in paimon catalog, but the bucket keys doesn't match, new table bucket keys [c1], exist lake table doesn't have bucket keys. Please first drop the table in paimon catalog or use a new table name."); + + // create table with partition keys will throw exception + TableDescriptor logTableWithoutBucketKeys4 = + createTableDescriptor( + 2, + BUCKET_NUM, + Collections.emptyList(), + Collections.singletonList("c1"), + customProperties, + false); + assertThatThrownBy( + () -> + admin.createTable( + logTablePathWithoutBucketkeys, + logTableWithoutBucketKeys4, + false) + .get()) + .cause() + .isInstanceOf(LakeTableAlreadyExistException.class) + .hasMessage( + "The table fluss.log_table_without_bucket_keys already exists in paimon catalog, but the partition keys doesn't match, new table partition keys [c1], exist lake table partition keys []. Please first drop the table in paimon catalog or use a new table name."); + + // create table with different custom properties will throw exception + customProperties.put("paimon.file.format", "orc"); + TableDescriptor logTableWithoutBucketKeys5 = + createTableDescriptor( + 2, + BUCKET_NUM, + Collections.emptyList(), + Collections.emptyList(), + customProperties, + false); + assertThatThrownBy( + () -> + admin.createTable( + logTablePathWithoutBucketkeys, + logTableWithoutBucketKeys5, + false) + .get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessage( + "The options of the existing Paimon table are not compatible with the new one. New options: {paimon.file.format=orc}"); + + // create log table with bucket keys + TableDescriptor logTableWithBucketKeys = + createTableDescriptor( + 2, + BUCKET_NUM, + Collections.singletonList("c1"), + Collections.emptyList(), + customProperties, + false); + TablePath logTablePathWithBucketKeys = TablePath.of(DATABASE, "log_table_with_bucket_keys"); + admin.createTable(logTablePathWithBucketKeys, logTableWithBucketKeys, false).get(); + // drop fluss table, lake table should still exist + admin.dropTable(logTablePathWithBucketKeys, false).get(); + // create the same fluss table again should be ok + admin.createTable(logTablePathWithBucketKeys, logTableWithBucketKeys, false).get(); + admin.dropTable(logTablePathWithBucketKeys, false).get(); + + // fluss table with bucket keys will use static bucket in paimon + TableDescriptor logTableWithBucketKeys1 = + logTableWithBucketKeys.withBucketCount(BUCKET_NUM + 1); + assertThatThrownBy( + () -> + admin.createTable( + logTablePathWithBucketKeys, + logTableWithBucketKeys1, + false) + .get()) + .cause() + .isInstanceOf(LakeTableAlreadyExistException.class) + .hasMessage( + "The table fluss.log_table_with_bucket_keys already exists in paimon catalog, but the distribution doesn't match, new table distribution {bucketKeys=[c1] bucketCount=4}, exist lake table distribution {bucketKeys=[c1] bucketCount=3}. Please first drop the table in paimon catalog or use a new table name."); + + // create pk table + TableDescriptor pkTable = + createTableDescriptor( + 2, + BUCKET_NUM, + Collections.emptyList(), + Collections.emptyList(), + customProperties, + true); + TablePath pkTablePath = TablePath.of(DATABASE, "pk_table_for_exist_lake_table"); + admin.createTable(pkTablePath, pkTable, false).get(); + // drop fluss table, lake table should still exist + admin.dropTable(pkTablePath, false).get(); + // create the same fluss table again should be ok + admin.createTable(pkTablePath, pkTable, false).get(); + admin.dropTable(pkTablePath, false).get(); + + // fluss table with bucket keys will use static bucket in paimon + TableDescriptor pkTable1 = pkTable.withBucketCount(BUCKET_NUM + 1); + assertThatThrownBy(() -> admin.createTable(pkTablePath, pkTable1, false).get()) + .cause() + .isInstanceOf(LakeTableAlreadyExistException.class) + .hasMessage( + "The table fluss.pk_table_for_exist_lake_table already exists in paimon catalog, but the distribution doesn't match, new table distribution {bucketKeys=[c1] bucketCount=4}, exist lake table distribution {bucketKeys=[c1] bucketCount=3}. Please first drop the table in paimon catalog or use a new table name."); + } + @Test void testAlterLakeEnabledLogTable() throws Exception { Map customProperties = new HashMap<>(); @@ -475,15 +669,7 @@ void testAlterLakeEnabledLogTable() throws Exception { // try to enable lake table again enableLake = TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); List finalChanges = Collections.singletonList(enableLake); - // TODO: After #846 is implemented, we should remove this exception assertion. - assertThatThrownBy(() -> admin.alterTable(logTablePath, finalChanges, false).get()) - .cause() - .isInstanceOf(LakeTableAlreadyExistException.class) - .hasMessage( - String.format( - "The table %s already exists in paimon catalog, please " - + "first drop the table in paimon catalog or use a new table name.", - logTablePath)); + admin.alterTable(logTablePath, finalChanges, false).get(); } @Test @@ -610,6 +796,94 @@ void testAlterLakeEnabledTableProperties() throws Exception { admin.alterTable(TablePath.of(DATABASE, "not_exist_table"), tableChanges, true).get(); } + @Test + void testEnableLakeTableAfterAlterTableProperties() throws Exception { + Map customProperties = new HashMap<>(); + customProperties.put("k1", "v1"); + customProperties.put("paimon.file.format", "parquet"); + + // create table + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false) + .customProperties(customProperties) + .distributedBy(BUCKET_NUM, "c1", "c2") + .build(); + TablePath tablePath = TablePath.of(DATABASE, "enable_lake_table_after_alter_properties"); + admin.createTable(tablePath, tableDescriptor, false).get(); + // paimon table should not exist because lake table is disable + assertThatThrownBy( + () -> + paimonCatalog.getTable( + Identifier.create(DATABASE, tablePath.getTableName()))) + .isInstanceOf(Catalog.TableNotExistException.class) + .hasMessage(String.format("Table %s does not exist.", tablePath)); + + // alter fluss tale properties + List tableChanges = + Arrays.asList(TableChange.reset("k1"), TableChange.set("k2", "v2")); + admin.alterTable(tablePath, tableChanges, false).get(); + // enable lake table should be ok + TableChange.SetOption enableLake = + TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + admin.alterTable(tablePath, Collections.singletonList(enableLake), false).get(); + Table paimonTable = + paimonCatalog.getTable(Identifier.create(DATABASE, tablePath.getTableName())); + customProperties.remove("k1"); + customProperties.put("k2", "v2"); + Map newProperties = new HashMap<>(tableDescriptor.getProperties()); + newProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + tableDescriptor = tableDescriptor.withProperties(newProperties, customProperties); + verifyPaimonTable( + paimonTable, + tableDescriptor, + RowType.of( + new DataType[] { + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.STRING(), + // for __bucket, __offset, __timestamp + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.BIGINT(), + org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE() + }, + new String[] { + "c1", + "c2", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME + }), + "c1,c2", + BUCKET_NUM); + + // disable lake table + TableChange.SetOption diableLake = + TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"); + admin.alterTable(tablePath, Collections.singletonList(diableLake), false).get(); + + // alter fluss tale properties when lake table is disabled + tableChanges = Collections.singletonList(TableChange.set("k2", "v22")); + admin.alterTable(tablePath, tableChanges, false).get(); + + // enable lake table will throw exception, because we altered the fluss table properties + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList(enableLake), + false) + .get()) + .cause() + .isInstanceOf(LakeTableAlreadyExistException.class) + .hasMessage( + "The table fluss.enable_lake_table_after_alter_properties already exists in paimon catalog, but the custom properties doesn't match, new table custom properties {paimon.file.format=parquet, k2=v22}, exist lake table custom properties {paimon.file.format=parquet, k2=v2}. Please first drop the table in paimon catalog or use a new table name."); + } + private void verifyPaimonTable( Table paimonTable, TableDescriptor flussTable, @@ -657,4 +931,32 @@ private void verifyPaimonTable( RowType paimonRowType = paimonTable.rowType(); assertThat(paimonRowType).isEqualTo(expectedRowType); } + + private TableDescriptor createTableDescriptor( + int columnNum, + int bucketNum, + List bucketKeys, + List partitionKeys, + Map customProperties, + boolean withPrimaryKeys) { + Schema.Builder builder = Schema.newBuilder(); + for (int i = 1; i <= columnNum; i++) { + if (i % 2 == 0) { + builder.column("c" + i, DataTypes.INT()); + } else { + builder.column("c" + i, DataTypes.STRING()); + } + } + if (withPrimaryKeys) { + builder.primaryKey("c1"); + } + + return TableDescriptor.builder() + .schema(builder.build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .distributedBy(bucketNum, bucketKeys) + .partitionedBy(partitionKeys) + .build(); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index c2275fe21e..312e695cd7 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -17,6 +17,7 @@ package org.apache.fluss.lake.paimon; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; @@ -32,6 +33,8 @@ import java.io.File; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -49,6 +52,67 @@ public void setUp() { flussPaimonCatalog = new PaimonLakeCatalog(configuration); } + @Test + void testGetTable() { + Map customProperties = new HashMap<>(); + customProperties.put("k1", "v1"); + customProperties.put("paimon.file.format", "parquet"); + // customProperties.put("paimon.changelog-producer", "NONE"); + + // test bucket key log table + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("log_c1", DataTypes.INT()) + .column("log_c2", DataTypes.STRING()) + .primaryKey("log_c1", "log_c2") + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .distributedBy(3, "log_c1", "log_c2") + .build(); + TablePath logTablePath = TablePath.of("fluss", "log_table"); + flussPaimonCatalog.createTable(logTablePath, tableDescriptor); + + TableDescriptor paimonDescriptor = flussPaimonCatalog.getTable(logTablePath); + + verifyTableDescriptor(tableDescriptor, paimonDescriptor); + } + + @Test + void testGetTableWithAllTypes() { + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("log_c1", DataTypes.BOOLEAN()) + .column("log_c2", DataTypes.TINYINT()) + .column("log_c3", DataTypes.SMALLINT()) + .column("log_c4", DataTypes.INT()) + .column("log_c5", DataTypes.BIGINT()) + .column("log_c6", DataTypes.FLOAT()) + .column("log_c7", DataTypes.DOUBLE()) + .column("log_c8", DataTypes.DECIMAL(10, 2)) + .column("log_c9", DataTypes.CHAR(10)) + .column("log_c10", DataTypes.STRING()) + .column("log_c11", DataTypes.BYTES()) + .column("log_c12", DataTypes.BINARY(5)) + .column("log_c13", DataTypes.DATE()) + .column("log_c14", DataTypes.TIME()) + .column("log_c15", DataTypes.TIMESTAMP()) + .column("log_c16", DataTypes.TIMESTAMP_LTZ()) + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .build(); + TablePath tablePath = TablePath.of("fluss", "all_types_table"); + flussPaimonCatalog.createTable(tablePath, tableDescriptor); + + TableDescriptor paimonDescriptor = flussPaimonCatalog.getTable(tablePath); + + verifyTableDescriptor(tableDescriptor, paimonDescriptor); + } + @Test void testAlterTableConfigs() throws Exception { String database = "test_alter_table_configs_db"; @@ -95,4 +159,36 @@ private void createTable(String database, String tableName) { flussPaimonCatalog.createTable(tablePath, td); } + + private void verifyTableDescriptor( + TableDescriptor tableDescriptor, TableDescriptor paimonDescriptor) { + assertThat(tableDescriptor.getSchema()).isEqualTo(paimonDescriptor.getSchema()); + assertThat(tableDescriptor.getBucketKeys()).isEqualTo(paimonDescriptor.getBucketKeys()); + assertThat(tableDescriptor.getPartitionKeys()) + .isEqualTo(paimonDescriptor.getPartitionKeys()); + assertThat(tableDescriptor.getTableDistribution()) + .isEqualTo(paimonDescriptor.getTableDistribution()); + assertThat(tableDescriptor.getProperties()).isEqualTo(paimonDescriptor.getProperties()); + + Map customProperties = new HashMap<>(tableDescriptor.getCustomProperties()); + tableDescriptor + .getProperties() + .forEach( + (k, v) -> { + if (v.equals(customProperties.get(k))) { + customProperties.remove(k); + } + }); + Map paimonCustomProperties = + new HashMap<>(paimonDescriptor.getCustomProperties()); + paimonDescriptor + .getProperties() + .forEach( + (k, v) -> { + if (v.equals(paimonCustomProperties.get(k))) { + paimonCustomProperties.remove(k); + } + }); + assertThat(customProperties).isEqualTo(paimonCustomProperties); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 3a3b42cff4..a3b13aa654 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -28,9 +28,7 @@ import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidDatabaseException; import org.apache.fluss.exception.InvalidTableException; -import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.exception.SecurityDisabledException; -import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotPartitionedException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.metadata.DataLakeFormat; @@ -135,7 +133,6 @@ import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec; -import static org.apache.fluss.utils.Preconditions.checkNotNull; /** An RPC Gateway service for coordinator server. */ public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway { @@ -279,22 +276,13 @@ public CompletableFuture createTable(CreateTableRequest req tableAssignment = generateAssignment(bucketCount, replicaFactor, servers); } - // TODO: should tolerate if the lake exist but matches our schema. This ensures eventually - // consistent by idempotently creating the table multiple times. See #846 // before create table in fluss, we may create in lake if (isDataLakeEnabled(tableDescriptor)) { - try { - checkNotNull(lakeCatalogContainer.getLakeCatalog()) - .createTable(tablePath, tableDescriptor); - } catch (TableAlreadyExistException e) { - throw new LakeTableAlreadyExistException( - String.format( - "The table %s already exists in %s catalog, please " - + "first drop the table in %s catalog or use a new table name.", - tablePath, - lakeCatalogContainer.getDataLakeFormat(), - lakeCatalogContainer.getDataLakeFormat())); - } + metadataManager.createLakeTable( + tablePath, + tableDescriptor, + lakeCatalogContainer.getLakeCatalog(), + lakeCatalogContainer.getDataLakeFormat()); } // then create table; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 5b3b85f62f..726d34e438 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -25,7 +25,6 @@ import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidPartitionException; -import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.SchemaNotExistException; @@ -70,6 +69,8 @@ import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor; +import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptorCompatible; +import static org.apache.fluss.utils.Preconditions.checkNotNull; /** A manager for metadata. */ public class MetadataManager { @@ -311,6 +312,21 @@ public long createTable( "Fail to create table " + tablePath); } + public void createLakeTable( + TablePath tablePath, + TableDescriptor tableToCreate, + @Nullable LakeCatalog lakeCatalog, + @Nullable DataLakeFormat dataLakeFormat) { + try { + checkNotNull(lakeCatalog).createTable(tablePath, tableToCreate); + } catch (TableAlreadyExistException e) { + TableDescriptor lakeTableDescriptor = checkNotNull(lakeCatalog).getTable(tablePath); + + validateTableDescriptorCompatible( + tablePath, lakeCatalog, dataLakeFormat, tableToCreate, lakeTableDescriptor); + } + } + public void alterTableProperties( TablePath tablePath, List tableChanges, @@ -400,21 +416,8 @@ private void preAlterTableProperties( boolean isLakeTableNewlyCreated = false; // to enable lake table if (!isDataLakeEnabled(tableDescriptor)) { - // before create table in fluss, we may create in lake - try { - lakeCatalog.createTable(tablePath, newDescriptor); - // no need to alter lake table if it is newly created - isLakeTableNewlyCreated = true; - } catch (TableAlreadyExistException e) { - // TODO: should tolerate if the lake exist but matches our schema. This ensures - // eventually consistent by idempotently creating the table multiple times. See - // #846 - throw new LakeTableAlreadyExistException( - String.format( - "The table %s already exists in %s catalog, please " - + "first drop the table in %s catalog or use a new table name.", - tablePath, dataLakeFormat, dataLakeFormat)); - } + // before alter table in fluss, we may create in lake + createLakeTable(tablePath, newDescriptor, lakeCatalog, dataLakeFormat); } // only need to alter lake table if it is not newly created diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 76e8adc823..ee6c749976 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -25,12 +25,16 @@ import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.exception.TooManyBucketsException; +import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeRoot; import org.apache.fluss.types.RowType; @@ -40,8 +44,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -130,6 +136,142 @@ public static void validateAlterTableProperties( } } + public static void validateTableDescriptorCompatible( + TablePath tablePath, + LakeCatalog lakeCatalog, + DataLakeFormat dataLakeFormat, + TableDescriptor newTableDescriptor, + TableDescriptor lakeTableDescriptor) { + // check distribution + if (!lakeTableDescriptor.getTableDistribution().isPresent()) { + // if the TableDistribution is not set, it indicates a log table without bucket keys + if (!newTableDescriptor.getTableDistribution().isPresent() + || !newTableDescriptor.getTableDistribution().get().getBucketKeys().isEmpty()) { + throw new LakeTableAlreadyExistException( + String.format( + "The table %s already exists in %s catalog, but the bucket keys doesn't match, " + + "new table bucket keys %s, exist lake table doesn't have bucket keys. " + + "Please first drop the table in %s catalog or use a new table name.", + tablePath, + dataLakeFormat, + newTableDescriptor.getTableDistribution().get().getBucketKeys(), + dataLakeFormat)); + } + } else { + if (!lakeTableDescriptor + .getTableDistribution() + .equals(newTableDescriptor.getTableDistribution())) { + throw new LakeTableAlreadyExistException( + String.format( + "The table %s already exists in %s catalog, but the distribution doesn't match, " + + "new table distribution %s, exist lake table distribution %s. " + + "Please first drop the table in %s catalog or use a new table name.", + tablePath, + dataLakeFormat, + newTableDescriptor.getTableDistribution().get(), + lakeTableDescriptor.getTableDistribution().get(), + dataLakeFormat)); + } + } + + // check schema + if (!newTableDescriptor.getSchema().equals(lakeTableDescriptor.getSchema())) { + throw new LakeTableAlreadyExistException( + String.format( + "The table %s already exists in %s catalog, but the schema doesn't match, " + + "new table schema %s, exist lake table schema %s. " + + "Please first drop the table in %s catalog or use a new table name.", + tablePath, + dataLakeFormat, + newTableDescriptor.getSchema(), + lakeTableDescriptor.getSchema(), + dataLakeFormat)); + } + + // check partition keys + if (!newTableDescriptor.getPartitionKeys().equals(lakeTableDescriptor.getPartitionKeys())) { + throw new LakeTableAlreadyExistException( + String.format( + "The table %s already exists in %s catalog, but the partition keys doesn't match, " + + "new table partition keys %s, exist lake table partition keys %s. " + + "Please first drop the table in %s catalog or use a new table name.", + tablePath, + dataLakeFormat, + newTableDescriptor.getPartitionKeys(), + lakeTableDescriptor.getPartitionKeys(), + dataLakeFormat)); + } + + // check properties + if (!newTableDescriptor.getProperties().equals(lakeTableDescriptor.getProperties())) { + throw new LakeTableAlreadyExistException( + String.format( + "The table %s already exists in %s catalog, but the properties doesn't match, " + + "new table properties %s, exist lake table properties %s. " + + "Please first drop the table in %s catalog or use a new table name.", + tablePath, + dataLakeFormat, + newTableDescriptor.getProperties(), + lakeTableDescriptor.getProperties(), + dataLakeFormat)); + } + + // check custom properties + Map newCustomProperties = + new HashMap<>(newTableDescriptor.getCustomProperties()); + Map newLakeProperties = new HashMap<>(); + newTableDescriptor + .getProperties() + .forEach( + (k, v) -> { + if (v.equals(newCustomProperties.get(k))) { + newCustomProperties.remove(k); + } + }); + newCustomProperties.forEach( + (k, v) -> { + if (k.startsWith(dataLakeFormat.toString() + ".")) { + newLakeProperties.put(k, v); + } + }); + newLakeProperties.forEach((k, v) -> newCustomProperties.remove(k)); + + Map lakeCustomProperties = + new HashMap<>(lakeTableDescriptor.getCustomProperties()); + Map lakeProperties = new HashMap<>(); + lakeTableDescriptor + .getProperties() + .forEach( + (k, v) -> { + if (v.equals(lakeCustomProperties.get(k))) { + lakeCustomProperties.remove(k); + } + }); + lakeCustomProperties.forEach( + (k, v) -> { + if (k.startsWith(dataLakeFormat.toString() + ".")) { + lakeProperties.put(k, v); + } + }); + lakeProperties.forEach((k, v) -> lakeCustomProperties.remove(k)); + + // delegate lake table properties compatibility check to lake catalog + lakeCatalog.validateTablePropertyCompatibility(lakeProperties, newLakeProperties); + + if (!newCustomProperties.equals(lakeCustomProperties)) { + throw new LakeTableAlreadyExistException( + String.format( + "The table %s already exists in %s catalog, but the custom properties doesn't match, " + + "new table custom properties %s, exist lake table custom properties %s. " + + "Please first drop the table in %s catalog or use a new table name.", + tablePath, + dataLakeFormat, + newCustomProperties, + lakeCustomProperties, + dataLakeFormat)); + } + } + private static void checkSystemColumns(RowType schema) { List fieldNames = schema.getFieldNames(); List unsupportedColumns = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java index 8040bbd935..adf33a8253 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java @@ -19,7 +19,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.LakeTableAlreadyExistException; +import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; @@ -35,6 +35,7 @@ import java.util.Map; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newGetTableInfoRequest; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -94,7 +95,7 @@ void testCreateAndGetTable() throws Exception { adminGateway .createTable(newCreateTableRequest(lakeTablePath, lakeTableDescriptor, false)) .get(); - // create again, should throw TableAlreadyExistException thrown by lake + // create again, should throw TableAlreadyExistException thrown by fluss assertThatThrownBy( () -> adminGateway @@ -103,9 +104,19 @@ void testCreateAndGetTable() throws Exception { lakeTablePath, lakeTableDescriptor, false)) .get()) .cause() - .isInstanceOf(LakeTableAlreadyExistException.class) - .hasMessage( - "The table %s already exists in paimon catalog, please first drop the table in paimon catalog or use a new table name.", - lakeTablePath); + .isInstanceOf(TableAlreadyExistException.class) + .hasMessage("Table %s already exists.", lakeTablePath); + // drop table in fluss, lake table should still exist + adminGateway + .dropTable( + newDropTableRequest( + lakeTablePath.getDatabaseName(), + lakeTablePath.getTableName(), + false)) + .get(); + // create again, should be ok even with lake table exist + adminGateway + .createTable(newCreateTableRequest(lakeTablePath, lakeTableDescriptor, false)) + .get(); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index deb2cddb0e..745b5e626d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -88,6 +88,7 @@ public void alterTable(TablePath tablePath, List tableChanges) // do nothing } + @Override public TableDescriptor getTable(TablePath tablePath) { return tableByPath.get(tablePath); } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index ee38b0c54b..d61463d51d 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -385,6 +385,7 @@ org.apache.fluss.lake.batch.ArrowRecordBatch org.apache.fluss.lake.committer.CommittedLakeSnapshot org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType + org.apache.fluss.lake.paimon.PaimonDataTypeToFlussDataType org.apache.fluss.lake.lance.*