Skip to content

Commit ef44562

Browse files
Support retrieving clustering information of Delta Lake tables.
1 parent 1e7927f commit ef44562

File tree

89 files changed

+1514
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+1514
-3
lines changed

docs/src/main/sphinx/connector/delta-lake.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ values. Typical usage does not require you to configure them.
207207
- Number of threads used for retrieving checkpoint files of each table. Currently, only
208208
retrievals of V2 Checkpoint's sidecar files are parallelized.
209209
- `4`
210+
* - `delta.enable-clustering-info`
211+
- Controls whether clustered column information is retrieved and
212+
included in the table properties for Delta Lake tables.
213+
The equivalent catalog session property is `enable_clustering_info`.
214+
- `false`
210215
:::
211216

212217
### Catalog session properties

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public class DeltaLakeConfig
9898
private boolean deltaLogFileSystemCacheDisabled;
9999
private int metadataParallelism = 8;
100100
private int checkpointProcessingParallelism = 4;
101+
private boolean enableClusteringInfo;
101102

102103
public Duration getMetadataCacheTtl()
103104
{
@@ -392,6 +393,19 @@ public DeltaLakeConfig setCompressionCodec(HiveCompressionOption compressionCode
392393
return this;
393394
}
394395

396+
public boolean isEnableClusteringInfo()
397+
{
398+
return enableClusteringInfo;
399+
}
400+
401+
@Config("delta.enable-clustering-info")
402+
@ConfigDescription("If show clustered columns in table metadata")
403+
public DeltaLakeConfig setEnableClusteringInfo(boolean enableClusteringInfo)
404+
{
405+
this.enableClusteringInfo = enableClusteringInfo;
406+
return this;
407+
}
408+
395409
@Min(1)
396410
public long getPerTransactionMetastoreCacheMaximumSize()
397411
{

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@
242242
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_TABLE;
243243
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
244244
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCollectExtendedStatisticsColumnStatisticsOnWrite;
245+
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isEnableClusteringInfo;
245246
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
246247
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
247248
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
@@ -250,6 +251,7 @@
250251
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.buildSplitPath;
251252
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
252253
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
254+
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CLUSTER_BY_PROPERTY;
253255
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.COLUMN_MAPPING_MODE_PROPERTY;
254256
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.DELETION_VECTORS_ENABLED_PROPERTY;
255257
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.LOCATION_PROPERTY;
@@ -295,6 +297,7 @@
295297
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
296298
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.validateType;
297299
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.verifySupportedColumnMapping;
300+
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.CLUSTERED_TABLES_FEATURE_NAME;
298301
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedReaderFeatures;
299302
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedWriterFeatures;
300303
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
@@ -649,6 +652,11 @@ private static long getTargetVersion(TrinoFileSystem fileSystem, String tableLoc
649652
return snapshotId;
650653
}
651654

655+
/**
 * Returns the {@code TEMPORAL_TIME_TRAVEL_LINEAR_SEARCH_MAX_SIZE} constant.
 *
 * <p>NOTE(review): presumably exposed so tests can reference the linear-search
 * bound used for temporal time travel — confirm the intended callers.
 */
public static int getTemporalTimeTravelLinearSearchMaxSize()
{
    return TEMPORAL_TIME_TRAVEL_LINEAR_SEARCH_MAX_SIZE;
}
659+
652660
@Override
653661
public List<String> listSchemaNames(ConnectorSession session)
654662
{
@@ -742,6 +750,12 @@ public LocatedTableHandle getTableHandle(
742750
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.minReaderVersion());
743751
return null;
744752
}
753+
754+
Optional<List<String>> clusteredColumns = Optional.empty();
755+
if (isEnableClusteringInfo(session) && protocolEntry.writerFeaturesContains(CLUSTERED_TABLES_FEATURE_NAME)) {
756+
clusteredColumns = transactionLogAccess.getClusteredColumns(fileSystem, tableSnapshot);
757+
}
758+
745759
Set<String> unsupportedReaderFeatures = unsupportedReaderFeatures(protocolEntry.readerFeatures().orElse(ImmutableSet.of()));
746760
if (!unsupportedReaderFeatures.isEmpty()) {
747761
LOG.debug("Skip %s because the table contains unsupported reader features: %s", tableName, unsupportedReaderFeatures);
@@ -760,6 +774,7 @@ public LocatedTableHandle getTableHandle(
760774
tableLocation,
761775
metadataEntry,
762776
protocolEntry,
777+
clusteredColumns,
763778
TupleDomain.all(),
764779
TupleDomain.all(),
765780
Optional.empty(),
@@ -842,6 +857,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
842857
properties.put(PARTITIONED_BY_PROPERTY, partitionColumnNames);
843858
}
844859

860+
if (tableHandle.getClusteredColumns().isPresent() && !tableHandle.getClusteredColumns().get().isEmpty()) {
861+
properties.put(CLUSTER_BY_PROPERTY, tableHandle.getClusteredColumns().get());
862+
}
863+
845864
Optional<Long> checkpointInterval = metadataEntry.getCheckpointInterval();
846865
checkpointInterval.ifPresent(value -> properties.put(CHECKPOINT_INTERVAL_PROPERTY, value));
847866

@@ -3495,6 +3514,7 @@ else if (!partitionColumns.contains(column)) {
34953514
tableHandle.getLocation(),
34963515
tableHandle.getMetadataEntry(),
34973516
tableHandle.getProtocolEntry(),
3517+
tableHandle.getClusteredColumns(),
34983518
// Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is.
34993519
// The unenforced constraint will still be checked by the engine.
35003520
tableHandle.getEnforcedPartitionConstraint()
@@ -3795,6 +3815,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
37953815
handle.getLocation(),
37963816
metadata,
37973817
handle.getProtocolEntry(),
3818+
handle.getClusteredColumns(),
37983819
TupleDomain.all(),
37993820
TupleDomain.all(),
38003821
Optional.empty(),

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public final class DeltaLakeSessionProperties
6565
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
6666
private static final String IDLE_WRITER_MIN_FILE_SIZE = "idle_writer_min_file_size";
6767
private static final String COMPRESSION_CODEC = "compression_codec";
68+
private static final String ENABLE_CLUSTERING_INFO = "enable_clustering_info";
6869
// This property is not supported by Delta Lake and exists solely for technical reasons.
6970
@Deprecated
7071
private static final String TIMESTAMP_PRECISION = "timestamp_precision";
@@ -217,6 +218,11 @@ public DeltaLakeSessionProperties(
217218
}
218219
},
219220
false),
221+
booleanProperty(
222+
ENABLE_CLUSTERING_INFO,
223+
"If show clustered columns in table metadata",
224+
deltaLakeConfig.isEnableClusteringInfo(),
225+
false),
220226
booleanProperty(
221227
PROJECTION_PUSHDOWN_ENABLED,
222228
"Read only required fields from a row type",
@@ -340,6 +346,11 @@ public static HiveCompressionOption getCompressionCodec(ConnectorSession session
340346
return session.getProperty(COMPRESSION_CODEC, HiveCompressionOption.class);
341347
}
342348

349+
public static boolean isEnableClusteringInfo(ConnectorSession session)
350+
{
351+
return session.getProperty(ENABLE_CLUSTERING_INFO, Boolean.class);
352+
}
353+
343354
public static boolean isProjectionPushdownEnabled(ConnectorSession session)
344355
{
345356
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public enum WriteType
4949
private final String location;
5050
private final MetadataEntry metadataEntry;
5151
private final ProtocolEntry protocolEntry;
52+
private final Optional<List<String>> clusteredColumns;
5253
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
5354
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
5455
private final Optional<WriteType> writeType;
@@ -79,6 +80,7 @@ public DeltaLakeTableHandle(
7980
@JsonProperty("location") String location,
8081
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
8182
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
83+
@JsonProperty("clusteredColumns") Optional<List<String>> clusteredColumns,
8284
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
8385
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
8486
@JsonProperty("writeType") Optional<WriteType> writeType,
@@ -96,6 +98,7 @@ public DeltaLakeTableHandle(
9698
location,
9799
metadataEntry,
98100
protocolEntry,
101+
clusteredColumns,
99102
enforcedPartitionConstraint,
100103
nonPartitionConstraint,
101104
ImmutableSet.of(),
@@ -118,6 +121,7 @@ public DeltaLakeTableHandle(
118121
String location,
119122
MetadataEntry metadataEntry,
120123
ProtocolEntry protocolEntry,
124+
Optional<List<String>> clusteredColumns,
121125
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
122126
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
123127
Set<DeltaLakeColumnHandle> constraintColumns,
@@ -138,6 +142,7 @@ public DeltaLakeTableHandle(
138142
this.location = requireNonNull(location, "location is null");
139143
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
140144
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
145+
this.clusteredColumns = requireNonNull(clusteredColumns, "clusteredColumns is null");
141146
this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
142147
this.nonPartitionConstraint = requireNonNull(nonPartitionConstraint, "nonPartitionConstraint is null");
143148
this.writeType = requireNonNull(writeType, "writeType is null");
@@ -164,6 +169,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
164169
location,
165170
metadataEntry,
166171
protocolEntry,
172+
clusteredColumns,
167173
enforcedPartitionConstraint,
168174
nonPartitionConstraint,
169175
constraintColumns,
@@ -188,6 +194,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
188194
location,
189195
metadataEntry,
190196
protocolEntry,
197+
clusteredColumns,
191198
enforcedPartitionConstraint,
192199
nonPartitionConstraint,
193200
constraintColumns,
@@ -268,6 +275,12 @@ public ProtocolEntry getProtocolEntry()
268275
return protocolEntry;
269276
}
270277

278+
// Clustered (liquid clustering) columns of the table; populated only when the
// connector is configured to retrieve clustering information and the table
// declares the clustered-tables writer feature.
@JsonProperty
public Optional<List<String>> getClusteredColumns()
{
    return clusteredColumns;
}
283+
271284
@JsonProperty
272285
public TupleDomain<DeltaLakeColumnHandle> getEnforcedPartitionConstraint()
273286
{
@@ -371,6 +384,7 @@ public boolean equals(Object o)
371384
Objects.equals(location, that.location) &&
372385
Objects.equals(metadataEntry, that.metadataEntry) &&
373386
Objects.equals(protocolEntry, that.protocolEntry) &&
387+
Objects.equals(clusteredColumns, that.clusteredColumns) &&
374388
Objects.equals(enforcedPartitionConstraint, that.enforcedPartitionConstraint) &&
375389
Objects.equals(nonPartitionConstraint, that.nonPartitionConstraint) &&
376390
Objects.equals(writeType, that.writeType) &&
@@ -394,6 +408,7 @@ public int hashCode()
394408
location,
395409
metadataEntry,
396410
protocolEntry,
411+
clusteredColumns,
397412
enforcedPartitionConstraint,
398413
nonPartitionConstraint,
399414
writeType,

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class DeltaLakeTableProperties
3939
{
4040
public static final String LOCATION_PROPERTY = "location";
4141
public static final String PARTITIONED_BY_PROPERTY = "partitioned_by";
42+
public static final String CLUSTER_BY_PROPERTY = "clustered_by";
4243
public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval";
4344
public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled";
4445
public static final String COLUMN_MAPPING_MODE_PROPERTY = "column_mapping_mode";
@@ -94,6 +95,17 @@ public DeltaLakeTableProperties(DeltaLakeConfig config)
9495
"Enables deletion vectors",
9596
config.isDeletionVectorsEnabled(),
9697
false))
98+
.add(new PropertyMetadata<>(
99+
CLUSTER_BY_PROPERTY,
100+
"Liquid cluster columns",
101+
new ArrayType(VARCHAR),
102+
List.class,
103+
ImmutableList.of(),
104+
false,
105+
value -> ((Collection<String>) value).stream()
106+
.map(name -> name.toLowerCase(ENGLISH))
107+
.collect(toImmutableList()),
108+
value -> value))
97109
.build();
98110
}
99111

@@ -113,6 +125,12 @@ public static List<String> getPartitionedBy(Map<String, Object> tableProperties)
113125
return partitionedBy == null ? ImmutableList.of() : ImmutableList.copyOf(partitionedBy);
114126
}
115127

128+
public static List<String> getClusteredBy(Map<String, Object> tableProperties)
129+
{
130+
List<String> clusteredBy = (List<String>) tableProperties.get(CLUSTER_BY_PROPERTY);
131+
return clusteredBy == null ? ImmutableList.of() : ImmutableList.copyOf(clusteredBy);
132+
}
133+
116134
public static Optional<Long> getCheckpointInterval(Map<String, Object> tableProperties)
117135
{
118136
Optional<Long> checkpointInterval = Optional.ofNullable((Long) tableProperties.get(CHECKPOINT_INTERVAL_PROPERTY));

0 commit comments

Comments
 (0)