Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support read inCommitTimestamp in Delta Lake history table #25056

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -753,11 +753,11 @@ SELECT * FROM "test_table$history"
```

```text
version | timestamp | user_id | user_name | operation | operation_parameters | cluster_id | read_version | isolation_level | is_blind_append | operation_metrics
---------+---------------------------------------+---------+-----------+--------------+---------------------------------------+---------------------------------+--------------+-------------------+-----------------+-------------------
2 | 2023-01-19 07:40:54.684 Europe/Vienna | trino | trino | WRITE | {queryId=20230119_064054_00008_4vq5t} | trino-406-trino-coordinator | 2 | WriteSerializable | true | {}
1 | 2023-01-19 07:40:41.373 Europe/Vienna | trino | trino | ADD COLUMNS | {queryId=20230119_064041_00007_4vq5t} | trino-406-trino-coordinator | 0 | WriteSerializable | true | {}
0 | 2023-01-19 07:40:10.497 Europe/Vienna | trino | trino | CREATE TABLE | {queryId=20230119_064010_00005_4vq5t} | trino-406-trino-coordinator | 0 | WriteSerializable | true | {}
version | timestamp | user_id | user_name | operation | operation_parameters | cluster_id | read_version | isolation_level | is_blind_append | operation_metrics
---------+-----------------------------+---------+-----------+--------------+---------------------------------------+---------------------------------+--------------+-------------------+-----------------+-------------------
2 | 2023-01-19 06:40:54.684 UTC | trino | trino | WRITE | {queryId=20230119_064054_00008_4vq5t} | trino-406-trino-coordinator | 2 | WriteSerializable | true | {}
1 | 2023-01-19 06:40:41.373 UTC | trino | trino | ADD COLUMNS | {queryId=20230119_064041_00007_4vq5t} | trino-406-trino-coordinator | 0 | WriteSerializable | true | {}
0 | 2023-01-19 06:40:10.497 UTC | trino | trino | CREATE TABLE | {queryId=20230119_064010_00005_4vq5t} | trino-406-trino-coordinator | 0 | WriteSerializable | true | {}
```

The output of the query has the following history columns:
Expand All @@ -775,6 +775,10 @@ The output of the query has the following history columns:
* - `timestamp`
- `TIMESTAMP(3) WITH TIME ZONE`
- The time when the table version became active
For tables with in-Commit timestamps enabled, this field returns value of
[in_commit_timestamp](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps),
Otherwise returns value of `timestamp` field that in the
[commitInfo](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#commit-provenance-information)
* - `user_id`
- `VARCHAR`
- The identifier for the user which performed the operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TypeManager;

import java.util.List;
Expand All @@ -36,6 +35,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.TypeSignature.mapType;
import static io.trino.spi.type.VarcharType.VARCHAR;
Expand Down Expand Up @@ -86,13 +86,15 @@ protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesB
commitInfoEntries = commitStream.collect(toImmutableList());
}

TimeZoneKey timeZoneKey = session.getTimeZoneKey();

commitInfoEntries.forEach(commitInfoEntry -> {
pagesBuilder.beginRow();

pagesBuilder.appendBigint(commitInfoEntry.version());
pagesBuilder.appendTimestampTzMillis(commitInfoEntry.timestamp(), timeZoneKey);
commitInfoEntry.inCommitTimestamp().ifPresentOrElse(
// use `inCommitTimestamp` if table In-Commit timestamps enabled, otherwise read the `timestamp` field
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#recommendations-for-readers-of-tables-with-in-commit-timestamps
inCommitTimestamp -> pagesBuilder.appendTimestampTzMillis(inCommitTimestamp, UTC_KEY),
() -> pagesBuilder.appendTimestampTzMillis(commitInfoEntry.timestamp(), UTC_KEY));
write(commitInfoEntry.userId(), pagesBuilder);
write(commitInfoEntry.userName(), pagesBuilder);
write(commitInfoEntry.operation(), pagesBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3063,6 +3063,7 @@ private CommitInfoEntry getCommitInfoEntry(
{
return new CommitInfoEntry(
commitVersion,
OptionalLong.empty(), // TODO: support write inCommitTimestamp
createdTime,
session.getUser(),
session.getUser(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
Expand All @@ -26,6 +27,7 @@

public record CommitInfoEntry(
long version,
OptionalLong inCommitTimestamp,
long timestamp,
String userId,
String userName,
Expand All @@ -48,13 +50,14 @@ public record CommitInfoEntry(

public CommitInfoEntry withVersion(long version)
{
return new CommitInfoEntry(version, timestamp, userId, userName, operation, operationParameters, job, notebook, clusterId, readVersion, isolationLevel, isBlindAppend, operationMetrics);
return new CommitInfoEntry(version, inCommitTimestamp, timestamp, userId, userName, operation, operationParameters, job, notebook, clusterId, readVersion, isolationLevel, isBlindAppend, operationMetrics);
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ SIZE_OF_LONG
+ sizeOf(inCommitTimestamp)
+ SIZE_OF_LONG
+ estimatedSizeOf(userId)
+ estimatedSizeOf(userName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ private DeltaLakeTransactionLogEntry buildCommitInfoEntry(ConnectorSession sessi

CommitInfoEntry result = new CommitInfoEntry(
commitInfo.getLong("version"),
OptionalLong.empty(),
Copy link
Member

@ebyhr ebyhr Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe code readers can't understand why this logic returns OptionalLong.empty(). I would recommend supporting the field in this PR.

Copy link
Contributor Author

@chenjian2664 chenjian2664 Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know to write the commitInfo in checkpoint file?
Current the whole method buildCommitInfoEntry is not covered by test or I am missing somewhere?
I see blow logic always is hit in the buildCommitInfoEntry in current tests.

        if (block.isNull(pagePosition)) {
            return null;
        }

commitInfo.getLong("timestamp"),
commitInfo.getString("userId"),
commitInfo.getString("userName"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public class TestDeltaLakeBasic
new ResourceTable("variant_types", "databricks153/variant_types"),
new ResourceTable("type_widening", "databricks153/type_widening"),
new ResourceTable("type_widening_partition", "databricks153/type_widening_partition"),
new ResourceTable("type_widening_nested", "databricks153/type_widening_nested"));
new ResourceTable("type_widening_nested", "databricks153/type_widening_nested"),
new ResourceTable("in_commit_timestamp_history_read", "deltalake/in_commit_timestamp_history_read"));

// The col-{uuid} pattern for delta.columnMapping.physicalName
private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");
Expand Down Expand Up @@ -2355,6 +2356,25 @@ public void testUnsupportedWriterVersion()
"Cannot execute vacuum procedure with 8 writer version");
}

/**
* @see deltalake.in_commit_timestamp_history_read
*/
@Test
public void testReadInCommitTimestampInHistoryTable()
throws Exception
{
String tableName = "in_commit_timestamp_history_read_" + randomNameSuffix();
Path tableLocation = catalogDir.resolve(tableName);
copyDirectoryContents(new File(Resources.getResource("deltalake/in_commit_timestamp_history_read").toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

assertQuery("SELECT * FROM " + tableName, "VALUES (1, 1), (5, 5)");

// The first two versions commitInfo doesn't contain `inCommitTimestamp`, the value is read from `timestamp` in commitInfo
// The last two versions commitInfo contain `inCommitTimestamp`, the value is read from it.
assertQuery("SELECT date_diff('millisecond', TIMESTAMP '1970-01-01 00:00:00 UTC', timestamp) FROM \"%s$history\"".formatted(tableName), "VALUES 1739859668531L, 1739859684775L, 1739859743394L, 1739859755480L");
}

private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Data generated using OSS Delta Lake 3.3.0:

```sql
CREATE TABLE test_in_commit_history_read
(id int, v int)
USING DELTA;

INSERT INTO test_in_commit_history_read VALUES (1, 1);

ALTER TABLE test_in_commit_history_read
SET TBLPROPERTIES ('delta.enableInCommitTimestamps' = 'true');

INSERT INTO test_in_commit_history_read VALUES (5, 5);
```

Manually updated `[00000000000000000003.json](_delta_log/00000000000000000003.json)` `inCommitTimestamp`
from `1739859755473` to `1739859755480` for distinguish it from `timestamp` field.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1739859668531,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"true","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0","txnId":"0fb15440-1643-44b4-9275-c4339a9e3c95"}}
{"metaData":{"id":"c2a035fa-131e-4ff2-ba8b-3b5a5f30f7af","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1739859668491}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1739859684775,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"669"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0","txnId":"dcf9fb6e-639a-4f4d-9e8c-e74d82043666"}}
{"add":{"path":"part-00000-99335e76-9f86-4491-ad2c-4acedfa2fa31-c000.snappy.parquet","partitionValues":{},"size":669,"modificationTime":1739859684000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"v\":1},\"maxValues\":{\"id\":1,\"v\":1},\"nullCount\":{\"id\":0,\"v\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"inCommitTimestamp":1739859743394,"timestamp":1739859743394,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.enableInCommitTimestamps\":\"true\"}"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0","txnId":"92e6f49a-c346-474e-8792-02fe21132aa1"}}
{"metaData":{"id":"c2a035fa-131e-4ff2-ba8b-3b5a5f30f7af","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableInCommitTimestamps":"true","delta.inCommitTimestampEnablementVersion":"2","delta.inCommitTimestampEnablementTimestamp":"1739859743394"},"createdTime":1739859668491}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["inCommitTimestamp","appendOnly","invariants"]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"inCommitTimestamp":1739859755480,"timestamp":1739859755473,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"669"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0","txnId":"d8e03aa8-13c2-4584-a86e-706654be5b57"}}
{"add":{"path":"part-00000-3c9ea642-138f-4ba0-9350-85c9a4f61fda-c000.snappy.parquet","partitionValues":{},"size":669,"modificationTime":1739859755000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5,\"v\":5},\"maxValues\":{\"id\":5,\"v\":5},\"nullCount\":{\"id\":0,\"v\":0}}"}}
Binary file not shown.
Binary file not shown.