Add dynamic schema support based on protobuf descriptor #27137
base: master
Conversation
Reviewer's Guide

This PR implements dynamic schema support for SEQUENCEFILE_PROTOBUF Hive tables by wrapping the Hive metastore to load and cache protobuf descriptors at runtime, inferring table and partition columns from descriptor fields instead of relying on static metastore columns. It adds protobuf build and runtime dependencies, introduces a ProtobufDeserializerFactoryLoader for DI, modifies HiveMetadataFactory to use DynamicSchemaHiveMetastore, implements dynamic schema classes (wrapper, loader, cache), and includes tests verifying inferred schemas.

Sequence diagram for dynamic schema column resolution on table read:

```mermaid
sequenceDiagram
    participant Client
    participant HiveMetadataFactory
    participant DynamicSchemaHiveMetastore
    participant ProtobufDeserializerFactory
    participant DescriptorFile
    Client->>HiveMetadataFactory: Request table metadata
    HiveMetadataFactory->>DynamicSchemaHiveMetastore: getTable(databaseName, tableName)
    alt Table is SEQUENCEFILE_PROTOBUF
        DynamicSchemaHiveMetastore->>ProtobufDeserializerFactory: getDescriptor(serializationClass)
        ProtobufDeserializerFactory->>DescriptorFile: Load protobuf descriptor
        ProtobufDeserializerFactory-->>DynamicSchemaHiveMetastore: Descriptor
        DynamicSchemaHiveMetastore->>DynamicSchemaLoader: fieldToColumn(descriptor.fields)
        DynamicSchemaHiveMetastore-->>HiveMetadataFactory: Table with dynamic columns
    else Table is not SEQUENCEFILE_PROTOBUF
        DynamicSchemaHiveMetastore-->>HiveMetadataFactory: Table with static columns
    end
    HiveMetadataFactory-->>Client: Return table metadata
```
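As a concrete illustration of the "Load protobuf descriptor" step, a descriptor-set file produced by `protoc --descriptor_set_out` could be parsed roughly as follows. This is a hedged sketch, not the PR's actual code: the file layout, the lookup by simple message name, and the lack of dependency resolution are all assumptions.

```java
import java.nio.file.Files;
import java.nio.file.Path;

import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;

final class DescriptorFiles
{
    private DescriptorFiles() {}

    // Parse a compiled descriptor set and find a message type by its simple name.
    // Proto files with imports would need their dependencies resolved first;
    // buildFrom with an empty dependency array only works for self-contained files.
    static Descriptor loadDescriptor(Path descriptorFile, String messageName)
            throws Exception
    {
        FileDescriptorSet set = FileDescriptorSet.parseFrom(Files.readAllBytes(descriptorFile));
        for (FileDescriptorProto proto : set.getFileList()) {
            FileDescriptor file = FileDescriptor.buildFrom(proto, new FileDescriptor[0]);
            Descriptor descriptor = file.findMessageTypeByName(messageName);
            if (descriptor != null) {
                return descriptor;
            }
        }
        throw new IllegalArgumentException("Message type not found: " + messageName);
    }
}
```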
ER diagram for dynamic schema column inference:

```mermaid
erDiagram
    TABLE {
        string databaseName
        string tableName
        map storage
        list columns
    }
    PROTOBUF_DESCRIPTOR {
        string fullName
        list fields
    }
    COLUMN {
        string name
        string type
        string comment
    }
    TABLE ||--o{ COLUMN : has
    PROTOBUF_DESCRIPTOR ||--o{ COLUMN : infers
```
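To make the "infers" relationship concrete, here is a minimal sketch of what a fieldToColumn mapping could look like. The exact type mapping and the Column/HiveType constructors are assumptions based on the diagrams, not the PR's actual code.

```java
import java.util.Map;
import java.util.Optional;

import com.google.protobuf.Descriptors.FieldDescriptor;
import io.trino.metastore.Column;
import io.trino.metastore.HiveType;

final class DynamicSchemaLoaderSketch
{
    private DynamicSchemaLoaderSketch() {}

    // Map one protobuf field to a Hive column; nested messages and
    // repeated fields would need extra handling (rows/arrays).
    static Column fieldToColumn(FieldDescriptor field)
    {
        HiveType hiveType = switch (field.getJavaType()) {
            case BOOLEAN -> HiveType.HIVE_BOOLEAN;
            case INT -> HiveType.HIVE_INT;
            case LONG -> HiveType.HIVE_LONG;
            case FLOAT -> HiveType.HIVE_FLOAT;
            case DOUBLE -> HiveType.HIVE_DOUBLE;
            case STRING, ENUM -> HiveType.HIVE_STRING;
            case BYTE_STRING -> HiveType.HIVE_BINARY;
            case MESSAGE -> throw new IllegalArgumentException("nested messages omitted in this sketch");
        };
        return new Column(field.getName(), hiveType, Optional.empty(), Map.of());
    }
}
```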
Class diagram for dynamic schema metastore components:

```mermaid
classDiagram
    class HiveMetastore {
        <<interface>>
    }
    class HiveMetastoreWrapper {
        -delegate : HiveMetastore
        +HiveMetastoreWrapper(HiveMetastore)
        ...
    }
    class DynamicSchemaHiveMetastore {
        -dynamicSchemaCache : LoadingCache~TableReference, List~Column~~
        +DynamicSchemaHiveMetastore(HiveMetastore, ProtobufDeserializerFactory, Duration)
        +getTable(databaseName, tableName)
        +getPartition(table, partitionValues)
        +getPartitionsByNames(table, partitionNames)
    }
    class TableReference {
        -table : Table
        +getFullTableName()
        +getSerializationClass()
    }
    class DynamicSchemaLoader {
        +fieldToColumn(FieldDescriptor) : Column
    }
    class ProtobufDeserializerFactory {
        +getDescriptor(serializationClass) : Descriptor
    }
    class ProtobufDeserializerFactoryLoader {
        -factory : ProtobufDeserializerFactory
        +get() : ProtobufDeserializerFactory
    }
    HiveMetastoreWrapper --|> HiveMetastore
    DynamicSchemaHiveMetastore --|> HiveMetastoreWrapper
    DynamicSchemaHiveMetastore --> TableReference
    DynamicSchemaHiveMetastore --> DynamicSchemaLoader
    DynamicSchemaHiveMetastore --> ProtobufDeserializerFactory
    ProtobufDeserializerFactoryLoader --> ProtobufDeserializerFactory
```
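Putting the diagram together, a minimal sketch of the wrapper's behavior could look like the following. The method names, the format check, and the Table builder usage are assumptions inferred from the diagrams, not the PR's actual code.

```java
import java.util.List;
import java.util.Optional;

import com.google.common.cache.LoadingCache;
import io.trino.metastore.Column;
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.Table;

// Sketch: delegate everything to the wrapped metastore, but swap in columns
// inferred from the protobuf descriptor for SEQUENCEFILE_PROTOBUF tables.
public class DynamicSchemaHiveMetastoreSketch
        extends HiveMetastoreWrapper
{
    private final LoadingCache<TableReference, List<Column>> dynamicSchemaCache;

    public DynamicSchemaHiveMetastoreSketch(HiveMetastore delegate, LoadingCache<TableReference, List<Column>> cache)
    {
        super(delegate);
        this.dynamicSchemaCache = cache;
    }

    @Override
    public Optional<Table> getTable(String databaseName, String tableName)
    {
        return super.getTable(databaseName, tableName).map(this::withDynamicColumns);
    }

    private Table withDynamicColumns(Table table)
    {
        if (!isSequenceFileProtobuf(table)) {
            return table;
        }
        // A cache miss triggers descriptor loading and fieldToColumn inference.
        List<Column> columns = dynamicSchemaCache.getUnchecked(new TableReference(table));
        return Table.builder(table).setDataColumns(columns).build();
    }

    private static boolean isSequenceFileProtobuf(Table table)
    {
        // Hypothetical check: the serde parameters identify the protobuf class.
        return table.getStorage().getSerdeParameters().containsKey("serialization.class");
    }
}
```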
Hey there - I've reviewed your changes - here's some feedback:
- Avoid hardcoding the dynamic schema cache expiration in HiveMetadataFactory (currently `Duration.valueOf("1h")`); expose it via HiveConfig instead of leaving it as a TODO.
- The new HiveMetastoreWrapper duplicates every HiveMetastore method and adds a lot of boilerplate; consider using a dynamic proxy or default interface methods to reduce manual delegation.
- The ProtobufDeserializerFactoryLoader is just a thin wrapper around ProtobufDeserializerFactory and HiveConfig; inject the factory directly to simplify bindings and remove the extra loader class.
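For the second point, a delegation-only wrapper can be generated with a JDK dynamic proxy. The sketch below only illustrates the reviewer's suggestion; the package of HiveMetastore is an assumption, and this is not code from the PR.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

import io.trino.metastore.HiveMetastore;

final class DelegatingHiveMetastores
{
    private DelegatingHiveMetastores() {}

    // Returns a HiveMetastore that forwards every interface method to the
    // delegate, replacing hand-written delegation methods; methods needing
    // custom behavior would be intercepted in the handler before forwarding.
    static HiveMetastore forwardingTo(HiveMetastore delegate)
    {
        return (HiveMetastore) Proxy.newProxyInstance(
                HiveMetastore.class.getClassLoader(),
                new Class<?>[] {HiveMetastore.class},
                (proxy, method, args) -> {
                    try {
                        return method.invoke(delegate, args);
                    }
                    catch (InvocationTargetException e) {
                        throw e.getCause(); // rethrow the delegate's original exception
                    }
                });
    }
}
```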
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Avoid hardcoding the dynamic schema cache expiration in HiveMetadataFactory (currently `Duration.valueOf("1h")`)—expose it via HiveConfig instead of leaving it as a TODO.
- The new HiveMetastoreWrapper duplicates every HiveMetastore method and adds a lot of boilerplate; consider using a dynamic proxy or default interface methods to reduce manual delegation.
- The ProtobufDeserializerFactoryLoader is just a thin wrapper around ProtobufDeserializerFactory and HiveConfig—inject the factory directly to simplify bindings and remove the extra loader class.
## Individual Comments
### Comment 1
<location> `plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java:241-244` </location>
<code_context>
public TransactionalMetadata create(ConnectorIdentity identity, boolean autoCommit)
{
- HiveMetastore hiveMetastore = createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize);
+ HiveMetastore hiveMetastore = new DynamicSchemaHiveMetastore(
+ createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize),
+ protobufDeserializerFactory,
+ Duration.valueOf("1h")); // TODO make configurable
DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister);
</code_context>
<issue_to_address>
**suggestion:** Hardcoded cache expiration duration should be configurable.
Consider exposing the cache expiration setting through HiveConfig or a similar configuration option to support workload-specific tuning.
Suggested implementation:
```java
this.protobufDeserializerFactory = requireNonNull(protobufDeserializerFactory, "protobufDeserializerFactory is null");
this.dynamicSchemaCacheExpiration = requireNonNull(hiveConfig.getDynamicSchemaCacheExpiration(), "dynamicSchemaCacheExpiration is null");
```
```java
@Override
public TransactionalMetadata create(ConnectorIdentity identity, boolean autoCommit)
{
    HiveMetastore hiveMetastore = new DynamicSchemaHiveMetastore(
            createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize),
            protobufDeserializerFactory,
            dynamicSchemaCacheExpiration);
    DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister);
    SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
```
1. In `HiveConfig` (likely in `plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java`), add a property:
```java
private Duration dynamicSchemaCacheExpiration = Duration.valueOf("1h");

@Config("hive.dynamic-schema-cache-expiration")
public HiveConfig setDynamicSchemaCacheExpiration(Duration duration) {
    this.dynamicSchemaCacheExpiration = duration;
    return this;
}

public Duration getDynamicSchemaCacheExpiration() {
    return dynamicSchemaCacheExpiration;
}
```
2. Pass `hiveConfig` to the `HiveMetadataFactory` constructor if not already done.
3. Add a field to `HiveMetadataFactory`:
```java
private final Duration dynamicSchemaCacheExpiration;
```
4. Update any relevant configuration documentation to describe the new property.
</issue_to_address>
### Comment 2
<location> `plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/DynamicSchemaHiveMetastore.java:111-120` </location>
<code_context>
+ private record TableReference(Table table)
</code_context>
<issue_to_address>
**issue (bug_risk):** TableReference cache key only uses database and table name, ignoring storage/serde parameters.
Including storage and serde parameters in the cache key will help prevent incorrect cache hits when tables with the same name have different formats.
</issue_to_address>
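One way to address this comment would be a cache key record over the identifying fields, including the serde's serialization class. This is a minimal sketch under that assumption, using accessors visible in the quoted snippet below; it is not the PR's final code.

```java
import io.trino.metastore.Table;

// Records derive equals/hashCode from all components, so two tables with the
// same name but different serialization classes get distinct cache entries.
record SchemaCacheKey(String databaseName, String tableName, String serializationClass)
{
    static SchemaCacheKey of(Table table)
    {
        return new SchemaCacheKey(
                table.getDatabaseName(),
                table.getTableName(),
                table.getStorage().getSerdeParameters().get("serialization.class"));
    }
}
```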
```java
private record TableReference(Table table)
{
    private String getFullTableName()
    {
        return String.format("%s.%s", table.getDatabaseName(), table.getTableName());
    }

    private String getSerializationClass()
    {
        String serializationClass = table.getStorage().getSerdeParameters().get(SERIALIZATION_CLASS);
```
issue (bug_risk): TableReference cache key only uses database and table name, ignoring storage/serde parameters.
Including storage and serde parameters in the cache key will help prevent incorrect cache hits when tables with the same name have different formats.
```java
HiveMetastore hiveMetastore = new DynamicSchemaHiveMetastore(
        createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize),
        protobufDeserializerFactory,
        Duration.valueOf("1h")); // TODO make configurable
```
I will make a proper config property of this TODO when reviewers agree with the rest of the code
Force-pushed from 059ebf6 to a60ff5f (Compare)
Description
Since Trino 477, the Hive connector has supported reading sequence files containing protobuf bytes (#26353). With that PR, records can be read using the protobuf deserializer.
The code was created for Hive tables made with Twitter's Elephant Bird: https://github.com/twitter/elephant-bird/wiki/How-to-use-Elephant-Bird-with-Hive. In Hive, these tables also use the `serialization.class` protobuf class to determine the table's columns, a feature named "Dynamic Schemas". An example of such a table in Hive is shown below.
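This is a representative DDL sketch in the style of the Elephant Bird wiki, not the PR's original example; the `serialization.class` value and the output format line are hypothetical placeholders, and the exact syntax depends on the Hive version:

```sql
CREATE EXTERNAL TABLE users
ROW FORMAT SERDE 'com.twitter.elephantbird.hive.serde.ProtobufDeserializer'
WITH SERDEPROPERTIES ('serialization.class' = 'com.example.proto.gen.Storage$User')
STORED AS
    INPUTFORMAT 'com.twitter.elephantbird.mapred.input.DeprecatedRawMultiInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
```

Note that no column list is given: the columns come from the protobuf definition named by `serialization.class`.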
When using `show create table X;` in Hive, all resolved columns are shown. When the protobuf definition is updated with new fields, the columns are automatically added (in Hive). However, Trino asks the Hive Metastore for the table columns, and I found that the columns returned by the Hive Metastore (via Trino) were the ones determined when the original table was created. Fields added to the protobuf later were shown when querying Hive directly, but Trino received the original version via the Hive Metastore. Hence this PR.

This PR adds dynamic schema support to Trino: when the SEQUENCEFILE_PROTOBUF storage format is detected, the columns are derived from the protobuf fields, as is done in the Hive implementation.
Release notes
( X ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:
Summary by Sourcery
Enable dynamic schema support for Hive SEQUENCEFILE_PROTOBUF tables by deriving column definitions from Protobuf descriptors instead of relying on static metastore schemas.
New Features:
- Dynamic schema support for SEQUENCEFILE_PROTOBUF Hive tables: table and partition columns are inferred from protobuf descriptor fields at runtime instead of the static metastore schema.

Enhancements:
- Wrap the Hive metastore with DynamicSchemaHiveMetastore, which loads and caches protobuf descriptors, and introduce a ProtobufDeserializerFactoryLoader for dependency injection.

Build:
- Add protobuf build and runtime dependencies.

Tests:
- Add tests verifying the inferred schemas.