Skip to content
3 changes: 2 additions & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ This section introduce all available spark procedures about paimon.
<td>trigger_tag_automatic_creation</td>
<td>
Trigger the tag automatic creation. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>table: the target table identifier. Cannot be empty.</li>
</td>
<td>
set `spark.paimon.tag.automatic-creation-without-delay`=true; -- enable automatic creation without delay<br/><br/>
CALL sys.trigger_tag_automatic_creation(table => 'default.T')
</td>
</tr>
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,12 @@
<td>Boolean</td>
<td>Whether to automatically complete missing tags.</td>
</tr>
<tr>
<td><h5>tag.automatic-creation-without-delay</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore delay when creating tag automatically.</td>
</tr>
<tr>
<td><h5>tag.automatic-creation</h5></td>
<td style="word-wrap: break-word;">none</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1531,6 +1531,12 @@ public InlineElement getDescription() {
.withDescription(
"Whether to create tag automatically. And how to generate tags.");

public static final ConfigOption<Boolean> TAG_AUTOMATIC_CREATION_WITHOUT_DELAY =
key("tag.automatic-creation-without-delay")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore delay when creating tag automatically");

public static final ConfigOption<Boolean> TAG_CREATE_SUCCESS_FILE =
key("tag.create-success-file")
.booleanType()
Expand Down Expand Up @@ -2845,6 +2851,10 @@ public TagCreationMode tagCreationMode() {
return options.get(TAG_AUTOMATIC_CREATION);
}

public Boolean tagAutoCreateWithoutDelay() {
return options.get(TAG_AUTOMATIC_CREATION_WITHOUT_DELAY);
}

public TagCreationPeriod tagCreationPeriod() {
return options.get(TAG_CREATION_PERIOD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private TagAutoCreation(
TagDeletion tagDeletion,
TagTimeExtractor timeExtractor,
TagPeriodHandler periodHandler,
Boolean withoutDelay,
Duration delay,
@Nullable Integer numRetainedMax,
@Nullable Duration defaultTimeRetained,
Expand All @@ -81,7 +82,7 @@ private TagAutoCreation(
this.tagDeletion = tagDeletion;
this.timeExtractor = timeExtractor;
this.periodHandler = periodHandler;
this.delay = delay;
this.delay = withoutDelay ? Duration.ZERO : delay;
this.numRetainedMax = numRetainedMax;
this.defaultTimeRetained = defaultTimeRetained;
this.callbacks = callbacks;
Expand Down Expand Up @@ -225,6 +226,7 @@ public static TagAutoCreation create(
tagDeletion,
extractor,
TagPeriodHandler.create(options),
options.tagAutoCreateWithoutDelay(),
options.tagCreationDelay(),
options.tagNumRetainedMax(),
options.tagDefaultTimeRetained(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Collections;

import static org.apache.spark.sql.types.DataTypes.StringType;

/** A procedure to trigger the tag automatic creation for a table. */
Expand Down Expand Up @@ -63,7 +65,11 @@ public InternalRow[] call(InternalRow args) {
tableIdent,
table -> {
try {
((FileStoreTable) table).newTagAutoManager().run();
FileStoreTable fsTable = (FileStoreTable) table;
// Force a empty commit to make sure a snapshot exists
fsTable.newBatchWriteBuilder().newCommit().commit(Collections.emptyList());

fsTable.newTagAutoManager().run();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,59 @@ import org.assertj.core.api.Assertions.assertThat
class TriggerTagAutomaticCreationProcedureTest extends PaimonSparkTestBase {

test("Paimon procedure: trigger tag automatic creation test") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
spark.sql("""CREATE TABLE T_FORCE_AUTO_TAG (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES (
|'primary-key'='id'
|)""".stripMargin)

spark.sql("insert into T values(1, 'a')")
spark.sql("insert into T_FORCE_AUTO_TAG values(1, 'a')")

val table = loadTable("T")
val table = loadTable("T_FORCE_AUTO_TAG")
assertResult(1)(table.snapshotManager().snapshotCount())

assertResult(0)(spark.sql("show tags T").count())
assertResult(0)(spark.sql("show tags T_FORCE_AUTO_TAG").count())

spark.sql("""alter table T set tblproperties(
spark.sql("""alter table T_FORCE_AUTO_TAG set tblproperties(
|'tag.automatic-creation'='process-time',
|'tag.creation-period'='daily',
|'tag.creation-delay'='10 m',
|'tag.num-retained-max'='90'
|)""".stripMargin)

spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T')")
assertResult(1)(spark.sql("show tags T").count())
spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T_FORCE_AUTO_TAG')")
assertResult(1)(spark.sql("show tags T_FORCE_AUTO_TAG").count())
assertResult(
spark
.sql("select date_format(date_sub(current_date(), 1), 'yyyy-MM-dd')")
.head()
.getString(0))(loadTable("T").tagManager().tagObjects().get(0).getRight)
.getString(0))(loadTable("T_FORCE_AUTO_TAG").tagManager().tagObjects().get(0).getRight)
}

test("Paimon procedure: trigger tag automatic creation without snapshot test") {
spark.sql("""CREATE TABLE T_FORCE_AUTO_TAG_NS (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES (
|'primary-key'='id',
|'snapshot.ignore-empty-commit'='false',
|'tag.automatic-creation'='process-time',
|'tag.creation-period'='daily',
|'tag.creation-delay'='10 m',
|'tag.num-retained-max'='90'
|)""".stripMargin)

val table = loadTable("T_FORCE_AUTO_TAG_NS")
assertResult(0)(table.snapshotManager().snapshotCount())
assertResult(0)(spark.sql("show tags T_FORCE_AUTO_TAG_NS").count())

spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T_FORCE_AUTO_TAG_NS')")
assertResult(1)(table.snapshotManager().snapshotCount())
assertResult(1)(spark.sql("show tags T_FORCE_AUTO_TAG_NS").count())
assertResult(
spark
.sql("select date_format(date_sub(current_date(), 1), 'yyyy-MM-dd')")
.head()
.getString(0))(loadTable("T_FORCE_AUTO_TAG_NS").tagManager().tagObjects().get(0).getRight)
}

}
Loading