Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in JoinToTimeSpine dataflow plans #1577

Merged
merged 3 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ def _build_derived_metric_output_node(
# TODO: move this to a helper method
time_spine_node = self._build_time_spine_node(queried_agg_time_dimension_specs)
output_node = JoinToTimeSpineNode.create(
parent_node=output_node,
metric_source_node=output_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0],
Expand Down Expand Up @@ -1651,7 +1651,7 @@ def _build_aggregated_measure_from_measure_source_node(
required_time_spine_specs = (join_on_time_dimension_spec,) + base_queried_agg_time_dimension_specs
time_spine_node = self._build_time_spine_node(required_time_spine_specs)
unaggregated_measure_node = JoinToTimeSpineNode.create(
parent_node=unaggregated_measure_node,
metric_source_node=unaggregated_measure_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=base_queried_agg_time_dimension_specs,
join_on_time_dimension_spec=join_on_time_dimension_spec,
Expand Down Expand Up @@ -1725,7 +1725,7 @@ def _build_aggregated_measure_from_measure_source_node(
where_filter_specs=agg_time_only_filters,
)
output_node: DataflowPlanNode = JoinToTimeSpineNode.create(
parent_node=aggregate_measures_node,
metric_source_node=aggregate_measures_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0],
Expand Down
13 changes: 5 additions & 8 deletions metricflow/dataflow/nodes/join_to_time_spine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
"""

time_spine_node: DataflowPlanNode
metric_source_node: DataflowPlanNode
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec]
join_on_time_dimension_spec: TimeDimensionSpec
join_type: SqlJoinType
Expand All @@ -37,7 +38,6 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()
assert len(self.parent_nodes) == 1

assert not (
self.offset_window and self.offset_to_grain
Expand All @@ -48,7 +48,7 @@ def __post_init__(self) -> None: # noqa: D105

@staticmethod
def create( # noqa: D102
parent_node: DataflowPlanNode,
metric_source_node: DataflowPlanNode,
time_spine_node: DataflowPlanNode,
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
join_on_time_dimension_spec: TimeDimensionSpec,
Expand All @@ -57,7 +57,8 @@ def create( # noqa: D102
offset_to_grain: Optional[TimeGranularity] = None,
) -> JoinToTimeSpineNode:
return JoinToTimeSpineNode(
parent_nodes=(parent_node,),
parent_nodes=(metric_source_node, time_spine_node),
metric_source_node=metric_source_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=tuple(requested_agg_time_dimension_specs),
join_on_time_dimension_spec=join_on_time_dimension_spec,
Expand Down Expand Up @@ -90,10 +91,6 @@ def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
props += (DisplayedProperty("offset_to_grain", self.offset_to_grain),)
return props

@property
def parent_node(self) -> DataflowPlanNode: # noqa: D102
return self.parent_nodes[0]

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return (
isinstance(other_node, self.__class__)
Expand All @@ -107,7 +104,7 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> JoinToTimeSpineNode: # noqa: D102
assert len(new_parent_nodes) == 1
return JoinToTimeSpineNode.create(
parent_node=new_parent_nodes[0],
metric_source_node=self.metric_source_node,
time_spine_node=self.time_spine_node,
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
offset_window=self.offset_window,
Expand Down
2 changes: 1 addition & 1 deletion metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,7 @@ def _choose_instance_for_time_spine_join(
return agg_time_dimension_instances[0]

def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet: # noqa: D102
parent_data_set = node.parent_node.accept(self)
parent_data_set = node.metric_source_node.accept(self)
parent_alias = self._next_unique_table_alias()
time_spine_data_set = node.time_spine_node.accept(self)
time_spine_alias = self._next_unique_table_alias()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ def test_aggregate_output_join_metric_predicate_pushdown(
)


@pytest.mark.skip("Predicate pushdown is not implemented for some of the nodes in this plan")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were these passing before this change? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! The optimizers weren't correctly traversing all the parent nodes, so the pushdown optimizer never reached the AliasSpecsNode. That node is new and doesn't yet have an implementation for the pushdown optimizer.

def test_offset_metric_predicate_pushdown(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
Expand Down Expand Up @@ -354,6 +355,7 @@ def test_offset_metric_predicate_pushdown(
)


@pytest.mark.skip("Predicate pushdown is not implemented for some of the nodes in this plan")
def test_fill_nulls_time_spine_metric_predicate_pushdown(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
Expand Down Expand Up @@ -382,6 +384,7 @@ def test_fill_nulls_time_spine_metric_predicate_pushdown(
)


@pytest.mark.skip("Predicate pushdown is not implemented for some of the nodes in this plan")
def test_fill_nulls_time_spine_metric_with_post_agg_join_predicate_pushdown(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ WITH sma_28019_cte AS (
FROM ***************************.fct_visits visits_source_src_28000
)

, rss_28018_cte AS (
-- Read From Time Spine 'mf_time_spine'
SELECT
ds AS ds__day
FROM ***************************.mf_time_spine time_spine_src_28006
)

SELECT
metric_time__day AS metric_time__day
, CAST(buys AS FLOAT64) / CAST(NULLIF(visits, 0) AS FLOAT64) AS visit_buy_conversion_rate_7days_fill_nulls_with_0
Expand All @@ -27,9 +34,9 @@ FROM (
FROM (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_26.visits AS visits
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Read From CTE For node_id=sma_28019
-- Pass Only Elements: ['visits', 'metric_time__day']
Expand All @@ -42,14 +49,14 @@ FROM (
metric_time__day
) subq_26
ON
time_spine_src_28006.ds = subq_26.metric_time__day
rss_28018_cte.ds__day = subq_26.metric_time__day
) subq_30
FULL OUTER JOIN (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_39.buys AS buys
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Find conversions for user within the range of 7 day
-- Pass Only Elements: ['buys', 'metric_time__day']
Expand Down Expand Up @@ -113,7 +120,7 @@ FROM (
metric_time__day
) subq_39
ON
time_spine_src_28006.ds = subq_39.metric_time__day
rss_28018_cte.ds__day = subq_39.metric_time__day
) subq_43
ON
subq_30.metric_time__day = subq_43.metric_time__day
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ WITH sma_28019_cte AS (
FROM ***************************.fct_visits visits_source_src_28000
)

, rss_28018_cte AS (
-- Read From Time Spine 'mf_time_spine'
SELECT
ds AS ds__day
FROM ***************************.mf_time_spine time_spine_src_28006
)

SELECT
metric_time__day AS metric_time__day
, CAST(buys AS DOUBLE) / CAST(NULLIF(visits, 0) AS DOUBLE) AS visit_buy_conversion_rate_7days_fill_nulls_with_0
Expand All @@ -27,9 +34,9 @@ FROM (
FROM (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_26.visits AS visits
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Read From CTE For node_id=sma_28019
-- Pass Only Elements: ['visits', 'metric_time__day']
Expand All @@ -42,14 +49,14 @@ FROM (
metric_time__day
) subq_26
ON
time_spine_src_28006.ds = subq_26.metric_time__day
rss_28018_cte.ds__day = subq_26.metric_time__day
) subq_30
FULL OUTER JOIN (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_39.buys AS buys
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Find conversions for user within the range of 7 day
-- Pass Only Elements: ['buys', 'metric_time__day']
Expand Down Expand Up @@ -113,7 +120,7 @@ FROM (
metric_time__day
) subq_39
ON
time_spine_src_28006.ds = subq_39.metric_time__day
rss_28018_cte.ds__day = subq_39.metric_time__day
) subq_43
ON
subq_30.metric_time__day = subq_43.metric_time__day
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ WITH sma_28019_cte AS (
FROM ***************************.fct_visits visits_source_src_28000
)

, rss_28018_cte AS (
-- Read From Time Spine 'mf_time_spine'
SELECT
ds AS ds__day
FROM ***************************.mf_time_spine time_spine_src_28006
)

SELECT
metric_time__day AS metric_time__day
, CAST(buys AS DOUBLE) / CAST(NULLIF(visits, 0) AS DOUBLE) AS visit_buy_conversion_rate_7days_fill_nulls_with_0
Expand All @@ -27,9 +34,9 @@ FROM (
FROM (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_26.visits AS visits
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Read From CTE For node_id=sma_28019
-- Pass Only Elements: ['visits', 'metric_time__day']
Expand All @@ -42,14 +49,14 @@ FROM (
metric_time__day
) subq_26
ON
time_spine_src_28006.ds = subq_26.metric_time__day
rss_28018_cte.ds__day = subq_26.metric_time__day
) subq_30
FULL OUTER JOIN (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_39.buys AS buys
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Find conversions for user within the range of 7 day
-- Pass Only Elements: ['buys', 'metric_time__day']
Expand Down Expand Up @@ -113,7 +120,7 @@ FROM (
metric_time__day
) subq_39
ON
time_spine_src_28006.ds = subq_39.metric_time__day
rss_28018_cte.ds__day = subq_39.metric_time__day
) subq_43
ON
subq_30.metric_time__day = subq_43.metric_time__day
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ WITH sma_28019_cte AS (
FROM ***************************.fct_visits visits_source_src_28000
)

, rss_28018_cte AS (
-- Read From Time Spine 'mf_time_spine'
SELECT
ds AS ds__day
FROM ***************************.mf_time_spine time_spine_src_28006
)

SELECT
metric_time__day AS metric_time__day
, CAST(buys AS DOUBLE PRECISION) / CAST(NULLIF(visits, 0) AS DOUBLE PRECISION) AS visit_buy_conversion_rate_7days_fill_nulls_with_0
Expand All @@ -27,9 +34,9 @@ FROM (
FROM (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_26.visits AS visits
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Read From CTE For node_id=sma_28019
-- Pass Only Elements: ['visits', 'metric_time__day']
Expand All @@ -42,14 +49,14 @@ FROM (
metric_time__day
) subq_26
ON
time_spine_src_28006.ds = subq_26.metric_time__day
rss_28018_cte.ds__day = subq_26.metric_time__day
) subq_30
FULL OUTER JOIN (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_39.buys AS buys
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Find conversions for user within the range of 7 day
-- Pass Only Elements: ['buys', 'metric_time__day']
Expand Down Expand Up @@ -113,7 +120,7 @@ FROM (
metric_time__day
) subq_39
ON
time_spine_src_28006.ds = subq_39.metric_time__day
rss_28018_cte.ds__day = subq_39.metric_time__day
) subq_43
ON
subq_30.metric_time__day = subq_43.metric_time__day
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ WITH sma_28019_cte AS (
FROM ***************************.fct_visits visits_source_src_28000
)

, rss_28018_cte AS (
-- Read From Time Spine 'mf_time_spine'
SELECT
ds AS ds__day
FROM ***************************.mf_time_spine time_spine_src_28006
)

SELECT
metric_time__day AS metric_time__day
, CAST(buys AS DOUBLE PRECISION) / CAST(NULLIF(visits, 0) AS DOUBLE PRECISION) AS visit_buy_conversion_rate_7days_fill_nulls_with_0
Expand All @@ -27,9 +34,9 @@ FROM (
FROM (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_26.visits AS visits
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Read From CTE For node_id=sma_28019
-- Pass Only Elements: ['visits', 'metric_time__day']
Expand All @@ -42,14 +49,14 @@ FROM (
metric_time__day
) subq_26
ON
time_spine_src_28006.ds = subq_26.metric_time__day
rss_28018_cte.ds__day = subq_26.metric_time__day
) subq_30
FULL OUTER JOIN (
-- Join to Time Spine Dataset
SELECT
time_spine_src_28006.ds AS metric_time__day
rss_28018_cte.ds__day AS metric_time__day
, subq_39.buys AS buys
FROM ***************************.mf_time_spine time_spine_src_28006
FROM rss_28018_cte rss_28018_cte
LEFT OUTER JOIN (
-- Find conversions for user within the range of 7 day
-- Pass Only Elements: ['buys', 'metric_time__day']
Expand Down Expand Up @@ -113,7 +120,7 @@ FROM (
metric_time__day
) subq_39
ON
time_spine_src_28006.ds = subq_39.metric_time__day
rss_28018_cte.ds__day = subq_39.metric_time__day
) subq_43
ON
subq_30.metric_time__day = subq_43.metric_time__day
Expand Down
Loading
Loading