Enable granularities for derived metrics with time offset #726

Closed · wants to merge 18 commits
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230815-181153.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Update dataflow plan to support different granularities with time offset metrics
time: 2023-08-15T18:11:53.376909-07:00
custom:
Author: courtneyholcomb
Issue: "726"
61 changes: 37 additions & 24 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -232,6 +232,7 @@ def _build_metrics_output_node(
)
aggregated_measures_node = self.build_aggregated_measures(
metric_input_measure_specs=metric_input_measure_specs,
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
where_constraint=combined_where,
time_range_constraint=time_range_constraint,
@@ -250,16 +251,7 @@ def _build_metrics_output_node(

assert compute_metrics_node is not None

if metric_spec.offset_window or metric_spec.offset_to_grain:
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=compute_metrics_node,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
)
output_nodes.append(join_to_time_spine_node)
else:
output_nodes.append(compute_metrics_node)
output_nodes.append(compute_metrics_node)

assert len(output_nodes) > 0, "ComputeMetricsNode was not properly constructed"

@@ -625,6 +617,7 @@ def build_computed_metrics_node(
def build_aggregated_measures(
self,
metric_input_measure_specs: Sequence[MetricInputMeasureSpec],
metric_spec: MetricSpec,
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
@@ -685,6 +678,7 @@ def build_aggregated_measures(
output_nodes.append(
self._build_aggregated_measures_from_measure_source_node(
metric_input_measure_specs=input_specs,
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
where_constraint=node_where_constraint,
time_range_constraint=time_range_constraint,
@@ -712,6 +706,7 @@ def _build_aggregated_measures_from_measure_source_node(
def _build_aggregated_measures_from_measure_source_node(
self,
metric_input_measure_specs: Sequence[MetricInputMeasureSpec],
metric_spec: MetricSpec,
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
@@ -780,9 +775,38 @@ def _build_aggregated_measures_from_measure_source_node(
f"Recipe not found for measure specs: {measure_specs} and linkable specs: {required_linkable_specs}"
)

metric_time_dimension_spec: Optional[TimeDimensionSpec] = None
for linkable_spec in queried_linkable_specs.time_dimension_specs:
if linkable_spec.element_name == self._metric_time_dimension_reference.element_name:
metric_time_dimension_spec = linkable_spec
Contributor:

This might not be the correct spec for metric_time - as far as I can tell, the queried_linkable_specs may include a variety of metric_time granularities. We probably want the smallest granularity one, right?

Contributor Author:

Interesting point. What if the query included two incompatible granularities like week and month? We might need to take a list of metric_time_specs in the time spine builder!

Contributor Author:

To wrap up this thread - I ended up implementing the logic to handle multiple metric time dimensions in the dataflow-to-SQL logic, not the time spine builder.
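For concreteness, a minimal sketch of the smallest-granularity selection suggested above (an illustrative aside, not part of the diff; it assumes TimeGranularity values can be ordered via to_int(), as in MetricFlow, and variable names mirror the surrounding code):

# Sketch only: select the metric_time spec with the smallest granularity,
# rather than whichever spec the loop happens to hit first.
metric_time_specs = [
    spec
    for spec in queried_linkable_specs.time_dimension_specs
    if spec.element_name == self._metric_time_dimension_reference.element_name
]
metric_time_dimension_spec = (
    min(metric_time_specs, key=lambda spec: spec.time_granularity.to_int())
    if metric_time_specs
    else None
)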

break

# If a cumulative metric is queried with metric time, join over time range.
# Otherwise, the measure will be aggregated over all time.
time_range_node: Optional[JoinOverTimeRangeNode[SqlDataSetT]] = None
if cumulative and metric_time_dimension_spec:
time_range_node = JoinOverTimeRangeNode(
parent_node=measure_recipe.measure_node,
window=cumulative_window,
grain_to_date=cumulative_grain_to_date,
time_range_constraint=time_range_constraint,
)

# If querying an offset metric, join to time spine.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
if metric_spec.offset_window or metric_spec.offset_to_grain:
assert metric_time_dimension_spec, "Joining to time spine requires querying with metric time."
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=time_range_node or measure_recipe.measure_node,
metric_time_dimension_spec=metric_time_dimension_spec,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
)

# Only get the required measure and the local linkable instances so that aggregations work correctly.
filtered_measure_source_node = FilterElementsNode[SqlDataSetT](
parent_node=measure_recipe.measure_node,
parent_node=join_to_time_spine_node or time_range_node or measure_recipe.measure_node,
include_specs=InstanceSpecSet.merge(
(
InstanceSpecSet(measure_specs=measure_specs),
@@ -791,18 +815,7 @@ def _build_aggregated_measures_from_measure_source_node(
),
)

time_range_node: Optional[JoinOverTimeRangeNode[SqlDataSetT]] = None
if cumulative:
time_range_node = JoinOverTimeRangeNode(
parent_node=filtered_measure_source_node,
window=cumulative_window,
grain_to_date=cumulative_grain_to_date,
time_range_constraint=time_range_constraint,
)

filtered_measure_or_time_range_node = time_range_node or filtered_measure_source_node
join_targets = []

for join_recipe in measure_recipe.join_linkable_instances_recipes:
# Figure out what elements to filter from the joined node.

@@ -848,7 +861,7 @@ def _build_aggregated_measures_from_measure_source_node(
unaggregated_measure_node: BaseOutput[SqlDataSetT]
if len(join_targets) > 0:
filtered_measures_with_joined_elements = JoinToBaseOutputNode[SqlDataSetT](
left_node=filtered_measure_or_time_range_node,
left_node=filtered_measure_source_node,
join_targets=join_targets,
)

@@ -864,7 +877,7 @@ def _build_aggregated_measures_from_measure_source_node(
)
unaggregated_measure_node = after_join_filtered_node
else:
unaggregated_measure_node = filtered_measure_or_time_range_node
unaggregated_measure_node = filtered_measure_source_node

cumulative_metric_constrained_node: Optional[ConstrainTimeRangeNode] = None
if (
13 changes: 11 additions & 2 deletions metricflow/dataflow/dataflow_plan.py
@@ -718,6 +718,7 @@ class JoinToTimeSpineNode(Generic[SourceDataSetT], BaseOutput[SourceDataSetT], A
def __init__(
self,
parent_node: BaseOutput[SourceDataSetT],
metric_time_dimension_spec: TimeDimensionSpec,
Contributor:

Do we want this here?

Previously, the granularity was sourced from the parent node when making the time spine dataset, which contained all of the available granularities. Now it's coming in from the query parameters: it gets threaded through from the call site, which takes one of the potentially many query instances and wires it through this node.

Contributor Author:

The parent dataset will no longer be narrowed down to only the requested metric_time_dimension_instances, since this PR moves the measure aggregation to after the join to the time spine. Because of that, I thought it would be more efficient to pass through the requested granularity and only aggregate time spine columns based on that request. However, I can see a couple of potential issues with that:

  1. There might be multiple metric_time dimensions in the query. To handle this case, we could accept a list of metric_time_dimension_instances in _make_time_spine_data_set and create DATE_TRUNC columns for each.
  2. Maybe we don't want to prematurely optimize this query, and I should leave that to the optimizer. In that case we could create a DATE_TRUNC column for every possible granularity in _make_time_spine_data_set, and expect the unrequested ones to get filtered out by FilterElementsNode.

I'll definitely need to update the logic to handle case #1, but not sure about #2. Thoughts on that one?

Contributor Author:

Actually, after some noodling on this I understood more of what you meant. I ended up going back to inheriting the lowest granularity available from the parent dataset after all, so you can ignore this thread!
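For illustration, option 1 from the earlier comment might look roughly like the following (a sketch only; the function and column names are placeholders, not MetricFlow's actual API):

# Hypothetical helper: build one DATE_TRUNC column per requested metric_time grain.
def time_spine_select_columns(requested_grains: list, base_column: str = "ds") -> list:
    return [
        f"DATE_TRUNC('{grain}', {base_column}) AS metric_time__{grain}"
        for grain in requested_grains
    ]

# time_spine_select_columns(["day", "month"]) returns:
#   ["DATE_TRUNC('day', ds) AS metric_time__day",
#    "DATE_TRUNC('month', ds) AS metric_time__month"]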

time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
offset_to_grain: Optional[TimeGranularity] = None,
@@ -726,6 +727,7 @@ def __init__(

Args:
parent_node: Node that returns desired dataset to join to time spine.
metric_time_dimension_spec: Metric time dimension requested in query. Used to determine granularity.
time_range_constraint: Time range to constrain the time spine to.
offset_window: Time window to offset the parent dataset by when joining to time spine.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
@@ -736,6 +738,7 @@ def __init__(
offset_window and offset_to_grain
), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other."
self._parent_node = parent_node
self._metric_time_dimension_spec = metric_time_dimension_spec
self._offset_window = offset_window
self._offset_to_grain = offset_to_grain
self._time_range_constraint = time_range_constraint
@@ -746,6 +749,11 @@ def __init__(
def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def metric_time_dimension_spec(self) -> TimeDimensionSpec: # noqa: D
"""Time dimension spec to use when creating time spine table."""
return self._metric_time_dimension_spec

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]: # noqa: D
"""Time range constraint to apply when querying time spine table."""
@@ -795,9 +803,10 @@ def with_new_parents(  # noqa: D
assert len(new_parent_nodes) == 1
return JoinToTimeSpineNode[SourceDataSetT](
parent_node=new_parent_nodes[0],
metric_time_dimension_spec=self.metric_time_dimension_spec,
time_range_constraint=self.time_range_constraint,
offset_window=self._offset_window,
offset_to_grain=self._offset_to_grain,
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
)
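For reference, a hypothetical call site under the new signature (a sketch under stated assumptions: `measure_node` and `month_spec` are illustrative placeholders, and only the constructor parameters shown in this diff are assumed to exist):

# Sketch only; placeholder values, not the PR's actual call site.
join_node = JoinToTimeSpineNode(
    parent_node=measure_node,
    metric_time_dimension_spec=month_spec,  # e.g. metric_time at MONTH grain
    offset_window=MetricTimeWindow(count=1, granularity=TimeGranularity.MONTH),
)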

