From b813ca1bd6dae12124b88c7f6f576606dd47957f Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:04:16 -0700 Subject: [PATCH 01/35] first draft --- src/mozanalysis/inflight.py | 369 ++++++++++++++++++++++++++++++++++++ 1 file changed, 369 insertions(+) create mode 100644 src/mozanalysis/inflight.py diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py new file mode 100644 index 00000000..3974ca62 --- /dev/null +++ b/src/mozanalysis/inflight.py @@ -0,0 +1,369 @@ +import attr + +from mozanalysis.experiment import TimeLimits +from mozanalysis.metrics import DataSource, Metric + +from textwrap import dedent + +import numpy as np +from scipy.special import lambertw + + +class ExperimentAnnotationMissingError(Exception): + pass + + +@attr.s(frozen=True, slots=True) +class InflightDataSource(DataSource): + """ + POC implementation of [this proposal](https://docs.google.com/document/d/1bNTGPDan_ANlKQy6p9Y3o5ZwLk_XyvSb9br3gBUywxA/edit) + Specifically, Theorem 4.1 from [Design-Based Confidence Sequences for Anytime-valid Causal Inference](https://arxiv.org/pdf/2210.08639.pdf) + """ # noqa + + timestamp_column = attr.ib(validator=attr.validators.instance_of(str)) + + @property + def experiments_column_expr(self) -> str: + """Returns a SQL expression to extract the branch from the + experiment annotations""" + if self.experiments_column_type is None: + raise ExperimentAnnotationMissingError + + elif self.experiments_column_type == "simple": + return """`mozfun.map.get_key`(ds.experiments, '{experiment_slug}')""" + + elif self.experiments_column_type == "native": + return ( + """`mozfun.map.get_key`(ds.experiments, '{experiment_slug}').branch""" + ) + + elif self.experiments_column_type == "glean": + return """`mozfun.map.get_key`(ds.ping_info.experiments, '{experiment_slug}').branch""" + + else: + raise ValueError + + def build_record_query( + self, + metric: Metric, + time_limits: TimeLimits, + experiment_slug: str, + from_expr_dataset: str | None = None, + ) -> str: + """ + Returns the SQL to create a client-timestamp level dataset. + + This does not assume an enrollments table has been created, instead + relies upon experiment annotations. + """ + + query = dedent( + f""" + SELECT + ds.client_id, + {self.experiments_column_expr()} AS branch, + MIN(ds.{self.timestamp_column}) AS event_timestamp, + MIN_BY({metric.select_expr.format(experiment_slug=experiment_slug)}, ds.{self.timestamp_column}) AS {metric.name} + FROM {self.from_expr_for(from_expr_dataset)} ds + WHERE 1=1 + AND ds.{self.timestamp_column} BETWEEN {time_limits.first_date_data_required} AND {time_limits.last_date_data_required} + AND {self.experiments_column_expr()} IS NOT NULL + GROUP BY client_id, branch + ORDER BY event_timestamp + """ # noqa + ) + + return query + + def build_statistics_query_piece_prep( + self, comparison_branch: str, reference_branch: str, metric_name: str + ) -> str: + """ + Prepares/formats the record-level data for the statistical computations. + + Filters to clients from the `reference_branch` or `comparison_branch`, + constructs treatment indicators, a `Y_i` column, and a rank column. + + Assumes an upstream CTE holding the output of `build_record_query` + named `records`. + """ + query = dedent( + f""" + SELECT + *, + CASE WHEN branch = "{comparison_branch}" THEN 1 ELSE 0 END AS treated, + CASE WHEN branch = "{reference_branch}" THEN 1 ELSE 0 END AS not_treated, + {metric_name} AS Y_i, + RANK() OVER (ORDER BY event_timestamp) AS n + FROM records + WHERE branch in ("{reference_branch}", "{comparison_branch}") + """ + ) + return query + + def build_statistics_query_piece_sufficient_statistics(self) -> str: + """ + Builds upon `build_statistics_query_piece_intro` to add the sufficient statistics + `tau_hat_i` and `sigma_hat_sq_i` necessary to calculate the confidence sequence. + + Adds: + - `tau_hat_i`: either +1/2*metric value (in case of comparison branch) or + -1/2*metric value (in case of reference branch). + - `sigma_hat_sq_i`: either +1/4*(metric value)^2 (in case of comparison branch) or + -1/4*(metric value)^2 (in case of reference branch). + + Assumes an upstream CTE holding the output of `build_statistics_query_piece_prep` + named `prep`. + """ + + query = dedent( + """ + SELECT + *, + treated*Y_i/0.5 - not_treated*Y_i/0.5 AS tau_hat_i, + treated*POW(Y_i,2)/POW(0.5,2) + not_treated*POW(Y_i,2)/POW(0.5,2) AS sigma_hat_sq_i, + FROM prep + """ + ) + + return query + + def build_statistics_query_piece_accumulators(self) -> str: + """ + Builds upon `build_statistics_query_piece_sufficient_statistics` to construct + expanding sufficient statistics (accumulate the sufficient statistics over time). + + Adds: + - point_est: the expanding average of `tau_hat_i`, over clients present up to and + including this time point. Under null hypothesis, distribution is centered at 0. + - var_est: the expanding variance estimator, over clients present up to an including + this time point. Known as S_n in the literature. + + Assumes an upstream CTE holding the output of + `build_statistics_query_piece_sufficient_statistics` named `sufficient_statistics`. + """ + + query = dedent( + """ + SELECT + *, + -- SUM(tau_hat_i) OVER (ORDER BY event_timestamp) AS tau_hat_i_acc, + 1/n * SUM(tau_hat_i) OVER (ORDER BY event_timestamp) AS point_est, + SUM(sigma_hat_sq_i) OVER (ORDER BY event_timestamp) AS var_est + FROM sufficient_statistics + """ + ) + return query + + def build_statistics_query_piece_ci_terms( + self, minimum_width_observations: int = 100, alpha: float = 0.05 + ) -> str: + """ + Builds upon `build_statistics_query_piece_accumulators` to construct + the two terms needed to calculate the width of the confidence sequence. + + Assumes an upstream CTE holding the output of + `build_statistics_query_piece_accumulators` named `accumulators`. + """ + + eta_sq = self.eta(minimum_width_observations, alpha) ** 2 + alpha_sq = alpha**2 + + query = dedent( + f""" + SELECT + *, + (var_est * {eta_sq} + 1)/{eta_sq} AS width_term_1, + LN((var_est * {eta_sq}+1)/{alpha_sq}) AS width_term_2 + FROM accumulators + """ + ) + return query + + def build_statistics_query_piece_ci_width(self) -> str: + """ + Builds upon `build_statistics_query_piece_accumulators` to construct + the two terms needed to calculate the width of the confidence sequence. + + Adds: + - ci_width: the width of the confidence sequence at this time. + + Assumes an upstream CTE holding the output of + `build_statistics_query_piece_ci_terms` named `ci_terms`. + """ + + query = dedent( + """ + SELECT + *, + (1/n) * SQRT(width_term_1 * width_term_2) AS ci_width + FROM ci_terms + """ + ) + + return query + + def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: + """ + Cleans up the output of `build_statistics_query_piece_ci_width`. + + Assumes an upstream CTE holding the output of + `build_statistics_query_piece_ci_width` named `ci_width` + """ + + query = dedent( + f""" + SELECT + event_timestamp, + n, + "{comparison_branch}" AS comparison_branch, + point_est, + point_est - ci_width AS ci_lower, + point_est + ci_width AS ci_upper + FROM ci_width + """ + ) + return query + + def build_statistics_query_one_branch( + self, + comparison_branch: str, + reference_branch: str, + metric_name: str, + minimum_width_observations: int = 1000, + alpha: float = 0.05, + ) -> str: + """ + Builds the statistical query to construct the confidence sequence to compare + a `comparison_branch` to a `reference_branch`. + """ + + query = dedent( + f""" + WITH prep AS ( + {self.build_statistics_query_piece_prep(comparison_branch, reference_branch, metric_name)} + ), sufficient_statistics AS ( + {self.build_statistics_query_piece_sufficient_statistics()} + ), accumulators AS ( + {self.build_statistics_query_piece_accumulators()} + ), ci_terms AS ( + {self.build_statistics_query_piece_ci_terms(minimum_width_observations, alpha)} + ), ci_width AS ( + {self.build_statistics_query_piece_ci_width()} + ), ci_cleanup AS ( + {self.build_statistics_query_piece_cleanup(comparison_branch)} + ) + SELECT * + FROM ci_cleanup + """ + ) + + return query + + def build_union_query( + self, comparison_branches: list[str], full_sample: bool = False + ) -> str: + clean_comparison_branches = [ + self.sanitize_branch_name(branch) for branch in comparison_branches + ] + branch_timestamps = [ + f"{branch}.event_timestamp" for branch in clean_comparison_branches + ].join(",") + query = dedent( + f""" + SELECT + n, + MIN({branch_timestamps}) AS record_timestamp, + """ + ) + for branch in clean_comparison_branches: + query += dedent( + f""" + + {branch}.point_est AS point_est_{branch}, + {branch}.ci_lower AS ci_lower_{branch}, + {branch}.ci_upper AS ci_upper{branch}, + """ + ) + + query += dedent( + f""" + FROM {clean_comparison_branches[0]} + """ + ) + + if len(clean_comparison_branches) > 1: + for next_branch in clean_comparison_branches[1:]: + query += dedent( + f""" + FULL OUTER JOIN {next_branch} + USING(n) + """ + ) + return query + + def build_statistics_query( + self, + comparison_branches: list[str], + reference_branch: str, + metric: Metric, + time_limits: TimeLimits, + experiment_slug: str, + from_expr_dataset: str | None = None, + minimum_width_observations: int = 1000, + alpha: float = 0.05, + full_sample: bool = False, + ) -> str: + query = dedent( + f""" + WITH records AS ( + {self.build_record_query(metric, time_limits, experiment_slug, from_expr_dataset)} + )""" + ) + + for comparison_branch in comparison_branches: + comparison_branch_name = self.sanitize_branch_name(comparison_branch) + subquery = self.build_statistics_query_one_branch( + comparison_branch, + reference_branch, + metric.name, + minimum_width_observations, + alpha, + ) + query += dedent( + f""", {comparison_branch_name} AS ( + {subquery} + )""" + ) + query += "\n" + query += self.build_union_query(comparison_branches, full_sample) + + return query + + @staticmethod + def eta(minimum_width_observations: int = 100, alpha: float = 0.05) -> float: + """ + Returns the `eta` (tuning parameter) that minimizes the relative width of the + confidence sequence after `minimum_width_observations` clients enrolled. Note + that each comparison is done with one branch relative to control, so account + for the number of branches when calculating this term. E.g., a 5-branch experiment + with 500,000 total enrollees has `eta` defined for `minimum_width_observations` + in (1,200_000]. + + We default to 100 to focus the "alpha spending" near the start of the experiment. + """ + alpha_sq = alpha**2 + eta = np.sqrt( + (-1 * lambertw(-1 * alpha_sq * np.exp(1), -1) - 1) + / minimum_width_observations + ).real + return eta + + @staticmethod + def sanitize_branch_name(branch: str) -> str: + return branch.replace("-", "_") + + +@attr.s(frozen=True, slots=True) +class InflightMetric(Metric): + data_source = attr.ib(type=InflightDataSource) From 9f5ba396ecc78fc8ab86f080f79a24eda0befa9a Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:12:47 -0700 Subject: [PATCH 02/35] added main ping annotation type --- src/mozanalysis/inflight.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 3974ca62..68e3b17e 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -40,6 +40,9 @@ def experiments_column_expr(self) -> str: elif self.experiments_column_type == "glean": return """`mozfun.map.get_key`(ds.ping_info.experiments, '{experiment_slug}').branch""" + elif self.experiments_column_type == "main_live": + return """`mozfun.map.get_key`(ds.environment.experiments, '{experiment_slug}').branch""" + else: raise ValueError From 1bd13c07523c14e63ad592761b739fb253d5faab Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:21:14 -0700 Subject: [PATCH 03/35] added render function --- src/mozanalysis/inflight.py | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 68e3b17e..d3d4d535 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -20,7 +20,9 @@ class InflightDataSource(DataSource): Specifically, Theorem 4.1 from [Design-Based Confidence Sequences for Anytime-valid Causal Inference](https://arxiv.org/pdf/2210.08639.pdf) """ # noqa - timestamp_column = attr.ib(validator=attr.validators.instance_of(str)) + timestamp_column = attr.ib( + default="submission_timestamp", validator=attr.validators.instance_of(str) + ) @property def experiments_column_expr(self) -> str: @@ -49,7 +51,8 @@ def experiments_column_expr(self) -> str: def build_record_query( self, metric: Metric, - time_limits: TimeLimits, + start_date: str, + end_date: str, experiment_slug: str, from_expr_dataset: str | None = None, ) -> str: @@ -69,7 +72,7 @@ def build_record_query( MIN_BY({metric.select_expr.format(experiment_slug=experiment_slug)}, ds.{self.timestamp_column}) AS {metric.name} FROM {self.from_expr_for(from_expr_dataset)} ds WHERE 1=1 - AND ds.{self.timestamp_column} BETWEEN {time_limits.first_date_data_required} AND {time_limits.last_date_data_required} + AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" AND {self.experiments_column_expr()} IS NOT NULL GROUP BY client_id, branch ORDER BY event_timestamp @@ -310,7 +313,8 @@ def build_statistics_query( comparison_branches: list[str], reference_branch: str, metric: Metric, - time_limits: TimeLimits, + start_date: str, + end_date: str, experiment_slug: str, from_expr_dataset: str | None = None, minimum_width_observations: int = 1000, @@ -320,7 +324,7 @@ def build_statistics_query( query = dedent( f""" WITH records AS ( - {self.build_record_query(metric, time_limits, experiment_slug, from_expr_dataset)} + {self.build_record_query(metric, start_date, end_date, experiment_slug, from_expr_dataset)} )""" ) @@ -370,3 +374,26 @@ def sanitize_branch_name(branch: str) -> str: @attr.s(frozen=True, slots=True) class InflightMetric(Metric): data_source = attr.ib(type=InflightDataSource) + + def render_inflight_query( + self, + start_date: str, + end_date: str, + comparison_branches: list[str], + reference_branch: str, + experiment_slug: str, + from_expr_dataset: str | None = None, + minimum_width_observations: int = 1000, + full_sample: bool = False, + ) -> str: + return self.data_source.build_statistics_query( + comparison_branches, + reference_branch, + self, + start_date, + end_date, + experiment_slug, + from_expr_dataset, + minimum_width_observations, + full_sample, + ) From 7436062b38cefbf97738f3336e94a406dd7c9543 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:24:15 -0700 Subject: [PATCH 04/35] make inflight metric its own class --- src/mozanalysis/inflight.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index d3d4d535..232a1489 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -372,8 +372,13 @@ def sanitize_branch_name(branch: str) -> str: @attr.s(frozen=True, slots=True) -class InflightMetric(Metric): +class InflightMetric: + name = attr.ib(type=str) + select_expr = attr.ib(type=str) data_source = attr.ib(type=InflightDataSource) + friendly_name = attr.ib(type=str | None, default=None) + description = attr.ib(type=str | None, default=None) + app_name = attr.ib(type=str | None, default=None) def render_inflight_query( self, From 6173de0d4959782a0fbdbd103a259468b18d0b1d Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:26:39 -0700 Subject: [PATCH 05/35] override experiment annotation types --- src/mozanalysis/inflight.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 232a1489..30d8abab 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -24,6 +24,8 @@ class InflightDataSource(DataSource): default="submission_timestamp", validator=attr.validators.instance_of(str) ) + EXPERIMENT_COLUMN_TYPES = (None, "simple", "native", "glean", "main_live") + @property def experiments_column_expr(self) -> str: """Returns a SQL expression to extract the branch from the From bfdce468370e72cff5513dc18895362a8040faf3 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:33:20 -0700 Subject: [PATCH 06/35] properties dont need parentheses --- src/mozanalysis/inflight.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 30d8abab..1e771430 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -69,13 +69,13 @@ def build_record_query( f""" SELECT ds.client_id, - {self.experiments_column_expr()} AS branch, + {self.experiments_column_expr} AS branch, MIN(ds.{self.timestamp_column}) AS event_timestamp, MIN_BY({metric.select_expr.format(experiment_slug=experiment_slug)}, ds.{self.timestamp_column}) AS {metric.name} FROM {self.from_expr_for(from_expr_dataset)} ds WHERE 1=1 AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" - AND {self.experiments_column_expr()} IS NOT NULL + AND {self.experiments_column_expr} IS NOT NULL GROUP BY client_id, branch ORDER BY event_timestamp """ # noqa From f99ed4cd4ea925f3520bd9752da609f13b645291 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:35:44 -0700 Subject: [PATCH 07/35] fix join call --- src/mozanalysis/inflight.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 1e771430..75cf3c5c 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -274,9 +274,9 @@ def build_union_query( clean_comparison_branches = [ self.sanitize_branch_name(branch) for branch in comparison_branches ] - branch_timestamps = [ - f"{branch}.event_timestamp" for branch in clean_comparison_branches - ].join(",") + branch_timestamps = ",".join( + [f"{branch}.event_timestamp" for branch in clean_comparison_branches] + ) query = dedent( f""" SELECT From ab7a5ce6061510f07683eb29a53849ac82271578 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:54:23 -0700 Subject: [PATCH 08/35] try fix eta --- src/mozanalysis/inflight.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 75cf3c5c..cf9f69c0 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -52,7 +52,7 @@ def experiments_column_expr(self) -> str: def build_record_query( self, - metric: Metric, + metric: "InflightMetric", start_date: str, end_date: str, experiment_slug: str, @@ -314,7 +314,7 @@ def build_statistics_query( self, comparison_branches: list[str], reference_branch: str, - metric: Metric, + metric: "InflightMetric", start_date: str, end_date: str, experiment_slug: str, @@ -366,6 +366,7 @@ def eta(minimum_width_observations: int = 100, alpha: float = 0.05) -> float: (-1 * lambertw(-1 * alpha_sq * np.exp(1), -1) - 1) / minimum_width_observations ).real + assert np.isfinite(eta) return eta @staticmethod From 6fac882860477eb1c29b2ac8ab32a255d30b2a8e Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:56:23 -0700 Subject: [PATCH 09/35] try fix eta --- src/mozanalysis/inflight.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index cf9f69c0..5d48b5a2 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -238,7 +238,7 @@ def build_statistics_query_one_branch( comparison_branch: str, reference_branch: str, metric_name: str, - minimum_width_observations: int = 1000, + minimum_width_observations: int = 100, alpha: float = 0.05, ) -> str: """ @@ -391,7 +391,8 @@ def render_inflight_query( reference_branch: str, experiment_slug: str, from_expr_dataset: str | None = None, - minimum_width_observations: int = 1000, + minimum_width_observations: int = 100, + alpha: float = 0.05, full_sample: bool = False, ) -> str: return self.data_source.build_statistics_query( @@ -401,7 +402,8 @@ def render_inflight_query( start_date, end_date, experiment_slug, - from_expr_dataset, - minimum_width_observations, - full_sample, + from_expr_dataset=from_expr_dataset, + minimum_width_observations=minimum_width_observations, + alpha=alpha, + full_sample=full_sample, ) From 9c1bdcff50370b4ec782f33fad464b682f468d96 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:58:03 -0700 Subject: [PATCH 10/35] rename CTE --- src/mozanalysis/inflight.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 5d48b5a2..a5ed41e9 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -216,7 +216,7 @@ def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: Cleans up the output of `build_statistics_query_piece_ci_width`. Assumes an upstream CTE holding the output of - `build_statistics_query_piece_ci_width` named `ci_width` + `build_statistics_query_piece_ci_width` named `ci_width_term` """ query = dedent( @@ -256,7 +256,7 @@ def build_statistics_query_one_branch( {self.build_statistics_query_piece_accumulators()} ), ci_terms AS ( {self.build_statistics_query_piece_ci_terms(minimum_width_observations, alpha)} - ), ci_width AS ( + ), ci_width_term AS ( {self.build_statistics_query_piece_ci_width()} ), ci_cleanup AS ( {self.build_statistics_query_piece_cleanup(comparison_branch)} From 0995277459fcbb2f6f9f90eb318112ebd684caea Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 11:59:43 -0700 Subject: [PATCH 11/35] rename CTE --- src/mozanalysis/inflight.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index a5ed41e9..558c43d7 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -228,7 +228,7 @@ def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: point_est, point_est - ci_width AS ci_lower, point_est + ci_width AS ci_upper - FROM ci_width + FROM ci_width_term """ ) return query From 2c12ffa8d7a8be5cd79f7b22bb82610441c40431 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 12:02:03 -0700 Subject: [PATCH 12/35] make min args list --- src/mozanalysis/inflight.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 558c43d7..efd936b3 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -281,7 +281,7 @@ def build_union_query( f""" SELECT n, - MIN({branch_timestamps}) AS record_timestamp, + MIN([{branch_timestamps}]) AS record_timestamp, """ ) for branch in clean_comparison_branches: From 3ad1b7f77da9af79bf8ade9979c35ed4aa63d108 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 12:04:26 -0700 Subject: [PATCH 13/35] min -> least --- src/mozanalysis/inflight.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index efd936b3..d185291d 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -274,14 +274,14 @@ def build_union_query( clean_comparison_branches = [ self.sanitize_branch_name(branch) for branch in comparison_branches ] - branch_timestamps = ",".join( + branch_timestamps = ", ".join( [f"{branch}.event_timestamp" for branch in clean_comparison_branches] ) query = dedent( f""" SELECT n, - MIN([{branch_timestamps}]) AS record_timestamp, + LEAST({branch_timestamps}) AS record_timestamp, """ ) for branch in clean_comparison_branches: From 2ec0edbb1eb7d5f07f8ac74176c6dfc86163c007 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 12:08:38 -0700 Subject: [PATCH 14/35] drop dedents --- src/mozanalysis/inflight.py | 59 +++++++++++++------------------------ 1 file changed, 20 insertions(+), 39 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index d185291d..bac18d71 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -65,8 +65,7 @@ def build_record_query( relies upon experiment annotations. """ - query = dedent( - f""" + query = f""" SELECT ds.client_id, {self.experiments_column_expr} AS branch, @@ -79,7 +78,6 @@ def build_record_query( GROUP BY client_id, branch ORDER BY event_timestamp """ # noqa - ) return query @@ -95,8 +93,7 @@ def build_statistics_query_piece_prep( Assumes an upstream CTE holding the output of `build_record_query` named `records`. """ - query = dedent( - f""" + query = f""" SELECT *, CASE WHEN branch = "{comparison_branch}" THEN 1 ELSE 0 END AS treated, @@ -106,7 +103,7 @@ def build_statistics_query_piece_prep( FROM records WHERE branch in ("{reference_branch}", "{comparison_branch}") """ - ) + return query def build_statistics_query_piece_sufficient_statistics(self) -> str: @@ -124,15 +121,13 @@ def build_statistics_query_piece_sufficient_statistics(self) -> str: named `prep`. """ - query = dedent( - """ + query = """ SELECT *, treated*Y_i/0.5 - not_treated*Y_i/0.5 AS tau_hat_i, treated*POW(Y_i,2)/POW(0.5,2) + not_treated*POW(Y_i,2)/POW(0.5,2) AS sigma_hat_sq_i, FROM prep """ - ) return query @@ -151,8 +146,7 @@ def build_statistics_query_piece_accumulators(self) -> str: `build_statistics_query_piece_sufficient_statistics` named `sufficient_statistics`. """ - query = dedent( - """ + query = """ SELECT *, -- SUM(tau_hat_i) OVER (ORDER BY event_timestamp) AS tau_hat_i_acc, @@ -160,7 +154,7 @@ def build_statistics_query_piece_accumulators(self) -> str: SUM(sigma_hat_sq_i) OVER (ORDER BY event_timestamp) AS var_est FROM sufficient_statistics """ - ) + return query def build_statistics_query_piece_ci_terms( @@ -177,15 +171,14 @@ def build_statistics_query_piece_ci_terms( eta_sq = self.eta(minimum_width_observations, alpha) ** 2 alpha_sq = alpha**2 - query = dedent( - f""" + query = f""" SELECT *, (var_est * {eta_sq} + 1)/{eta_sq} AS width_term_1, LN((var_est * {eta_sq}+1)/{alpha_sq}) AS width_term_2 FROM accumulators """ - ) + return query def build_statistics_query_piece_ci_width(self) -> str: @@ -200,14 +193,12 @@ def build_statistics_query_piece_ci_width(self) -> str: `build_statistics_query_piece_ci_terms` named `ci_terms`. """ - query = dedent( - """ + query = """ SELECT *, (1/n) * SQRT(width_term_1 * width_term_2) AS ci_width FROM ci_terms """ - ) return query @@ -219,8 +210,7 @@ def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: `build_statistics_query_piece_ci_width` named `ci_width_term` """ - query = dedent( - f""" + query = f""" SELECT event_timestamp, n, @@ -230,7 +220,7 @@ def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: point_est + ci_width AS ci_upper FROM ci_width_term """ - ) + return query def build_statistics_query_one_branch( @@ -246,8 +236,7 @@ def build_statistics_query_one_branch( a `comparison_branch` to a `reference_branch`. """ - query = dedent( - f""" + query = f""" WITH prep AS ( {self.build_statistics_query_piece_prep(comparison_branch, reference_branch, metric_name)} ), sufficient_statistics AS ( @@ -264,7 +253,6 @@ def build_statistics_query_one_branch( SELECT * FROM ci_cleanup """ - ) return query @@ -277,37 +265,31 @@ def build_union_query( branch_timestamps = ", ".join( [f"{branch}.event_timestamp" for branch in clean_comparison_branches] ) - query = dedent( - f""" + query = f""" SELECT n, LEAST({branch_timestamps}) AS record_timestamp, """ - ) + for branch in clean_comparison_branches: - query += dedent( - f""" + query += f""" {branch}.point_est AS point_est_{branch}, {branch}.ci_lower AS ci_lower_{branch}, {branch}.ci_upper AS ci_upper{branch}, """ - ) - query += dedent( - f""" + query += f""" FROM {clean_comparison_branches[0]} """ - ) if len(clean_comparison_branches) > 1: for next_branch in clean_comparison_branches[1:]: - query += dedent( - f""" + query += f""" FULL OUTER JOIN {next_branch} USING(n) """ - ) + return query def build_statistics_query( @@ -339,11 +321,10 @@ def build_statistics_query( minimum_width_observations, alpha, ) - query += dedent( - f""", {comparison_branch_name} AS ( + query += f""", {comparison_branch_name} AS ( {subquery} )""" - ) + query += "\n" query += self.build_union_query(comparison_branches, full_sample) From 58c1af3e4e07d1fa6b1ac87152da69f51c1d4111 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 12:10:46 -0700 Subject: [PATCH 15/35] clean sql --- src/mozanalysis/inflight.py | 39 +++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index bac18d71..e3f2c30e 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -65,8 +65,7 @@ def build_record_query( relies upon experiment annotations. """ - query = f""" - SELECT + query = f"""SELECT ds.client_id, {self.experiments_column_expr} AS branch, MIN(ds.{self.timestamp_column}) AS event_timestamp, @@ -76,8 +75,7 @@ def build_record_query( AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" AND {self.experiments_column_expr} IS NOT NULL GROUP BY client_id, branch - ORDER BY event_timestamp - """ # noqa + ORDER BY event_timestamp""" # noqa return query @@ -237,21 +235,21 @@ def build_statistics_query_one_branch( """ query = f""" - WITH prep AS ( - {self.build_statistics_query_piece_prep(comparison_branch, reference_branch, metric_name)} - ), sufficient_statistics AS ( - {self.build_statistics_query_piece_sufficient_statistics()} - ), accumulators AS ( - {self.build_statistics_query_piece_accumulators()} - ), ci_terms AS ( - {self.build_statistics_query_piece_ci_terms(minimum_width_observations, alpha)} - ), ci_width_term AS ( - {self.build_statistics_query_piece_ci_width()} - ), ci_cleanup AS ( - {self.build_statistics_query_piece_cleanup(comparison_branch)} - ) - SELECT * - FROM ci_cleanup + WITH prep AS ( + {self.build_statistics_query_piece_prep(comparison_branch, reference_branch, metric_name)} + ), sufficient_statistics AS ( + {self.build_statistics_query_piece_sufficient_statistics()} + ), accumulators AS ( + {self.build_statistics_query_piece_accumulators()} + ), ci_terms AS ( + {self.build_statistics_query_piece_ci_terms(minimum_width_observations, alpha)} + ), ci_width_term AS ( + {self.build_statistics_query_piece_ci_width()} + ), ci_cleanup AS ( + {self.build_statistics_query_piece_cleanup(comparison_branch)} + ) + SELECT * + FROM ci_cleanup """ return query @@ -306,8 +304,7 @@ def build_statistics_query( full_sample: bool = False, ) -> str: query = dedent( - f""" - WITH records AS ( + f"""WITH records AS ( {self.build_record_query(metric, start_date, end_date, experiment_slug, from_expr_dataset)} )""" ) From 8ee77925f860a78c4b11e5b1a37d25f886c1c872 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 12:13:33 -0700 Subject: [PATCH 16/35] clean sql --- src/mozanalysis/inflight.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index e3f2c30e..a19d9ecd 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -65,17 +65,18 @@ def build_record_query( relies upon experiment annotations. """ - query = f"""SELECT - ds.client_id, - {self.experiments_column_expr} AS branch, - MIN(ds.{self.timestamp_column}) AS event_timestamp, - MIN_BY({metric.select_expr.format(experiment_slug=experiment_slug)}, ds.{self.timestamp_column}) AS {metric.name} - FROM {self.from_expr_for(from_expr_dataset)} ds - WHERE 1=1 - AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" - AND {self.experiments_column_expr} IS NOT NULL - GROUP BY client_id, branch - ORDER BY event_timestamp""" # noqa + query = f""" + SELECT + ds.client_id, + {self.experiments_column_expr} AS branch, + MIN(ds.{self.timestamp_column}) AS event_timestamp, + MIN_BY({metric.select_expr.format(experiment_slug=experiment_slug)}, ds.{self.timestamp_column}) AS {metric.name} + FROM {self.from_expr_for(from_expr_dataset)} ds + WHERE 1=1 + AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" + AND {self.experiments_column_expr} IS NOT NULL + GROUP BY client_id, branch + ORDER BY event_timestamp""" # noqa return query @@ -303,9 +304,11 @@ def build_statistics_query( alpha: float = 0.05, full_sample: bool = False, ) -> str: + record_query = self.build_record_query( + metric, start_date, end_date, experiment_slug, from_expr_dataset + ) query = dedent( - f"""WITH records AS ( - {self.build_record_query(metric, start_date, end_date, experiment_slug, from_expr_dataset)} + f"""WITH records AS ({record_query} )""" ) From 8b4293b738b9a7b5d9a27b524ba4d83f1a60fd3b Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 12:15:29 -0700 Subject: [PATCH 17/35] clean sql --- src/mozanalysis/inflight.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index a19d9ecd..55d5101c 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -27,25 +27,25 @@ class InflightDataSource(DataSource): EXPERIMENT_COLUMN_TYPES = (None, "simple", "native", "glean", "main_live") @property - def experiments_column_expr(self) -> str: + def experiments_column_expr(self, experiment_slug: str) -> str: """Returns a SQL expression to extract the branch from the experiment annotations""" if self.experiments_column_type is None: raise ExperimentAnnotationMissingError elif self.experiments_column_type == "simple": - return """`mozfun.map.get_key`(ds.experiments, '{experiment_slug}')""" + return f"""`mozfun.map.get_key`(ds.experiments, '{experiment_slug}')""" elif self.experiments_column_type == "native": return ( - """`mozfun.map.get_key`(ds.experiments, '{experiment_slug}').branch""" + f"""`mozfun.map.get_key`(ds.experiments, '{experiment_slug}').branch""" ) elif self.experiments_column_type == "glean": - return """`mozfun.map.get_key`(ds.ping_info.experiments, '{experiment_slug}').branch""" + return f"""`mozfun.map.get_key`(ds.ping_info.experiments, '{experiment_slug}').branch""" elif self.experiments_column_type == "main_live": - return """`mozfun.map.get_key`(ds.environment.experiments, '{experiment_slug}').branch""" + return f"""`mozfun.map.get_key`(ds.environment.experiments, '{experiment_slug}').branch""" else: raise ValueError @@ -74,7 +74,7 @@ def build_record_query( FROM {self.from_expr_for(from_expr_dataset)} ds WHERE 1=1 AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" - AND {self.experiments_column_expr} IS NOT NULL + AND {self.experiments_column_expr(experiment_slug)} IS NOT NULL GROUP BY client_id, branch ORDER BY event_timestamp""" # noqa From 72b12a42e92e946386b483caaf99582bc0744b9d Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:10:53 -0700 Subject: [PATCH 18/35] fixed experiments column expr property --- src/mozanalysis/inflight.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 55d5101c..7059459a 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -24,28 +24,28 @@ class InflightDataSource(DataSource): default="submission_timestamp", validator=attr.validators.instance_of(str) ) - EXPERIMENT_COLUMN_TYPES = (None, "simple", "native", "glean", "main_live") + EXPERIMENT_COLUMN_TYPES = (None, "simple", "native", "glean", "main_live") # noqa @property - def experiments_column_expr(self, experiment_slug: str) -> str: + def experiments_column_expr(self) -> str: """Returns a SQL expression to extract the branch from the experiment annotations""" if self.experiments_column_type is None: raise ExperimentAnnotationMissingError elif self.experiments_column_type == "simple": - return f"""`mozfun.map.get_key`(ds.experiments, '{experiment_slug}')""" + return """`mozfun.map.get_key`(ds.experiments, '{experiment_slug}')""" elif self.experiments_column_type == "native": return ( - f"""`mozfun.map.get_key`(ds.experiments, '{experiment_slug}').branch""" + """`mozfun.map.get_key`(ds.experiments, '{experiment_slug}').branch""" ) elif self.experiments_column_type == "glean": - return f"""`mozfun.map.get_key`(ds.ping_info.experiments, '{experiment_slug}').branch""" + return """`mozfun.map.get_key`(ds.ping_info.experiments, '{experiment_slug}').branch""" elif self.experiments_column_type == "main_live": - return f"""`mozfun.map.get_key`(ds.environment.experiments, '{experiment_slug}').branch""" + return """`mozfun.map.get_key`(ds.environment.experiments, '{experiment_slug}').branch""" else: raise ValueError @@ -68,13 +68,13 @@ def build_record_query( query = f""" SELECT ds.client_id, - {self.experiments_column_expr} AS branch, + {self.experiments_column_expr.format(experiment_slug=experiment_slug)} AS branch, MIN(ds.{self.timestamp_column}) AS event_timestamp, MIN_BY({metric.select_expr.format(experiment_slug=experiment_slug)}, ds.{self.timestamp_column}) AS {metric.name} FROM {self.from_expr_for(from_expr_dataset)} ds WHERE 1=1 AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" - AND {self.experiments_column_expr(experiment_slug)} IS NOT NULL + AND {self.experiments_column_expr.format(experiment_slug=experiment_slug)} IS NOT NULL GROUP BY client_id, branch ORDER BY event_timestamp""" # noqa From e1d1b14967f0b85c1d00120c7f8cc2cac6621410 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:16:00 -0700 Subject: [PATCH 19/35] clean sql --- src/mozanalysis/inflight.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 7059459a..02759d43 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -309,7 +309,7 @@ def build_statistics_query( ) query = dedent( f"""WITH records AS ({record_query} - )""" +)""" ) for comparison_branch in comparison_branches: From 2f48b7236394ac39d0c06f385d3dd3aed0c8ced7 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:18:51 -0700 Subject: [PATCH 20/35] clean sql --- src/mozanalysis/inflight.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 02759d43..75304568 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -321,9 +321,7 @@ def build_statistics_query( minimum_width_observations, alpha, ) - query += f""", {comparison_branch_name} AS ( - {subquery} - )""" + query += f""", {comparison_branch_name} AS ({subquery})""" query += "\n" query += self.build_union_query(comparison_branches, full_sample) From 37a15e4dec59844572b0ba4d1c6f373c41053dc8 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:23:10 -0700 Subject: [PATCH 21/35] clean sql --- src/mozanalysis/inflight.py | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 75304568..16cf983e 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -92,16 +92,14 @@ def build_statistics_query_piece_prep( Assumes an upstream CTE holding the output of `build_record_query` named `records`. """ - query = f""" - SELECT + query = f"""SELECT *, CASE WHEN branch = "{comparison_branch}" THEN 1 ELSE 0 END AS treated, CASE WHEN branch = "{reference_branch}" THEN 1 ELSE 0 END AS not_treated, {metric_name} AS Y_i, RANK() OVER (ORDER BY event_timestamp) AS n FROM records - WHERE branch in ("{reference_branch}", "{comparison_branch}") - """ + WHERE branch in ("{reference_branch}", "{comparison_branch}")""" return query @@ -120,13 +118,11 @@ def build_statistics_query_piece_sufficient_statistics(self) -> str: named `prep`. """ - query = """ - SELECT + query = """SELECT *, treated*Y_i/0.5 - not_treated*Y_i/0.5 AS tau_hat_i, treated*POW(Y_i,2)/POW(0.5,2) + not_treated*POW(Y_i,2)/POW(0.5,2) AS sigma_hat_sq_i, - FROM prep - """ + FROM prep""" return query @@ -145,14 +141,12 @@ def build_statistics_query_piece_accumulators(self) -> str: `build_statistics_query_piece_sufficient_statistics` named `sufficient_statistics`. """ - query = """ - SELECT + query = """SELECT *, -- SUM(tau_hat_i) OVER (ORDER BY event_timestamp) AS tau_hat_i_acc, 1/n * SUM(tau_hat_i) OVER (ORDER BY event_timestamp) AS point_est, SUM(sigma_hat_sq_i) OVER (ORDER BY event_timestamp) AS var_est - FROM sufficient_statistics - """ + FROM sufficient_statistics""" return query @@ -170,13 +164,11 @@ def build_statistics_query_piece_ci_terms( eta_sq = self.eta(minimum_width_observations, alpha) ** 2 alpha_sq = alpha**2 - query = f""" - SELECT + query = f"""SELECT *, (var_est * {eta_sq} + 1)/{eta_sq} AS width_term_1, LN((var_est * {eta_sq}+1)/{alpha_sq}) AS width_term_2 - FROM accumulators - """ + FROM accumulators""" return query @@ -192,12 +184,10 @@ def build_statistics_query_piece_ci_width(self) -> str: `build_statistics_query_piece_ci_terms` named `ci_terms`. """ - query = """ - SELECT + query = """SELECT *, (1/n) * SQRT(width_term_1 * width_term_2) AS ci_width - FROM ci_terms - """ + FROM ci_terms""" return query @@ -209,16 +199,14 @@ def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: `build_statistics_query_piece_ci_width` named `ci_width_term` """ - query = f""" - SELECT + query = f"""SELECT event_timestamp, n, "{comparison_branch}" AS comparison_branch, point_est, point_est - ci_width AS ci_lower, point_est + ci_width AS ci_upper - FROM ci_width_term - """ + FROM ci_width_term""" return query From bd018ded9f3f53bf9ee8e22d3787f689e6309b75 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:25:31 -0700 Subject: [PATCH 22/35] clean sql --- src/mozanalysis/inflight.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 16cf983e..b2d32bc4 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -263,7 +263,7 @@ def build_union_query( {branch}.point_est AS point_est_{branch}, {branch}.ci_lower AS ci_lower_{branch}, - {branch}.ci_upper AS ci_upper{branch}, + {branch}.ci_upper AS ci_upper_{branch}, """ query += f""" @@ -277,6 +277,10 @@ def build_union_query( USING(n) """ + query += f""" + ORDER BY record_timestamp + """ + return query def build_statistics_query( From fe04fbd0c7cb0f44c476c5026b1f8681b1aa6caf Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:27:46 -0700 Subject: [PATCH 23/35] clean sql --- src/mozanalysis/inflight.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index b2d32bc4..a0eef7d9 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -238,8 +238,7 @@ def build_statistics_query_one_branch( {self.build_statistics_query_piece_cleanup(comparison_branch)} ) SELECT * - FROM ci_cleanup - """ + FROM ci_cleanup""" return query From 80e006ec25c20fd9c70d29ae38b8b784fcab1c58 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:29:04 -0700 Subject: [PATCH 24/35] clean sql --- src/mozanalysis/inflight.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index a0eef7d9..02187e5c 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -238,7 +238,8 @@ def build_statistics_query_one_branch( {self.build_statistics_query_piece_cleanup(comparison_branch)} ) SELECT * - FROM ci_cleanup""" + FROM ci_cleanup +""" return query From 50171ca2d2f1cd25b5de5ef1329f5d6ba5d31ee1 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:31:01 -0700 Subject: [PATCH 25/35] clean sql --- src/mozanalysis/inflight.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 02187e5c..23bca9cb 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -253,22 +253,18 @@ def build_union_query( [f"{branch}.event_timestamp" for branch in clean_comparison_branches] ) query = f""" - SELECT - n, - LEAST({branch_timestamps}) AS record_timestamp, - """ +SELECT + n, + LEAST({branch_timestamps}) AS record_timestamp,""" for branch in clean_comparison_branches: query += f""" - - {branch}.point_est AS point_est_{branch}, - {branch}.ci_lower AS ci_lower_{branch}, - {branch}.ci_upper AS ci_upper_{branch}, - """ + {branch}.point_est AS point_est_{branch}, + {branch}.ci_lower AS ci_lower_{branch}, + {branch}.ci_upper AS ci_upper_{branch},""" query += f""" - FROM {clean_comparison_branches[0]} - """ +FROM {clean_comparison_branches[0]}""" if len(clean_comparison_branches) > 1: for next_branch in clean_comparison_branches[1:]: From 83b2d3e495a3f2fd45340f04333caf6223da98d8 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 14:32:16 -0700 Subject: [PATCH 26/35] clean sql --- src/mozanalysis/inflight.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 23bca9cb..55f777fa 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -269,13 +269,11 @@ def build_union_query( if len(clean_comparison_branches) > 1: for next_branch in clean_comparison_branches[1:]: query += f""" - FULL OUTER JOIN {next_branch} - USING(n) - """ +FULL OUTER JOIN {next_branch} +USING(n)""" query += f""" - ORDER BY record_timestamp - """ +ORDER BY record_timestamp""" return query From 2935ecf4b246e8580d7084f38c43a5799cbec6ff Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Mon, 22 Jul 2024 15:39:33 -0700 Subject: [PATCH 27/35] switch to expecting a metric-config-parser Experiment object --- src/mozanalysis/inflight.py | 47 +++++++++++++++---------------------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 55f777fa..0c2037ce 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -1,12 +1,14 @@ import attr -from mozanalysis.experiment import TimeLimits -from mozanalysis.metrics import DataSource, Metric +from mozanalysis.metrics import DataSource + +from metric_config_parser.experiment import Experiment from textwrap import dedent import numpy as np from scipy.special import lambertw +from datetime import datetime class ExperimentAnnotationMissingError(Exception): @@ -53,9 +55,7 @@ def experiments_column_expr(self) -> str: def build_record_query( self, metric: "InflightMetric", - start_date: str, - end_date: str, - experiment_slug: str, + experiment: Experiment, from_expr_dataset: str | None = None, ) -> str: """ @@ -68,13 +68,13 @@ def build_record_query( query = f""" SELECT ds.client_id, - {self.experiments_column_expr.format(experiment_slug=experiment_slug)} AS branch, + {self.experiments_column_expr.format(experiment_slug=experiment.normandy_slug)} AS branch, MIN(ds.{self.timestamp_column}) AS event_timestamp, - MIN_BY({metric.select_expr.format(experiment_slug=experiment_slug)}, ds.{self.timestamp_column}) AS {metric.name} + MIN_BY({metric.select_expr.format(experiment_slug=experiment.normandy_slug)}, ds.{self.timestamp_column}) AS {metric.name} FROM {self.from_expr_for(from_expr_dataset)} ds WHERE 1=1 - AND ds.{self.timestamp_column} BETWEEN "{start_date}" AND "{end_date}" - AND {self.experiments_column_expr.format(experiment_slug=experiment_slug)} IS NOT NULL + AND ds.{self.timestamp_column} BETWEEN "{experiment.start_date.strftime('%Y-%m-%d')}" AND "{(experiment.end_date or datetime.now()).strftime('%Y-%m-%d')}" + AND {self.experiments_column_expr.format(experiment_slug=experiment.normandy_slug)} IS NOT NULL GROUP BY client_id, branch ORDER BY event_timestamp""" # noqa @@ -279,20 +279,19 @@ def build_union_query( def build_statistics_query( self, - comparison_branches: list[str], - reference_branch: str, + experiment: Experiment, metric: "InflightMetric", - start_date: str, - end_date: str, - experiment_slug: str, from_expr_dataset: str | None = None, minimum_width_observations: int = 1000, alpha: float = 0.05, full_sample: bool = False, ) -> str: - record_query = self.build_record_query( - metric, start_date, end_date, experiment_slug, from_expr_dataset - ) + comparison_branches = [ + branch.slug + for branch in experiment.branches + if branch.slug != experiment.reference_branch + ] + record_query = self.build_record_query(metric, experiment, from_expr_dataset) query = dedent( f"""WITH records AS ({record_query} )""" @@ -302,7 +301,7 @@ def build_statistics_query( comparison_branch_name = self.sanitize_branch_name(comparison_branch) subquery = self.build_statistics_query_one_branch( comparison_branch, - reference_branch, + experiment.reference_branch, metric.name, minimum_width_observations, alpha, @@ -350,23 +349,15 @@ class InflightMetric: def render_inflight_query( self, - start_date: str, - end_date: str, - comparison_branches: list[str], - reference_branch: str, - experiment_slug: str, + experiment: Experiment, from_expr_dataset: str | None = None, minimum_width_observations: int = 100, alpha: float = 0.05, full_sample: bool = False, ) -> str: return self.data_source.build_statistics_query( - comparison_branches, - reference_branch, + experiment, self, - start_date, - end_date, - experiment_slug, from_expr_dataset=from_expr_dataset, minimum_width_observations=minimum_width_observations, alpha=alpha, From 28f05966a6bcc4a89fb805c37cf63b13346083ee Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Tue, 23 Jul 2024 10:35:04 -0700 Subject: [PATCH 28/35] refactored into statistic object --- src/mozanalysis/bq.py | 6 + src/mozanalysis/inflight.py | 229 ++++++++++++++++++++++++------------ 2 files changed, 157 insertions(+), 78 deletions(-) diff --git a/src/mozanalysis/bq.py b/src/mozanalysis/bq.py index 6bf210c2..2fcc93d3 100644 --- a/src/mozanalysis/bq.py +++ b/src/mozanalysis/bq.py @@ -84,3 +84,9 @@ def run_query(self, sql, results_table=None, replace_tables=False): def fully_qualify_table_name(self, table_name): """Given a table name, return it fully qualified.""" return f"{self.project_id}.{self.dataset_id}.{table_name}" + + def create_view(self, view_name: str, sql: str) -> None: + view_id = self.fully_qualify_table_name(view_name) + view = bigquery.Table(view_id) + view.view_query = sql + self.client.create_table(view) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 0c2037ce..eee6e235 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -1,6 +1,8 @@ import attr +import re from mozanalysis.metrics import DataSource +from mozanalysis.bq import BigQueryContext, sanitize_table_name_for_bq from metric_config_parser.experiment import Experiment @@ -10,6 +12,8 @@ from scipy.special import lambertw from datetime import datetime +from abc import ABC + class ExperimentAnnotationMissingError(Exception): pass @@ -52,7 +56,7 @@ def experiments_column_expr(self) -> str: else: raise ValueError - def build_record_query( + def render_records_query( self, metric: "InflightMetric", experiment: Experiment, @@ -80,7 +84,69 @@ def build_record_query( return query - def build_statistics_query_piece_prep( + +@attr.s(frozen=True, slots=True) +class InflightMetric: + name = attr.ib(type=str) + select_expr = attr.ib(type=str) + data_source = attr.ib(type=InflightDataSource) + friendly_name = attr.ib(type=str | None, default=None) + description = attr.ib(type=str | None, default=None) + app_name = attr.ib(type=str | None, default=None) + + def render_records_query( + self, experiment: Experiment, from_expr_dataset: str | None = None + ) -> str: + return self.data_source.render_records_query( + self, experiment, from_expr_dataset + ) + + def record_view_name(self, experiment: Experiment) -> str: + bq_experiment_slug = sanitize_table_name_for_bq(experiment.normandy_slug) + metric_slug = sanitize_table_name_for_bq(self.name) + view_name = f"records_{bq_experiment_slug}_{metric_slug}" + return view_name + + def publish_records_view( + self, + context: BigQueryContext, + experiment: Experiment, + from_expr_dataset: str | None = None, + ) -> None: + view_name = self.record_view_name(experiment) + view_sql = self.render_records_query(experiment, from_expr_dataset) + + context.create_view(view_name, view_sql) + + +@attr.s() +class InflightStatistic(ABC): + + alpha: float = 0.05 + + def render_statistics_query( + self, + experiment: Experiment, + metric: "InflightMetric", + **statistics_kwargs, + ) -> str: + raise NotImplementedError + + def publish_statistics_view( + self, + experiment: Experiment, + metric: InflightMetric, + context: BigQueryContext, + **statistical_kwargs, + ) -> None: + raise NotImplementedError + + +@attr.s() +class DesignBasedConfidenceSequences(InflightStatistic): + minimum_width_observations: int = 100 + + def render_statistics_query_piece_prep( self, comparison_branch: str, reference_branch: str, metric_name: str ) -> str: """ @@ -89,7 +155,7 @@ def build_statistics_query_piece_prep( Filters to clients from the `reference_branch` or `comparison_branch`, constructs treatment indicators, a `Y_i` column, and a rank column. - Assumes an upstream CTE holding the output of `build_record_query` + Assumes an upstream CTE holding the output of `render_record_query` named `records`. """ query = f"""SELECT @@ -103,9 +169,9 @@ def build_statistics_query_piece_prep( return query - def build_statistics_query_piece_sufficient_statistics(self) -> str: + def render_statistics_query_piece_sufficient_statistics(self) -> str: """ - Builds upon `build_statistics_query_piece_intro` to add the sufficient statistics + Builds upon `render_statistics_query_piece_intro` to add the sufficient statistics `tau_hat_i` and `sigma_hat_sq_i` necessary to calculate the confidence sequence. Adds: @@ -114,7 +180,7 @@ def build_statistics_query_piece_sufficient_statistics(self) -> str: - `sigma_hat_sq_i`: either +1/4*(metric value)^2 (in case of comparison branch) or -1/4*(metric value)^2 (in case of reference branch). - Assumes an upstream CTE holding the output of `build_statistics_query_piece_prep` + Assumes an upstream CTE holding the output of `render_statistics_query_piece_prep` named `prep`. """ @@ -126,9 +192,9 @@ def build_statistics_query_piece_sufficient_statistics(self) -> str: return query - def build_statistics_query_piece_accumulators(self) -> str: + def render_statistics_query_piece_accumulators(self) -> str: """ - Builds upon `build_statistics_query_piece_sufficient_statistics` to construct + Builds upon `render_statistics_query_piece_sufficient_statistics` to construct expanding sufficient statistics (accumulate the sufficient statistics over time). Adds: @@ -138,7 +204,7 @@ def build_statistics_query_piece_accumulators(self) -> str: this time point. Known as S_n in the literature. Assumes an upstream CTE holding the output of - `build_statistics_query_piece_sufficient_statistics` named `sufficient_statistics`. + `render_statistics_query_piece_sufficient_statistics` named `sufficient_statistics`. """ query = """SELECT @@ -150,19 +216,17 @@ def build_statistics_query_piece_accumulators(self) -> str: return query - def build_statistics_query_piece_ci_terms( - self, minimum_width_observations: int = 100, alpha: float = 0.05 - ) -> str: + def render_statistics_query_piece_ci_terms(self) -> str: """ - Builds upon `build_statistics_query_piece_accumulators` to construct + Builds upon `render_statistics_query_piece_accumulators` to construct the two terms needed to calculate the width of the confidence sequence. Assumes an upstream CTE holding the output of - `build_statistics_query_piece_accumulators` named `accumulators`. + `render_statistics_query_piece_accumulators` named `accumulators`. """ - eta_sq = self.eta(minimum_width_observations, alpha) ** 2 - alpha_sq = alpha**2 + eta_sq = self.eta**2 + alpha_sq = self.alpha**2 query = f"""SELECT *, @@ -172,16 +236,16 @@ def build_statistics_query_piece_ci_terms( return query - def build_statistics_query_piece_ci_width(self) -> str: + def render_statistics_query_piece_ci_width(self) -> str: """ - Builds upon `build_statistics_query_piece_accumulators` to construct + Builds upon `render_statistics_query_piece_accumulators` to construct the two terms needed to calculate the width of the confidence sequence. Adds: - ci_width: the width of the confidence sequence at this time. Assumes an upstream CTE holding the output of - `build_statistics_query_piece_ci_terms` named `ci_terms`. + `render_statistics_query_piece_ci_terms` named `ci_terms`. """ query = """SELECT @@ -191,12 +255,12 @@ def build_statistics_query_piece_ci_width(self) -> str: return query - def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: + def render_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: """ - Cleans up the output of `build_statistics_query_piece_ci_width`. + Cleans up the output of `render_statistics_query_piece_ci_width`. Assumes an upstream CTE holding the output of - `build_statistics_query_piece_ci_width` named `ci_width_term` + `render_statistics_query_piece_ci_width` named `ci_width_term` """ query = f"""SELECT @@ -210,13 +274,11 @@ def build_statistics_query_piece_cleanup(self, comparison_branch: str) -> str: return query - def build_statistics_query_one_branch( + def render_statistics_query_one_branch( self, comparison_branch: str, reference_branch: str, metric_name: str, - minimum_width_observations: int = 100, - alpha: float = 0.05, ) -> str: """ Builds the statistical query to construct the confidence sequence to compare @@ -225,17 +287,17 @@ def build_statistics_query_one_branch( query = f""" WITH prep AS ( - {self.build_statistics_query_piece_prep(comparison_branch, reference_branch, metric_name)} + {self.render_statistics_query_piece_prep(comparison_branch, reference_branch, metric_name)} ), sufficient_statistics AS ( - {self.build_statistics_query_piece_sufficient_statistics()} + {self.render_statistics_query_piece_sufficient_statistics()} ), accumulators AS ( - {self.build_statistics_query_piece_accumulators()} + {self.render_statistics_query_piece_accumulators()} ), ci_terms AS ( - {self.build_statistics_query_piece_ci_terms(minimum_width_observations, alpha)} + {self.render_statistics_query_piece_ci_terms()} ), ci_width_term AS ( - {self.build_statistics_query_piece_ci_width()} + {self.render_statistics_query_piece_ci_width()} ), ci_cleanup AS ( - {self.build_statistics_query_piece_cleanup(comparison_branch)} + {self.render_statistics_query_piece_cleanup(comparison_branch)} ) SELECT * FROM ci_cleanup @@ -243,11 +305,11 @@ def build_statistics_query_one_branch( return query - def build_union_query( + def render_union_query( self, comparison_branches: list[str], full_sample: bool = False ) -> str: clean_comparison_branches = [ - self.sanitize_branch_name(branch) for branch in comparison_branches + sanitize_table_name_for_bq(branch) for branch in comparison_branches ] branch_timestamps = ", ".join( [f"{branch}.event_timestamp" for branch in clean_comparison_branches] @@ -277,44 +339,65 @@ def build_union_query( return query - def build_statistics_query( + def render_statistics_query( self, experiment: Experiment, - metric: "InflightMetric", - from_expr_dataset: str | None = None, - minimum_width_observations: int = 1000, - alpha: float = 0.05, + metric: InflightMetric, full_sample: bool = False, + **ignored_kwargs, ) -> str: + + metric_view = metric.record_view_name(experiment) + comparison_branches = [ branch.slug for branch in experiment.branches if branch.slug != experiment.reference_branch ] - record_query = self.build_record_query(metric, experiment, from_expr_dataset) + query = dedent( - f"""WITH records AS ({record_query} + f"""WITH records AS (SELECT * FROM {metric_view} )""" ) for comparison_branch in comparison_branches: - comparison_branch_name = self.sanitize_branch_name(comparison_branch) - subquery = self.build_statistics_query_one_branch( + comparison_branch_name = sanitize_table_name_for_bq(comparison_branch) + subquery = self.render_statistics_query_one_branch( comparison_branch, experiment.reference_branch, metric.name, - minimum_width_observations, - alpha, ) query += f""", {comparison_branch_name} AS ({subquery})""" query += "\n" - query += self.build_union_query(comparison_branches, full_sample) + query += self.render_union_query(comparison_branches, full_sample) return query - @staticmethod - def eta(minimum_width_observations: int = 100, alpha: float = 0.05) -> float: + def statistics_view_name( + self, experiment: Experiment, metric: InflightMetric + ) -> str: + bq_experiment_slug = sanitize_table_name_for_bq(experiment.normandy_slug) + metric_slug = sanitize_table_name_for_bq(metric.name) + statistics_slug = self.name() + view_name = f"statistics_{bq_experiment_slug}_{metric_slug}_{statistics_slug}" + return view_name + + def publish_statistics_view( + self, + experiment: Experiment, + metric: InflightMetric, + context: BigQueryContext, + full_sample: bool = False, + **ignored_runtime_statistical_kwargs, + ) -> None: + view_name = self.statistics_view_name(experiment, metric) + view_sql = self.render_statistics_query(experiment, metric, full_sample) + + context.create_view(view_name, view_sql) + + @property + def eta(self) -> float: """ Returns the `eta` (tuning parameter) that minimizes the relative width of the confidence sequence after `minimum_width_observations` clients enrolled. Note @@ -325,41 +408,31 @@ def eta(minimum_width_observations: int = 100, alpha: float = 0.05) -> float: We default to 100 to focus the "alpha spending" near the start of the experiment. """ - alpha_sq = alpha**2 + alpha_sq = self.alpha**2 eta = np.sqrt( (-1 * lambertw(-1 * alpha_sq * np.exp(1), -1) - 1) - / minimum_width_observations + / self.minimum_width_observations ).real assert np.isfinite(eta) return eta - @staticmethod - def sanitize_branch_name(branch: str) -> str: - return branch.replace("-", "_") - - -@attr.s(frozen=True, slots=True) -class InflightMetric: - name = attr.ib(type=str) - select_expr = attr.ib(type=str) - data_source = attr.ib(type=InflightDataSource) - friendly_name = attr.ib(type=str | None, default=None) - description = attr.ib(type=str | None, default=None) - app_name = attr.ib(type=str | None, default=None) - - def render_inflight_query( - self, - experiment: Experiment, - from_expr_dataset: str | None = None, - minimum_width_observations: int = 100, - alpha: float = 0.05, - full_sample: bool = False, - ) -> str: - return self.data_source.build_statistics_query( - experiment, - self, - from_expr_dataset=from_expr_dataset, - minimum_width_observations=minimum_width_observations, - alpha=alpha, - full_sample=full_sample, + @classmethod + def name(cls): + """Return snake-cased name of the statistic.""" + # https://stackoverflow.com/a/1176023 + name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", cls.__name__) + return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower() + + +class InflightSummary: + metric: InflightMetric + statistic: InflightStatistic + experiment: Experiment + + def publish_views( + self, context: BigQueryContext, **runtime_statistical_kwargs + ) -> None: + self.metric.publish_records_view(context, self.experiment) + self.statistic.publish_statistics_view( + self.experiment, self.metric, context, **runtime_statistical_kwargs ) From 4ebe516a85d4b16403ace7fc6e83705ed015fb51 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Tue, 23 Jul 2024 11:02:49 -0700 Subject: [PATCH 29/35] move to attrs --- src/mozanalysis/inflight.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index eee6e235..d0a7c883 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -121,8 +121,7 @@ def publish_records_view( @attr.s() class InflightStatistic(ABC): - - alpha: float = 0.05 + alpha = attr.ib(type=float, default=0.05) def render_statistics_query( self, @@ -144,7 +143,7 @@ def publish_statistics_view( @attr.s() class DesignBasedConfidenceSequences(InflightStatistic): - minimum_width_observations: int = 100 + minimum_width_observations = attr.ib(type=int, default=100) def render_statistics_query_piece_prep( self, comparison_branch: str, reference_branch: str, metric_name: str @@ -424,10 +423,11 @@ def name(cls): return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower() +@attr.s() class InflightSummary: - metric: InflightMetric - statistic: InflightStatistic - experiment: Experiment + metric = attr.ib(type=InflightMetric) + statistic = attr.ib(type=InflightStatistic) + experiment = attr.ib(type=Experiment) def publish_views( self, context: BigQueryContext, **runtime_statistical_kwargs From 7bc77f37d802c5159c6ac422217792eded8a9a11 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Tue, 23 Jul 2024 11:11:37 -0700 Subject: [PATCH 30/35] move to attrs --- src/mozanalysis/inflight.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index d0a7c883..c47d5761 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -390,7 +390,9 @@ def publish_statistics_view( full_sample: bool = False, **ignored_runtime_statistical_kwargs, ) -> None: - view_name = self.statistics_view_name(experiment, metric) + view_name = context.fully_qualify_table_name( + self.statistics_view_name(experiment, metric) + ) view_sql = self.render_statistics_query(experiment, metric, full_sample) context.create_view(view_name, view_sql) From 4734ab6bd3314627ecbf98d7839a124e98b183f5 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Tue, 23 Jul 2024 11:18:11 -0700 Subject: [PATCH 31/35] overwrite existing views --- src/mozanalysis/inflight.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index c47d5761..12df1cd5 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -116,7 +116,7 @@ def publish_records_view( view_name = self.record_view_name(experiment) view_sql = self.render_records_query(experiment, from_expr_dataset) - context.create_view(view_name, view_sql) + context.create_view(view_name, view_sql, replace_view=True) @attr.s() @@ -395,7 +395,7 @@ def publish_statistics_view( ) view_sql = self.render_statistics_query(experiment, metric, full_sample) - context.create_view(view_name, view_sql) + context.create_view(view_name, view_sql, replace_view=True) @property def eta(self) -> float: From 0ae459bc21e56983b76cae89c1324ceb4945d021 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Tue, 23 Jul 2024 11:21:44 -0700 Subject: [PATCH 32/35] forgot bq file --- src/mozanalysis/bq.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mozanalysis/bq.py b/src/mozanalysis/bq.py index 2fcc93d3..dd8099b0 100644 --- a/src/mozanalysis/bq.py +++ b/src/mozanalysis/bq.py @@ -85,8 +85,10 @@ def fully_qualify_table_name(self, table_name): """Given a table name, return it fully qualified.""" return f"{self.project_id}.{self.dataset_id}.{table_name}" - def create_view(self, view_name: str, sql: str) -> None: + def create_view(self, view_name: str, sql: str, replace_view=False) -> None: view_id = self.fully_qualify_table_name(view_name) view = bigquery.Table(view_id) + if replace_view: + self.client.delete_table(view, not_found_ok=True) view.view_query = sql self.client.create_table(view) From aa760a9931a40ae1b04e2787f5d4bf20b04b4433 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Tue, 23 Jul 2024 11:29:19 -0700 Subject: [PATCH 33/35] updates --- src/mozanalysis/inflight.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 12df1cd5..fae35fa9 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -390,9 +390,7 @@ def publish_statistics_view( full_sample: bool = False, **ignored_runtime_statistical_kwargs, ) -> None: - view_name = context.fully_qualify_table_name( - self.statistics_view_name(experiment, metric) - ) + view_name = self.statistics_view_name(experiment, metric) view_sql = self.render_statistics_query(experiment, metric, full_sample) context.create_view(view_name, view_sql, replace_view=True) From 82f234a0b2085a39e9f173a099673cf23c8c14a5 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Tue, 23 Jul 2024 11:34:39 -0700 Subject: [PATCH 34/35] updates --- src/mozanalysis/inflight.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index fae35fa9..9c9e2ec2 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -342,11 +342,14 @@ def render_statistics_query( self, experiment: Experiment, metric: InflightMetric, + context: BigQueryContext, full_sample: bool = False, **ignored_kwargs, ) -> str: - metric_view = metric.record_view_name(experiment) + metric_view = context.fully_qualify_table_name( + metric.record_view_name(experiment) + ) comparison_branches = [ branch.slug @@ -391,7 +394,9 @@ def publish_statistics_view( **ignored_runtime_statistical_kwargs, ) -> None: view_name = self.statistics_view_name(experiment, metric) - view_sql = self.render_statistics_query(experiment, metric, full_sample) + view_sql = self.render_statistics_query( + experiment, metric, context, full_sample + ) context.create_view(view_name, view_sql, replace_view=True) From bf8fd06bd54b803631c515670e2356267062a4e2 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Wed, 24 Jul 2024 15:14:41 -0700 Subject: [PATCH 35/35] updated with ability to construct from mcp types --- src/mozanalysis/inflight.py | 46 ++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/src/mozanalysis/inflight.py b/src/mozanalysis/inflight.py index 9c9e2ec2..ef588f82 100644 --- a/src/mozanalysis/inflight.py +++ b/src/mozanalysis/inflight.py @@ -5,6 +5,7 @@ from mozanalysis.bq import BigQueryContext, sanitize_table_name_for_bq from metric_config_parser.experiment import Experiment +import metric_config_parser.metric as parser_metric # import Summary, Metric, DataSource from textwrap import dedent @@ -84,6 +85,17 @@ def render_records_query( return query + @classmethod + def from_data_source( + cls, data_source: parser_metric.DataSource + ) -> "InflightDataSource": + return cls( + name=data_source.name, + from_expr=data_source.from_expression, + experiments_column_type=data_source.experiments_column_type, + timestamp_column=data_source.timestamp_column, + ) + @attr.s(frozen=True, slots=True) class InflightMetric: @@ -92,7 +104,6 @@ class InflightMetric: data_source = attr.ib(type=InflightDataSource) friendly_name = attr.ib(type=str | None, default=None) description = attr.ib(type=str | None, default=None) - app_name = attr.ib(type=str | None, default=None) def render_records_query( self, experiment: Experiment, from_expr_dataset: str | None = None @@ -117,6 +128,18 @@ def publish_records_view( view_sql = self.render_records_query(experiment, from_expr_dataset) context.create_view(view_name, view_sql, replace_view=True) + print(f"published records view for {self.name} to {view_name}") + + @classmethod + def from_metric(cls, metric: parser_metric.Metric) -> "InflightMetric": + data_source = InflightDataSource.from_data_source(metric.data_source) + return cls( + name=metric.name, + select_expr=metric.select_expression, + data_source=data_source, + friendly_name=metric.friendly_name, + description=metric.description, + ) @attr.s() @@ -400,6 +423,8 @@ def publish_statistics_view( context.create_view(view_name, view_sql, replace_view=True) + print(f"published statistics view for {metric.name} to {view_name}") + @property def eta(self) -> float: """ @@ -441,3 +466,22 @@ def publish_views( self.statistic.publish_statistics_view( self.experiment, self.metric, context, **runtime_statistical_kwargs ) + + @classmethod + def from_summary( + cls, summary: parser_metric.Summary, experiment: Experiment + ) -> "InflightSummary": + found = False + for statistic_class in set(InflightStatistic.__subclasses__()): + if statistic_class.name() == summary.statistic.name: + found = True + break + + if not found: + raise ValueError(f"Statistic '{summary.statistic.name}' does not exist.") + + statistic = statistic_class(**summary.statistic.params) + + metric = InflightMetric.from_metric(summary.metric) + + return cls(metric=metric, statistic=statistic, experiment=experiment)