diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index f3cff7016001..a34b0da8d709 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -165,9 +165,10 @@ This section introduce all available spark procedures about paimon. trigger_tag_automatic_creation Trigger the tag automatic creation. Arguments: -
  • table: the target table identifier. Cannot be empty.
  • +
  • table: the target table identifier. Cannot be empty.
  • + set `spark.paimon.tag.automatic-creation-without-delay`=true; -- enable automatic creation without delay

    CALL sys.trigger_tag_automatic_creation(table => 'default.T') diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 5d95f775aa86..1824e81d7492 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1194,6 +1194,12 @@ Boolean Whether to automatically complete missing tags. + +
    tag.automatic-creation-without-delay
    + false + Boolean + Whether to ignore delay when creating tag automatically. +
    tag.automatic-creation
    none diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 00118784af0c..581b662aa490 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1531,6 +1531,12 @@ public InlineElement getDescription() { .withDescription( "Whether to create tag automatically. And how to generate tags."); + public static final ConfigOption TAG_AUTOMATIC_CREATION_WITHOUT_DELAY = + key("tag.automatic-creation-without-delay") + .booleanType() + .defaultValue(false) + .withDescription("Whether to ignore delay when creating tag automatically"); + public static final ConfigOption TAG_CREATE_SUCCESS_FILE = key("tag.create-success-file") .booleanType() @@ -2845,6 +2851,10 @@ public TagCreationMode tagCreationMode() { return options.get(TAG_AUTOMATIC_CREATION); } + public Boolean tagAutoCreateWithoutDelay() { + return options.get(TAG_AUTOMATIC_CREATION_WITHOUT_DELAY); + } + public TagCreationPeriod tagCreationPeriod() { return options.get(TAG_CREATION_PERIOD); } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 08dfe2537869..658dd6448f1b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -70,6 +70,7 @@ private TagAutoCreation( TagDeletion tagDeletion, TagTimeExtractor timeExtractor, TagPeriodHandler periodHandler, + Boolean withoutDelay, Duration delay, @Nullable Integer numRetainedMax, @Nullable Duration defaultTimeRetained, @@ -81,7 +82,7 @@ private TagAutoCreation( this.tagDeletion = tagDeletion; this.timeExtractor = timeExtractor; this.periodHandler = periodHandler; - this.delay = delay; + this.delay = withoutDelay ? Duration.ZERO : delay; this.numRetainedMax = numRetainedMax; this.defaultTimeRetained = defaultTimeRetained; this.callbacks = callbacks; @@ -225,6 +226,7 @@ public static TagAutoCreation create( tagDeletion, extractor, TagPeriodHandler.create(options), + options.tagAutoCreateWithoutDelay(), options.tagCreationDelay(), options.tagNumRetainedMax(), options.tagDefaultTimeRetained(), diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java index f7ca06f20401..befb334108b9 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java @@ -28,6 +28,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.util.Collections; + import static org.apache.spark.sql.types.DataTypes.StringType; /** A procedure to trigger the tag automatic creation for a table. */ @@ -63,7 +65,11 @@ public InternalRow[] call(InternalRow args) { tableIdent, table -> { try { - ((FileStoreTable) table).newTagAutoManager().run(); + FileStoreTable fsTable = (FileStoreTable) table; + // Force a empty commit to make sure a snapshot exists + fsTable.newBatchWriteBuilder().newCommit().commit(Collections.emptyList()); + + fsTable.newTagAutoManager().run(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala index 422cf230601a..8f22ba5c0958 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala @@ -26,33 +26,59 @@ import org.assertj.core.api.Assertions.assertThat class TriggerTagAutomaticCreationProcedureTest extends PaimonSparkTestBase { test("Paimon procedure: trigger tag automatic creation test") { - spark.sql("""CREATE TABLE T (id INT, name STRING) + spark.sql("""CREATE TABLE T_FORCE_AUTO_TAG (id INT, name STRING) |USING PAIMON |TBLPROPERTIES ( |'primary-key'='id' |)""".stripMargin) - spark.sql("insert into T values(1, 'a')") + spark.sql("insert into T_FORCE_AUTO_TAG values(1, 'a')") - val table = loadTable("T") + val table = loadTable("T_FORCE_AUTO_TAG") assertResult(1)(table.snapshotManager().snapshotCount()) - assertResult(0)(spark.sql("show tags T").count()) + assertResult(0)(spark.sql("show tags T_FORCE_AUTO_TAG").count()) - spark.sql("""alter table T set tblproperties( + spark.sql("""alter table T_FORCE_AUTO_TAG set tblproperties( |'tag.automatic-creation'='process-time', |'tag.creation-period'='daily', |'tag.creation-delay'='10 m', |'tag.num-retained-max'='90' |)""".stripMargin) - spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T')") - assertResult(1)(spark.sql("show tags T").count()) + spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T_FORCE_AUTO_TAG')") + assertResult(1)(spark.sql("show tags T_FORCE_AUTO_TAG").count()) assertResult( spark .sql("select date_format(date_sub(current_date(), 1), 'yyyy-MM-dd')") .head() - .getString(0))(loadTable("T").tagManager().tagObjects().get(0).getRight) + .getString(0))(loadTable("T_FORCE_AUTO_TAG").tagManager().tagObjects().get(0).getRight) + } + + test("Paimon procedure: trigger tag automatic creation without snapshot test") { + spark.sql("""CREATE TABLE T_FORCE_AUTO_TAG_NS (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ( + |'primary-key'='id', + |'snapshot.ignore-empty-commit'='false', + |'tag.automatic-creation'='process-time', + |'tag.creation-period'='daily', + |'tag.creation-delay'='10 m', + |'tag.num-retained-max'='90' + |)""".stripMargin) + + val table = loadTable("T_FORCE_AUTO_TAG_NS") + assertResult(0)(table.snapshotManager().snapshotCount()) + assertResult(0)(spark.sql("show tags T_FORCE_AUTO_TAG_NS").count()) + + spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T_FORCE_AUTO_TAG_NS')") + assertResult(1)(table.snapshotManager().snapshotCount()) + assertResult(1)(spark.sql("show tags T_FORCE_AUTO_TAG_NS").count()) + assertResult( + spark + .sql("select date_format(date_sub(current_date(), 1), 'yyyy-MM-dd')") + .head() + .getString(0))(loadTable("T_FORCE_AUTO_TAG_NS").tagManager().tagObjects().get(0).getRight) } }