diff --git a/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java b/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java index af7b2f3da62e..ffab22476e6e 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java @@ -30,6 +30,7 @@ import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.InMemoryRecordSet; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.MaterializedViewNotFoundException; import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.SchemaTableName; @@ -170,7 +171,7 @@ private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Bu if (needFreshness) { try { - freshness = Optional.of(metadata.getMaterializedViewFreshness(session, name)); + freshness = Optional.of(metadata.getMaterializedViewFreshness(session, name, new MaterializedViewFreshnessCheckPolicy.Exact())); } catch (MaterializedViewNotFoundException e) { // Ignore materialized view that was dropped during query execution (race condition) diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index a62979dfd513..04ebaac5cdc0 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -42,6 +42,7 @@ import io.trino.spi.connector.JoinType; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.RelationType; @@ -851,7 +852,7 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName * Method to get difference between the states of table at two different points in time/or as of given token-ids. * The method is used by the engine to determine if a materialized view is current with respect to the tables it depends on. */ - MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name); + MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name, MaterializedViewFreshnessCheckPolicy policy); /** * Rename the specified materialized view. diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index d71098152ce3..310ec3acb7fe 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -73,6 +73,7 @@ import io.trino.spi.connector.JoinType; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; @@ -1871,7 +1872,7 @@ public Map getMaterializedViewProperties(Session session, Qualif } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName viewName) + public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName viewName, MaterializedViewFreshnessCheckPolicy policy) { Optional catalog = getOptionalCatalogMetadata(session, viewName.catalogName()); if (catalog.isPresent()) { @@ -1880,7 +1881,7 @@ public MaterializedViewFreshness getMaterializedViewFreshness(Session session, Q ConnectorMetadata metadata = catalogMetadata.getMetadataFor(session, catalogHandle); ConnectorSession connectorSession = session.toConnectorSession(catalogHandle); - return metadata.getMaterializedViewFreshness(connectorSession, viewName.asSchemaTableName()); + return metadata.getMaterializedViewFreshness(connectorSession, viewName.asSchemaTableName(), policy); } return new MaterializedViewFreshness(STALE, Optional.empty()); } diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 661ada68965e..6bb3d23acc15 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -69,6 +69,7 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.PointerType; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableProcedureMetadata; @@ -374,6 +375,7 @@ import static io.trino.spi.StandardErrorCode.VIEW_IS_RECURSIVE; import static io.trino.spi.StandardErrorCode.VIEW_IS_STALE; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH; +import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH_WITHIN_GRACE_PERIOD; import static io.trino.spi.connector.StandardWarningCode.REDUNDANT_ORDER_BY; import static io.trino.spi.function.FunctionKind.AGGREGATE; import static io.trino.spi.function.FunctionKind.WINDOW; @@ -744,7 +746,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate TableHandle targetTableHandle = metadata.getTableHandle(session, targetTable) .orElseThrow(() -> semanticException(TABLE_NOT_FOUND, refreshMaterializedView, "Table '%s' does not exist", targetTable)); - analysis.setSkipMaterializedViewRefresh(metadata.getMaterializedViewFreshness(session, name).getFreshness() == FRESH); + analysis.setSkipMaterializedViewRefresh(metadata.getMaterializedViewFreshness(session, name, new MaterializedViewFreshnessCheckPolicy.Exact()).getFreshness() == FRESH); TableMetadata tableMetadata = metadata.getTableMetadata(session, targetTableHandle); List insertColumns = tableMetadata.columns().stream() @@ -2377,10 +2379,21 @@ protected Scope visitTable(Table table, Optional scope) private boolean isMaterializedViewSufficientlyFresh(Session session, QualifiedObjectName name, MaterializedViewDefinition materializedViewDefinition) { - MaterializedViewFreshness materializedViewFreshness = metadata.getMaterializedViewFreshness(session, name); + boolean gracePeriodZero = materializedViewDefinition.getGracePeriod() + .map(Duration::isZero) + .orElse(false); + + // TODO should we compare lastFreshTime with session.start() or with current time? The freshness is calculated with respect to current state of things. + Instant freshnessReferenceTime = sessionTimeProvider.getStart(session); + // Use Exact policy when grace period is zero to handle this special case in the analyzer + // rather than duplicating this logic across all connectors + MaterializedViewFreshnessCheckPolicy policy = gracePeriodZero + ? new MaterializedViewFreshnessCheckPolicy.Exact() + : new MaterializedViewFreshnessCheckPolicy.ConsiderGracePeriod(freshnessReferenceTime); + MaterializedViewFreshness materializedViewFreshness = metadata.getMaterializedViewFreshness(session, name, policy); MaterializedViewFreshness.Freshness freshness = materializedViewFreshness.getFreshness(); - if (freshness == FRESH) { + if (freshness == FRESH || freshness == FRESH_WITHIN_GRACE_PERIOD) { return true; } Optional lastFreshTime = materializedViewFreshness.getLastFreshTime(); @@ -2392,16 +2405,15 @@ private boolean isMaterializedViewSufficientlyFresh(Session session, QualifiedOb // Unlimited grace period return true; } - Duration gracePeriod = materializedViewDefinition.getGracePeriod().get(); - if (gracePeriod.isZero()) { + if (gracePeriodZero) { // Consider 0 as a special value meaning "do not accept any staleness". This makes 0 more reliable, and more likely what user wanted, // regardless of lastFreshTime, query time or rounding. return false; } + Duration gracePeriod = materializedViewDefinition.getGracePeriod().get(); // Can be negative - // TODO should we compare lastFreshTime with session.start() or with current time? The freshness is calculated with respect to current state of things. - Duration staleness = Duration.between(lastFreshTime.get(), sessionTimeProvider.getStart(session)); + Duration staleness = Duration.between(lastFreshTime.get(), freshnessReferenceTime); return staleness.compareTo(gracePeriod) <= 0; } diff --git a/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java b/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java index b0377ee376cd..5bf8ec792d04 100644 --- a/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java @@ -37,6 +37,7 @@ import io.trino.spi.connector.ConnectorTableVersion; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.MaterializedViewNotFoundException; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SaveMode; @@ -301,7 +302,7 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name) + public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy) { boolean fresh = freshMaterializedViews.contains(name); return new MaterializedViewFreshness( diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index f39d1ad27f3d..746304cb5433 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -50,6 +50,7 @@ import io.trino.spi.connector.JoinType; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; @@ -1405,11 +1406,11 @@ public Map getMaterializedViewProperties(ConnectorSession sessio } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name) + public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy) { Span span = startSpan("getMaterializedViewFreshness", name); try (var _ = scopedSpan(span)) { - return delegate.getMaterializedViewFreshness(session, name); + return delegate.getMaterializedViewFreshness(session, name, policy); } } diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index b5182a6fd952..4c0125792057 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -72,6 +72,7 @@ import io.trino.spi.connector.JoinType; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.RelationType; @@ -1558,11 +1559,11 @@ public Map getMaterializedViewProperties(Session session, Qualif } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name) + public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name, MaterializedViewFreshnessCheckPolicy policy) { Span span = startSpan("getMaterializedViewFreshness", name); try (var _ = scopedSpan(span)) { - return delegate.getMaterializedViewFreshness(session, name); + return delegate.getMaterializedViewFreshness(session, name, policy); } } diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 2a70c0a7523d..31e6e015ad37 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -71,6 +71,7 @@ import io.trino.spi.connector.JoinStatistics; import io.trino.spi.connector.JoinType; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RecordPageSource; import io.trino.spi.connector.RelationColumnsMetadata; @@ -747,7 +748,7 @@ public Map getMaterializedViewProperties(ConnectorSession sessio } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName viewName) + public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName viewName, MaterializedViewFreshnessCheckPolicy policy) { if (getMaterializedViewFreshness.isPresent()) { MaterializedViewFreshness freshness = getMaterializedViewFreshness.get().apply(session, viewName); diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index f395ade048f3..9f7460b0608d 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -46,6 +46,7 @@ import io.trino.spi.connector.JoinType; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.RelationType; @@ -1051,7 +1052,7 @@ public Map getMaterializedViewProperties(Session session, Qualif } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name) + public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name, MaterializedViewFreshnessCheckPolicy policy) { throw new UnsupportedOperationException(); } diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 17ff12c4049f..bf54c4741635 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -315,6 +315,11 @@ method void io.trino.spi.resourcegroups.SelectionCriteria::<init>(boolean, java.lang.String, java.util.Set<java.lang.String>, java.lang.String, java.util.Optional<java.lang.String>, java.util.Optional<java.lang.String>, java.util.Set<java.lang.String>, io.trino.spi.session.ResourceEstimates, java.util.Optional<java.lang.String>) Added queryText before queryType parameter + + java.method.numberOfParametersChanged + method io.trino.spi.connector.MaterializedViewFreshness io.trino.spi.connector.ConnectorMetadata::getMaterializedViewFreshness(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.SchemaTableName) + method io.trino.spi.connector.MaterializedViewFreshness io.trino.spi.connector.ConnectorMetadata::getMaterializedViewFreshness(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.SchemaTableName, io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy) + diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 9cb5265fe390..ca3cd1e6e4f1 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -1757,7 +1757,7 @@ default Map getMaterializedViewProperties(ConnectorSession sessi * * @throws MaterializedViewNotFoundException when materialized view is not found */ - default MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name) + default MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata getMaterializedView() is implemented without getMaterializedViewFreshness()"); } diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/MaterializedViewFreshness.java b/core/trino-spi/src/main/java/io/trino/spi/connector/MaterializedViewFreshness.java index 0cd3c986352e..a78d2754cecb 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/MaterializedViewFreshness.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/MaterializedViewFreshness.java @@ -76,6 +76,15 @@ public String toString() public enum Freshness { FRESH, + /** + * The materialized view is fresh enough within its grace period. Returned when a connector + * received a {@link MaterializedViewFreshnessCheckPolicy.ConsiderGracePeriod} policy and + * determined it could skip expensive freshness checks (e.g., checking base table snapshots) + * because the MV is still within the grace period bounds. The actual freshness of base tables + * was not verified, but worst-case staleness is guaranteed to be within the acceptable grace + * period. + */ + FRESH_WITHIN_GRACE_PERIOD, STALE, UNKNOWN, /**/ diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/MaterializedViewFreshnessCheckPolicy.java b/core/trino-spi/src/main/java/io/trino/spi/connector/MaterializedViewFreshnessCheckPolicy.java new file mode 100644 index 000000000000..96d2fb9951bf --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/MaterializedViewFreshnessCheckPolicy.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.spi.connector; + +import java.time.Instant; + +/** + * Policy for checking materialized view freshness, guiding a connector on whether it can + * optimize freshness checks. + */ +public sealed interface MaterializedViewFreshnessCheckPolicy + permits MaterializedViewFreshnessCheckPolicy.Exact, MaterializedViewFreshnessCheckPolicy.ConsiderGracePeriod +{ + /** + * Policy requiring exact freshness check. + */ + record Exact() + implements MaterializedViewFreshnessCheckPolicy {} + + /** + * Policy that allows optimizing freshness checks by considering the materialized view's + * grace period. A connector may skip expensive operations (e.g., metastore calls to check + * base table snapshots) if it can determine the MV is fresh enough based on the + * referenceTime and the grace period configured in the materialized view definition. + * + * @param referenceTime The point in time against which freshness is being evaluated + * (typically the query start time) + */ + record ConsiderGracePeriod(Instant referenceTime) + implements MaterializedViewFreshnessCheckPolicy {} +} diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 44fbec680d51..82a4781e5329 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -50,6 +50,7 @@ import io.trino.spi.connector.JoinType; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; @@ -1220,10 +1221,10 @@ public Map getMaterializedViewProperties(ConnectorSession sessio } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name) + public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy) { try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) { - return delegate.getMaterializedViewFreshness(session, name); + return delegate.getMaterializedViewFreshness(session, name, policy); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 0b9bf4bd22c1..8c2b873b3b72 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -104,6 +104,8 @@ import io.trino.spi.connector.DiscretePredicates; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy.ConsiderGracePeriod; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; @@ -375,6 +377,7 @@ import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH; +import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH_WITHIN_GRACE_PERIOD; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.STALE; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN; import static io.trino.spi.connector.RetryMode.NO_RETRIES; @@ -3857,7 +3860,7 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName materializedViewName) + public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName materializedViewName, MaterializedViewFreshnessCheckPolicy policy) { Optional materializedViewDefinition = getMaterializedView(session, materializedViewName); if (materializedViewDefinition.isEmpty()) { @@ -3878,9 +3881,11 @@ public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession s .map(snapshot -> Boolean.valueOf(snapshot.summary().getOrDefault(DEPENDS_ON_TABLE_FUNCTIONS, "false"))) .orElse(false); - Optional refreshTime = currentSnapshot.map(snapshot -> snapshot.summary().get(TRINO_QUERY_START_TIME)) - .map(Instant::parse) - .or(() -> currentSnapshot.map(snapshot -> Instant.ofEpochMilli(snapshot.timestampMillis()))); + // refreshStartTime is captured at the beginning of refresh + Optional refreshStartTime = currentSnapshot.map(snapshot -> snapshot.summary().get(TRINO_QUERY_START_TIME)) + .map(Instant::parse); + // For MVs refreshed before TRINO_QUERY_START_TIME was introduced, fall back to snapshot timestamp (captured at end of refresh) + Optional refreshTime = refreshStartTime.or(() -> currentSnapshot.map(snapshot -> Instant.ofEpochMilli(snapshot.timestampMillis()))); if (dependsOnTableFunctions) { // It can't be determined whether a value returned by table function is STALE or not @@ -3893,6 +3898,27 @@ public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession s return new MaterializedViewFreshness(STALE, Optional.empty()); } + if (policy instanceof ConsiderGracePeriod(Instant referenceTime) && refreshStartTime.isPresent()) { + // To determine freshness, we normally load current metadata for each base table and check if there + // is a newer snapshot than the recorded one (DEPENDS_ON_TABLES). This requires expensive metastore + // operations for each base Iceberg table. + // + // The refresh query can read base table snapshots created before or during its execution. In the most + // pessimistic scenario, a new base table snapshot is created immediately after the refresh started + // (at refreshStartTime + epsilon), but the refresh reads an older snapshot. This new snapshot would + // not be recorded in DEPENDS_ON_TABLES, making the MV technically stale. However, when ConsiderGracePeriod + // policy is used and refreshStartTime + gracePeriod > referenceTime, we can safely say that the MV + // is at least within the grace period because refreshStartTime is before the new snapshot creation time. + Optional gracePeriod = materializedViewDefinition.get().getGracePeriod(); + if (gracePeriod.isEmpty()) { + // infinite grace period + return new MaterializedViewFreshness(FRESH_WITHIN_GRACE_PERIOD, Optional.empty()); + } + else if (refreshStartTime.get().plus(gracePeriod.get()).isAfter(referenceTime)) { + return new MaterializedViewFreshness(FRESH_WITHIN_GRACE_PERIOD, Optional.empty()); + } + } + boolean hasUnknownTables = false; Optional firstTableChange = Optional.of(Long.MAX_VALUE); ImmutableList.Builder> tableChangeInfoTasks = ImmutableList.builder(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index 8a0574e160f2..06cef856fbc4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -219,7 +219,7 @@ public void testSelectFromFreshMaterializedView() assertMetastoreInvocations("SELECT * FROM test_select_fresh_mview_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 1) .build()); } @@ -233,7 +233,7 @@ public void testSelectFromMaterializedViewWithinGracePeriod() assertMetastoreInvocations("SELECT * FROM test_select_gp_mview_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 1) .build()); } diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java index 29ae3bfacac0..1e48848d4904 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java @@ -68,6 +68,7 @@ import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; @@ -939,9 +940,9 @@ public Map getMaterializedViewProperties(ConnectorSession sessio } @Override - public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name) + public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy) { - return icebergMetadata.getMaterializedViewFreshness(session, name); + return icebergMetadata.getMaterializedViewFreshness(session, name, policy); } @Override