5 changes: 5 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
@@ -207,6 +207,11 @@ values. Typical usage does not require you to configure them.
- Number of threads used for retrieving checkpoint files of each table. Currently, only
retrievals of V2 Checkpoint's sidecar files are parallelized.
- `4`
* - `delta.enable-clustering-info`
- Controls whether clustered column information is retrieved and
exposed as the `clustered_by` property in the table properties of
Delta Lake tables. The equivalent catalog session property is
`enable_clustering_info`.
- `false`
:::
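For illustration, a minimal way to turn this on per session; the catalog name `delta` is an assumption, not part of this change:

```sql
-- Per session; the catalog-level equivalent is setting
-- delta.enable-clustering-info=true in the catalog properties file.
SET SESSION delta.enable_clustering_info = true;
```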

### Catalog session properties
DeltaLakeConfig.java
@@ -98,6 +98,7 @@ public class DeltaLakeConfig
private boolean deltaLogFileSystemCacheDisabled;
private int metadataParallelism = 8;
private int checkpointProcessingParallelism = 4;
private boolean enableClusteringInfo;

public Duration getMetadataCacheTtl()
{
@@ -392,6 +393,19 @@ public DeltaLakeConfig setCompressionCodec(HiveCompressionOption compressionCode
return this;
}

public boolean isEnableClusteringInfo()
{
return enableClusteringInfo;
}

@Config("delta.enable-clustering-info")
@ConfigDescription("Whether to show clustered columns in table metadata")
public DeltaLakeConfig setEnableClusteringInfo(boolean enableClusteringInfo)
{
this.enableClusteringInfo = enableClusteringInfo;
return this;
}

@Min(1)
public long getPerTransactionMetastoreCacheMaximumSize()
{
DeltaLakeMetadata.java
@@ -242,6 +242,7 @@
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_TABLE;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCollectExtendedStatisticsColumnStatisticsOnWrite;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isEnableClusteringInfo;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
@@ -250,6 +251,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.buildSplitPath;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CLUSTER_BY_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.COLUMN_MAPPING_MODE_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.DELETION_VECTORS_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.LOCATION_PROPERTY;
@@ -295,6 +297,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.validateType;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.verifySupportedColumnMapping;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.CLUSTERED_TABLES_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedReaderFeatures;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
@@ -649,6 +652,11 @@ private static long getTargetVersion(TrinoFileSystem fileSystem, String tableLoc
return snapshotId;
}

public static int getTemporalTimeTravelLinearSearchMaxSize()
{
return TEMPORAL_TIME_TRAVEL_LINEAR_SEARCH_MAX_SIZE;
}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
@@ -742,6 +750,12 @@ public LocatedTableHandle getTableHandle(
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.minReaderVersion());
return null;
}

Optional<List<String>> clusteredColumns = Optional.empty();
if (isEnableClusteringInfo(session) && protocolEntry.writerFeaturesContains(CLUSTERED_TABLES_FEATURE_NAME)) {
clusteredColumns = transactionLogAccess.getClusteredColumns(fileSystem, tableSnapshot);
}

Set<String> unsupportedReaderFeatures = unsupportedReaderFeatures(protocolEntry.readerFeatures().orElse(ImmutableSet.of()));
if (!unsupportedReaderFeatures.isEmpty()) {
LOG.debug("Skip %s because the table contains unsupported reader features: %s", tableName, unsupportedReaderFeatures);
@@ -760,6 +774,7 @@
tableLocation,
metadataEntry,
protocolEntry,
clusteredColumns,
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
@@ -842,6 +857,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
properties.put(PARTITIONED_BY_PROPERTY, partitionColumnNames);
}

if (tableHandle.getClusteredColumns().isPresent() && !tableHandle.getClusteredColumns().get().isEmpty()) {
properties.put(CLUSTER_BY_PROPERTY, tableHandle.getClusteredColumns().get());
}

Optional<Long> checkpointInterval = metadataEntry.getCheckpointInterval();
checkpointInterval.ifPresent(value -> properties.put(CHECKPOINT_INTERVAL_PROPERTY, value));
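
With `enable_clustering_info` enabled, the clustered columns surface through `getTableMetadata` as the `clustered_by` property, which for example shows up in `SHOW CREATE TABLE` output. A rough sketch, with assumed table, column, and location names:

```sql
SHOW CREATE TABLE delta.default.my_clustered_table;
-- ...
-- WITH (
--    clustered_by = ARRAY['c1', 'c2'],
--    location = 's3://bucket/my_clustered_table'
-- )
```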

@@ -3495,6 +3514,7 @@ else if (!partitionColumns.contains(column)) {
tableHandle.getLocation(),
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getClusteredColumns(),
// Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is.
// The unenforced constraint will still be checked by the engine.
tableHandle.getEnforcedPartitionConstraint()
@@ -3795,6 +3815,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
handle.getLocation(),
metadata,
handle.getProtocolEntry(),
handle.getClusteredColumns(),
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
DeltaLakeSessionProperties.java
@@ -65,6 +65,7 @@ public final class DeltaLakeSessionProperties
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String IDLE_WRITER_MIN_FILE_SIZE = "idle_writer_min_file_size";
private static final String COMPRESSION_CODEC = "compression_codec";
private static final String ENABLE_CLUSTERING_INFO = "enable_clustering_info";
// This property is not supported by Delta Lake and exists solely for technical reasons.
@Deprecated
private static final String TIMESTAMP_PRECISION = "timestamp_precision";
@@ -217,6 +218,11 @@ public DeltaLakeSessionProperties(
}
},
false),
booleanProperty(
ENABLE_CLUSTERING_INFO,
"Whether to show clustered columns in table metadata",
deltaLakeConfig.isEnableClusteringInfo(),
false),
booleanProperty(
PROJECTION_PUSHDOWN_ENABLED,
"Read only required fields from a row type",
@@ -340,6 +346,11 @@ public static HiveCompressionOption getCompressionCodec(ConnectorSession session
return session.getProperty(COMPRESSION_CODEC, HiveCompressionOption.class);
}

public static boolean isEnableClusteringInfo(ConnectorSession session)
{
return session.getProperty(ENABLE_CLUSTERING_INFO, Boolean.class);
}

public static boolean isProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
DeltaLakeTableHandle.java
@@ -49,6 +49,7 @@ public enum WriteType
private final String location;
private final MetadataEntry metadataEntry;
private final ProtocolEntry protocolEntry;
private final Optional<List<String>> clusteredColumns;
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
@@ -79,6 +80,7 @@ public DeltaLakeTableHandle(
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
@JsonProperty("clusteredColumns") Optional<List<String>> clusteredColumns,
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
@@ -96,6 +98,7 @@ public DeltaLakeTableHandle(
location,
metadataEntry,
protocolEntry,
clusteredColumns,
enforcedPartitionConstraint,
nonPartitionConstraint,
ImmutableSet.of(),
@@ -118,6 +121,7 @@ public DeltaLakeTableHandle(
String location,
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
Optional<List<String>> clusteredColumns,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Set<DeltaLakeColumnHandle> constraintColumns,
@@ -138,6 +142,7 @@ public DeltaLakeTableHandle(
this.location = requireNonNull(location, "location is null");
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.clusteredColumns = requireNonNull(clusteredColumns, "clusteredColumns is null");
this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
this.nonPartitionConstraint = requireNonNull(nonPartitionConstraint, "nonPartitionConstraint is null");
this.writeType = requireNonNull(writeType, "writeType is null");
@@ -164,6 +169,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
location,
metadataEntry,
protocolEntry,
clusteredColumns,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
@@ -188,6 +194,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
location,
metadataEntry,
protocolEntry,
clusteredColumns,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
@@ -268,6 +275,12 @@ public ProtocolEntry getProtocolEntry()
return protocolEntry;
}

@JsonProperty
public Optional<List<String>> getClusteredColumns()
{
return clusteredColumns;
}

@JsonProperty
public TupleDomain<DeltaLakeColumnHandle> getEnforcedPartitionConstraint()
{
@@ -371,6 +384,7 @@ public boolean equals(Object o)
Objects.equals(location, that.location) &&
Objects.equals(metadataEntry, that.metadataEntry) &&
Objects.equals(protocolEntry, that.protocolEntry) &&
Objects.equals(clusteredColumns, that.clusteredColumns) &&
Objects.equals(enforcedPartitionConstraint, that.enforcedPartitionConstraint) &&
Objects.equals(nonPartitionConstraint, that.nonPartitionConstraint) &&
Objects.equals(writeType, that.writeType) &&
@@ -394,6 +408,7 @@ public int hashCode()
location,
metadataEntry,
protocolEntry,
clusteredColumns,
enforcedPartitionConstraint,
nonPartitionConstraint,
writeType,
DeltaLakeTableProperties.java
@@ -39,6 +39,7 @@ public class DeltaLakeTableProperties
{
public static final String LOCATION_PROPERTY = "location";
public static final String PARTITIONED_BY_PROPERTY = "partitioned_by";
public static final String CLUSTER_BY_PROPERTY = "clustered_by";
public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval";
public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled";
public static final String COLUMN_MAPPING_MODE_PROPERTY = "column_mapping_mode";
@@ -94,6 +95,17 @@ public DeltaLakeTableProperties(DeltaLakeConfig config)
"Enables deletion vectors",
config.isDeletionVectorsEnabled(),
false))
.add(new PropertyMetadata<>(
CLUSTER_BY_PROPERTY,
"Liquid clustering columns",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> ((Collection<String>) value).stream()
.map(name -> name.toLowerCase(ENGLISH))
.collect(toImmutableList()),
value -> value))
.build();
}

@@ -113,6 +125,12 @@ public static List<String> getPartitionedBy(Map<String, Object> tableProperties)
return partitionedBy == null ? ImmutableList.of() : ImmutableList.copyOf(partitionedBy);
}

public static List<String> getClusteredBy(Map<String, Object> tableProperties)
{
List<String> clusteredBy = (List<String>) tableProperties.get(CLUSTER_BY_PROPERTY);
return clusteredBy == null ? ImmutableList.of() : ImmutableList.copyOf(clusteredBy);
}

public static Optional<Long> getCheckpointInterval(Map<String, Object> tableProperties)
{
Optional<Long> checkpointInterval = Optional.ofNullable((Long) tableProperties.get(CHECKPOINT_INTERVAL_PROPERTY));