Skip to content

Commit dfbb7a7

Browse files
Bug fix: apply time constraints after time offsets
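Time constraints were being applied before time offsets in some scenarios. This is incorrect because the offset join shifts the time values, so a constraint applied beforehand filters on pre-offset values and can drop rows that belong in the requested range once the offset is applied. This commit defers the time constraint until after the offset join. Customers likely never saw this bug because time constraints are only available to open source users, and this may never have been released to them.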
1 parent 0781ab3 commit dfbb7a7

File tree

1 file changed

+28
-30
lines changed

1 file changed

+28
-30
lines changed

metricflow/dataflow/builder/dataflow_plan_builder.py

+28-30
Original file line numberDiff line numberDiff line change
@@ -641,36 +641,39 @@ def _build_derived_metric_output_node(
641641
aggregated_to_elements=set(queried_linkable_specs.as_tuple),
642642
)
643643

644-
# For ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
644+
# For ratio / derived metrics with time offset, apply offset join here. Constraints will be applied after the offset
645+
# to avoid filtering out values that will be changed.
645646
if metric_spec.has_time_offset:
646647
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
647648
metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
648649
)
649650
output_node = JoinToTimeSpineNode.create(
650651
parent_node=output_node,
651652
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
652-
time_range_constraint=predicate_pushdown_state.time_range_constraint,
653653
offset_window=metric_spec.offset_window,
654654
offset_to_grain=metric_spec.offset_to_grain,
655655
join_type=SqlJoinType.INNER,
656656
)
657657

658-
if len(metric_spec.filter_spec_set.all_filter_specs) > 0:
659-
output_node = WhereConstraintNode.create(
660-
parent_node=output_node, where_specs=metric_spec.filter_spec_set.all_filter_specs
661-
)
658+
if len(metric_spec.filter_spec_set.all_filter_specs) > 0 or predicate_pushdown_state.time_range_constraint:
659+
# FilterElementsNode will only be needed if there are where filter specs that were not selected in the group by.
662660
specs_in_filters = set(
663661
linkable_spec
664662
for filter_spec in metric_spec.filter_spec_set.all_filter_specs
665663
for linkable_spec in filter_spec.linkable_specs
666664
)
665+
filter_to_specs = None
667666
if not specs_in_filters.issubset(queried_linkable_specs.as_tuple):
668-
output_node = FilterElementsNode.create(
669-
parent_node=output_node,
670-
include_specs=InstanceSpecSet(metric_specs=(metric_spec,)).merge(
671-
InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
672-
),
667+
filter_to_specs = InstanceSpecSet(metric_specs=(metric_spec,)).merge(
668+
InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
673669
)
670+
output_node = self._build_pre_aggregation_plan(
671+
source_node=output_node,
672+
where_filter_specs=metric_spec.filter_spec_set.all_filter_specs,
673+
time_range_constraint=predicate_pushdown_state.time_range_constraint,
674+
filter_to_specs=filter_to_specs,
675+
)
676+
674677
return output_node
675678

676679
def _get_base_agg_time_dimensions(
@@ -1629,7 +1632,6 @@ def _build_aggregated_measure_from_measure_source_node(
16291632
unaggregated_measure_node = JoinToTimeSpineNode.create(
16301633
parent_node=unaggregated_measure_node,
16311634
requested_agg_time_dimension_specs=base_agg_time_dimension_specs,
1632-
time_range_constraint=predicate_pushdown_state.time_range_constraint,
16331635
offset_window=before_aggregation_time_spine_join_description.offset_window,
16341636
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
16351637
join_type=before_aggregation_time_spine_join_description.join_type,
@@ -1641,17 +1643,12 @@ def _build_aggregated_measure_from_measure_source_node(
16411643
# If this is the second layer of aggregation for a conversion metric, we have already joined the custom granularity.
16421644
if spec not in measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple
16431645
]
1644-
# If time constraint was previously adjusted for cumulative window or grain, apply original time constraint
1645-
# here. Can skip if metric is being aggregated over all time.
1646+
# Apply original time constraint if it wasn't applied to the source node recipe. For cumulative metrics, the constraint
1647+
# may have been expanded and needs to be narrowed here. For offsets, the constraint was deferred to after the offset.
16461648
# TODO - Pushdown: Encapsulate all of this window sliding bookkeeping in the pushdown params object
1647-
time_range_constraint_to_apply = (
1648-
predicate_pushdown_state.time_range_constraint
1649-
if (
1650-
cumulative_metric_adjusted_time_constraint is not None
1651-
and predicate_pushdown_state.time_range_constraint is not None
1652-
)
1653-
else None
1654-
)
1649+
time_range_constraint_to_apply = None
1650+
if cumulative_metric_adjusted_time_constraint or before_aggregation_time_spine_join_description:
1651+
time_range_constraint_to_apply = predicate_pushdown_state.time_range_constraint
16551652
unaggregated_measure_node = self._build_pre_aggregation_plan(
16561653
source_node=unaggregated_measure_node,
16571654
join_targets=measure_recipe.join_targets,
@@ -1737,11 +1734,11 @@ def _build_aggregated_measure_from_measure_source_node(
17371734
def _build_pre_aggregation_plan(
17381735
self,
17391736
source_node: DataflowPlanNode,
1740-
join_targets: List[JoinDescription],
1741-
custom_granularity_specs: Sequence[TimeDimensionSpec],
1742-
where_filter_specs: Sequence[WhereFilterSpec],
1743-
time_range_constraint: Optional[TimeRangeConstraint],
1744-
filter_to_specs: InstanceSpecSet,
1737+
filter_to_specs: Optional[InstanceSpecSet] = None,
1738+
join_targets: List[JoinDescription] = [],
1739+
custom_granularity_specs: Sequence[TimeDimensionSpec] = (),
1740+
where_filter_specs: Sequence[WhereFilterSpec] = (),
1741+
time_range_constraint: Optional[TimeRangeConstraint] = None,
17451742
measure_properties: Optional[MeasureSpecProperties] = None,
17461743
queried_linkable_specs: Optional[LinkableSpecSet] = None,
17471744
distinct: bool = False,
@@ -1775,9 +1772,10 @@ def _build_pre_aggregation_plan(
17751772
queried_linkable_specs=queried_linkable_specs,
17761773
parent_node=output_node,
17771774
)
1778-
output_node = FilterElementsNode.create(
1779-
parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
1780-
)
1775+
if filter_to_specs:
1776+
output_node = FilterElementsNode.create(
1777+
parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
1778+
)
17811779

17821780
return output_node
17831781

0 commit comments

Comments
 (0)