Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -170,7 +171,7 @@ private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Bu

if (needFreshness) {
try {
freshness = Optional.of(metadata.getMaterializedViewFreshness(session, name));
freshness = Optional.of(metadata.getMaterializedViewFreshness(session, name, new MaterializedViewFreshnessCheckPolicy.Exact()));
}
catch (MaterializedViewNotFoundException e) {
// Ignore materialized view that was dropped during query execution (race condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RelationType;
Expand Down Expand Up @@ -851,7 +852,7 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName
* Method to get difference between the states of table at two different points in time/or as of given token-ids.
* The method is used by the engine to determine if a materialized view is current with respect to the tables it depends on.
*/
MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name);
MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name, MaterializedViewFreshnessCheckPolicy policy);

/**
* Rename the specified materialized view.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
Expand Down Expand Up @@ -1871,7 +1872,7 @@ public Map<String, Object> getMaterializedViewProperties(Session session, Qualif
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName viewName)
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName viewName, MaterializedViewFreshnessCheckPolicy policy)
{
Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, viewName.catalogName());
if (catalog.isPresent()) {
Expand All @@ -1880,7 +1881,7 @@ public MaterializedViewFreshness getMaterializedViewFreshness(Session session, Q
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(session, catalogHandle);

ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
return metadata.getMaterializedViewFreshness(connectorSession, viewName.asSchemaTableName());
return metadata.getMaterializedViewFreshness(connectorSession, viewName.asSchemaTableName(), policy);
}
return new MaterializedViewFreshness(STALE, Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.PointerType;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableProcedureMetadata;
Expand Down Expand Up @@ -374,6 +375,7 @@
import static io.trino.spi.StandardErrorCode.VIEW_IS_RECURSIVE;
import static io.trino.spi.StandardErrorCode.VIEW_IS_STALE;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH_WITHIN_GRACE_PERIOD;
import static io.trino.spi.connector.StandardWarningCode.REDUNDANT_ORDER_BY;
import static io.trino.spi.function.FunctionKind.AGGREGATE;
import static io.trino.spi.function.FunctionKind.WINDOW;
Expand Down Expand Up @@ -744,7 +746,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
TableHandle targetTableHandle = metadata.getTableHandle(session, targetTable)
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, refreshMaterializedView, "Table '%s' does not exist", targetTable));

analysis.setSkipMaterializedViewRefresh(metadata.getMaterializedViewFreshness(session, name).getFreshness() == FRESH);
analysis.setSkipMaterializedViewRefresh(metadata.getMaterializedViewFreshness(session, name, new MaterializedViewFreshnessCheckPolicy.Exact()).getFreshness() == FRESH);

TableMetadata tableMetadata = metadata.getTableMetadata(session, targetTableHandle);
List<String> insertColumns = tableMetadata.columns().stream()
Expand Down Expand Up @@ -2377,10 +2379,21 @@ protected Scope visitTable(Table table, Optional<Scope> scope)

private boolean isMaterializedViewSufficientlyFresh(Session session, QualifiedObjectName name, MaterializedViewDefinition materializedViewDefinition)
{
MaterializedViewFreshness materializedViewFreshness = metadata.getMaterializedViewFreshness(session, name);
boolean gracePeriodZero = materializedViewDefinition.getGracePeriod()
.map(Duration::isZero)
.orElse(false);

// TODO should we compare lastFreshTime with session.start() or with current time? The freshness is calculated with respect to current state of things.
Instant freshnessReferenceTime = sessionTimeProvider.getStart(session);
// Use Exact policy when grace period is zero to handle this special case in the analyzer
// rather than duplicating this logic across all connectors
MaterializedViewFreshnessCheckPolicy policy = gracePeriodZero
? new MaterializedViewFreshnessCheckPolicy.Exact()
: new MaterializedViewFreshnessCheckPolicy.ConsiderGracePeriod(freshnessReferenceTime);
MaterializedViewFreshness materializedViewFreshness = metadata.getMaterializedViewFreshness(session, name, policy);
MaterializedViewFreshness.Freshness freshness = materializedViewFreshness.getFreshness();

if (freshness == FRESH) {
if (freshness == FRESH || freshness == FRESH_WITHIN_GRACE_PERIOD) {
return true;
}
Optional<Instant> lastFreshTime = materializedViewFreshness.getLastFreshTime();
Expand All @@ -2392,16 +2405,15 @@ private boolean isMaterializedViewSufficientlyFresh(Session session, QualifiedOb
// Unlimited grace period
return true;
}
Duration gracePeriod = materializedViewDefinition.getGracePeriod().get();
if (gracePeriod.isZero()) {
if (gracePeriodZero) {
// Consider 0 as a special value meaning "do not accept any staleness". This makes 0 more reliable, and more likely what user wanted,
// regardless of lastFreshTime, query time or rounding.
return false;
}

Duration gracePeriod = materializedViewDefinition.getGracePeriod().get();
// Can be negative
// TODO should we compare lastFreshTime with session.start() or with current time? The freshness is calculated with respect to current state of things.
Duration staleness = Duration.between(lastFreshTime.get(), sessionTimeProvider.getStart(session));
Duration staleness = Duration.between(lastFreshTime.get(), freshnessReferenceTime);
return staleness.compareTo(gracePeriod) <= 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SaveMode;
Expand Down Expand Up @@ -301,7 +302,7 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name)
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy)
{
boolean fresh = freshMaterializedViews.contains(name);
return new MaterializedViewFreshness(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
Expand Down Expand Up @@ -1405,11 +1406,11 @@ public Map<String, Object> getMaterializedViewProperties(ConnectorSession sessio
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name)
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy)
{
Span span = startSpan("getMaterializedViewFreshness", name);
try (var _ = scopedSpan(span)) {
return delegate.getMaterializedViewFreshness(session, name);
return delegate.getMaterializedViewFreshness(session, name, policy);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RelationType;
Expand Down Expand Up @@ -1558,11 +1559,11 @@ public Map<String, Object> getMaterializedViewProperties(Session session, Qualif
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name)
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name, MaterializedViewFreshnessCheckPolicy policy)
{
Span span = startSpan("getMaterializedViewFreshness", name);
try (var _ = scopedSpan(span)) {
return delegate.getMaterializedViewFreshness(session, name);
return delegate.getMaterializedViewFreshness(session, name, policy);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RecordPageSource;
import io.trino.spi.connector.RelationColumnsMetadata;
Expand Down Expand Up @@ -747,7 +748,7 @@ public Map<String, Object> getMaterializedViewProperties(ConnectorSession sessio
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName viewName)
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName viewName, MaterializedViewFreshnessCheckPolicy policy)
{
if (getMaterializedViewFreshness.isPresent()) {
MaterializedViewFreshness freshness = getMaterializedViewFreshness.get().apply(session, viewName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RelationType;
Expand Down Expand Up @@ -1051,7 +1052,7 @@ public Map<String, Object> getMaterializedViewProperties(Session session, Qualif
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name)
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name, MaterializedViewFreshnessCheckPolicy policy)
{
throw new UnsupportedOperationException();
}
Expand Down
5 changes: 5 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@
<old>method void io.trino.spi.resourcegroups.SelectionCriteria::&lt;init&gt;(boolean, java.lang.String, java.util.Set&lt;java.lang.String&gt;, java.lang.String, java.util.Optional&lt;java.lang.String&gt;, java.util.Optional&lt;java.lang.String&gt;, java.util.Set&lt;java.lang.String&gt;, io.trino.spi.session.ResourceEstimates, java.util.Optional&lt;java.lang.String&gt;)</old>
<justification>Added queryText before queryType parameter</justification>
</item>
<item>
<code>java.method.numberOfParametersChanged</code>
<old>method io.trino.spi.connector.MaterializedViewFreshness io.trino.spi.connector.ConnectorMetadata::getMaterializedViewFreshness(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.SchemaTableName)</old>
<new>method io.trino.spi.connector.MaterializedViewFreshness io.trino.spi.connector.ConnectorMetadata::getMaterializedViewFreshness(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.SchemaTableName, io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy)</new>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ default Map<String, Object> getMaterializedViewProperties(ConnectorSession sessi
*
* @throws MaterializedViewNotFoundException when materialized view is not found
*/
default MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name)
default MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy)
{
throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata getMaterializedView() is implemented without getMaterializedViewFreshness()");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ public String toString()
public enum Freshness
{
FRESH,
/**
* The materialized view is fresh enough within its grace period. Returned when a connector
* received a {@link MaterializedViewFreshnessCheckPolicy.ConsiderGracePeriod} policy and
* determined it could skip expensive freshness checks (e.g., checking base table snapshots)
* because the MV is still within the grace period bounds. The actual freshness of base tables
* was not verified, but worst-case staleness is guaranteed to be within the acceptable grace
* period.
*/
FRESH_WITHIN_GRACE_PERIOD,
STALE,
UNKNOWN,
/**/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.spi.connector;

import java.time.Instant;

/**
* Policy for checking materialized view freshness, guiding a connector on whether it can
* optimize freshness checks.
*/
public sealed interface MaterializedViewFreshnessCheckPolicy
permits MaterializedViewFreshnessCheckPolicy.Exact, MaterializedViewFreshnessCheckPolicy.ConsiderGracePeriod
{
/**
* Policy requiring exact freshness check.
*/
record Exact()
implements MaterializedViewFreshnessCheckPolicy {}

/**
* Policy that allows optimizing freshness checks by considering the materialized view's
* grace period. A connector may skip expensive operations (e.g., metastore calls to check
* base table snapshots) if it can determine the MV is fresh enough based on the
* referenceTime and the grace period configured in the materialized view definition.
*
* @param referenceTime The point in time against which freshness is being evaluated
* (typically the query start time)
*/
record ConsiderGracePeriod(Instant referenceTime)
implements MaterializedViewFreshnessCheckPolicy {}
}
Comment on lines +19 to +43
Copy link
Member

@findepi findepi Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's beautiful, but mouthful and makes following the logic harder.
I wanted this to be single boolean param like boolean considerGracePeriod...

It seems this SPI design was forced by the fact we don't want the connector to use session.getStart(), and we don't want the connector to use that value because... it's mocked in tests.
So, SPI shape (and complexity) is a by product of existing tests. Not great.
Let's find a way to avoid that.

For example, we could pass SessionTimeProvider to io.trino.Session#toConnectorSession so that FullConnectorSession can have its reported start time overriden.
Or, perhaps we could fix this to work (in tests only)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were two reasons:

  • More important: I wanted to avoid making the connector aware of what the reference time is. Even in the analyzer we have a TODO (which we probably should remove) that suggests we aren’t sure whether it should be the current time or the session start time.
  • The second reason you almost figured out: I initially wanted to use the current time in the connector as the safest, most pessimistic option. But the already-mocked session start time stopped me…

Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewFreshnessCheckPolicy;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
Expand Down Expand Up @@ -1220,10 +1221,10 @@ public Map<String, Object> getMaterializedViewProperties(ConnectorSession sessio
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name)
public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name, MaterializedViewFreshnessCheckPolicy policy)
{
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.getMaterializedViewFreshness(session, name);
return delegate.getMaterializedViewFreshness(session, name, policy);
}
}

Expand Down
Loading