Skip to content

Commit

Permalink
Simplify JoinToTimeSpineNode (#1542)
Browse files Browse the repository at this point in the history
A bunch of cleanup in the dataflow-to-SQL logic for the
`JoinToTimeSpineNode`. There was a lot of duplication and hard-to-read
code here.

Notes:
- Please review by commit. The more complex commits have extended commit
descriptions to help explain the changes.
- There are many SQL changes here, but none of them are functional changes.
The changes to optimized snapshots might be best to focus on.
  • Loading branch information
courtneyholcomb authored Dec 9, 2024
1 parent 385d3cf commit 7a00c2f
Show file tree
Hide file tree
Showing 303 changed files with 869 additions and 874 deletions.
3 changes: 0 additions & 3 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,6 @@ def _build_derived_metric_output_node(
output_node = JoinToTimeSpineNode.create(
parent_node=output_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
Expand Down Expand Up @@ -1646,7 +1645,6 @@ def _build_aggregated_measure_from_measure_source_node(
unaggregated_measure_node = JoinToTimeSpineNode.create(
parent_node=unaggregated_measure_node,
requested_agg_time_dimension_specs=base_agg_time_dimension_specs,
use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
Expand Down Expand Up @@ -1721,7 +1719,6 @@ def _build_aggregated_measure_from_measure_source_node(
output_node: DataflowPlanNode = JoinToTimeSpineNode.create(
parent_node=aggregate_measures_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
use_custom_agg_time_dimension=not queried_linkable_specs.contains_metric_time,
join_type=after_aggregation_time_spine_join_description.join_type,
time_range_constraint=predicate_pushdown_state.time_range_constraint,
offset_window=after_aggregation_time_spine_join_description.offset_window,
Expand Down
7 changes: 0 additions & 7 deletions metricflow/dataflow/nodes/join_to_time_spine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
Attributes:
requested_agg_time_dimension_specs: Time dimensions requested in the query.
use_custom_agg_time_dimension: Indicates if agg_time_dimension should be used in join. If false, uses metric_time.
join_type: Join type to use when joining to time spine.
time_range_constraint: Time range to constrain the time spine to.
offset_window: Time window to offset the parent dataset by when joining to time spine.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
"""

requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec]
use_custom_agg_time_dimension: bool
join_type: SqlJoinType
time_range_constraint: Optional[TimeRangeConstraint]
offset_window: Optional[MetricTimeWindow]
Expand All @@ -54,7 +52,6 @@ def __post_init__(self) -> None: # noqa: D105
def create( # noqa: D102
parent_node: DataflowPlanNode,
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
use_custom_agg_time_dimension: bool,
join_type: SqlJoinType,
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
Expand All @@ -64,7 +61,6 @@ def create( # noqa: D102
return JoinToTimeSpineNode(
parent_nodes=(parent_node,),
requested_agg_time_dimension_specs=tuple(requested_agg_time_dimension_specs),
use_custom_agg_time_dimension=use_custom_agg_time_dimension,
join_type=join_type,
time_range_constraint=time_range_constraint,
offset_window=offset_window,
Expand All @@ -87,7 +83,6 @@ def description(self) -> str: # noqa: D102
def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
props = tuple(super().displayed_properties) + (
DisplayedProperty("requested_agg_time_dimension_specs", self.requested_agg_time_dimension_specs),
DisplayedProperty("use_custom_agg_time_dimension", self.use_custom_agg_time_dimension),
DisplayedProperty("join_type", self.join_type),
)
if self.offset_window:
Expand Down Expand Up @@ -115,7 +110,6 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
and other_node.offset_window == self.offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs
and other_node.use_custom_agg_time_dimension == self.use_custom_agg_time_dimension
and other_node.join_type == self.join_type
and other_node.time_spine_filters == self.time_spine_filters
)
Expand All @@ -125,7 +119,6 @@ def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> Join
return JoinToTimeSpineNode.create(
parent_node=new_parent_nodes[0],
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
use_custom_agg_time_dimension=self.use_custom_agg_time_dimension,
time_range_constraint=self.time_range_constraint,
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
Expand Down
Loading

0 comments on commit 7a00c2f

Please sign in to comment.