diff --git a/matrix_benchmarking/common.py b/matrix_benchmarking/common.py index 7e1b4eb1..3af302a6 100644 --- a/matrix_benchmarking/common.py +++ b/matrix_benchmarking/common.py @@ -24,7 +24,8 @@ def __init__(self, location, results, self.results = results self.settings.__dict__.update(processed_settings) - + + self.import_key = import_key self.processed_key = processed_key self.import_settings = processed_settings diff --git a/matrix_benchmarking/download_lts.py b/matrix_benchmarking/download_lts.py index f72aec51..4f1483e7 100644 --- a/matrix_benchmarking/download_lts.py +++ b/matrix_benchmarking/download_lts.py @@ -38,7 +38,8 @@ def main(opensearch_host: str = "", opensearch_index: the OpenSearch index where the LTS payloads are stored (Mandatory) lts_results_dirname: The directory to place the downloaded LTS results files. - filters: If provided, only download the experiments matching the filters. Eg: {"image_name": "1.2"}. (Optional.) + filters: If provided, only download the experiments matching the filters. Eg: {"image_name": "1.2"}. + If the provided value is "*", then we just check to ensure the key's existence (Optional.) max_records: Maximum number of records to retrieve from the OpenSearch instance. 10,000 is the largest number possible without paging (Optional.) force: Ignore the presence of the anchor file before downloading the results (Optional.) clean: Delete all the existing '.json' files in the lts-results-dirname before downloading the results (Optional.) 
@@ -121,7 +122,7 @@ def download(client, opensearch_index, filters, lts_results_dirname, max_records query["query"] = { "bool": { "must": [ - {"term": {f"{k}.keyword": v}} for k, v in filters.items() + {"term": {f"{k}.keyword": v}} if v != "*" else {"exists": {"field": k}} for k, v in filters.items() ] } } diff --git a/matrix_benchmarking/models.py b/matrix_benchmarking/models.py index 88a81c0c..a138cf2f 100644 --- a/matrix_benchmarking/models.py +++ b/matrix_benchmarking/models.py @@ -130,3 +130,12 @@ def tostr(self): model.tostr = tostr return model + +class Regression(ExclusiveModel): + kpi: str + metric: str + indicator: str + status: int + direction: Optional[int] = Field(default=None) + explanation: Optional[str] = Field(default=None) + details: Optional[dict[str, str]] = Field(default=None) diff --git a/matrix_benchmarking/regression/__init__.py b/matrix_benchmarking/regression/__init__.py new file mode 100644 index 00000000..7d335df7 --- /dev/null +++ b/matrix_benchmarking/regression/__init__.py @@ -0,0 +1,142 @@ +import os +import json +import logging +import types +import datetime +import numpy as np +from functools import reduce +from typing import Optional, Callable +import copy + +import matrix_benchmarking.common as common +import matrix_benchmarking.models as models + +def get_from_path(d, path): + return reduce(dict.get, path.split("."), d) + +# check if ALL (k, v) pairs in part are present in full_dict +def dict_part_eq(part, full_dict): + return reduce(lambda x, y: x and part[y] == full_dict[y], part.keys(), True) + +class RegressionStatus(types.SimpleNamespace): + def __init__( + self, + status: int, + direction: Optional[int] = None, + explanation: Optional[str] = None, + details: Optional[dict] = None + ): + self.status = status + self.direction = direction + self.explanation = explanation + self.details = details + +class RegressionIndicator: + """ + Assume the matrix that is passed in contains a prefiltered combination of settings, + or pass 
in the desired filter with the settings_filter option + """ + def __init__( + self, + new_payload: common.MatrixEntry, + lts_payloads: list[common.MatrixEntry], + x_var: str, + x_var_key = lambda x: x.metadata.end.astimezone(), + kpis: Optional[list[str]] = None, + settings_filter: Optional[dict] = None, + combine_funcs: dict = {}, + use_x_var = False # Automatically determine the settings for the x_var + ): + self.new_payload = new_payload + self.x_var = x_var + self.x_var_key = x_var_key + self.kpis = kpis + self.combine_funcs = combine_funcs + self.settings_filter = settings_filter + self.use_x_var = use_x_var + + if self.settings_filter and self.x_var: + logging.warning("settings_filter and x_var set, only using settings_filter") + elif self.x_var and self.use_x_var: + settings = dict(self.new_payload.get_settings()) + settings.pop(self.x_var) + self.settings_filter = settings + + if self.settings_filter: + # Only store payloads that have equivalent (k, v) pairs + # as the settings_filter + self.lts_payloads = list( + filter( + lambda x: dict_part_eq(self.settings_filter, x.get_settings()), + lts_payloads + ) + ) + + if not dict_part_eq(self.settings_filter, self.new_payload.get_settings()): + self.new_payload = None + logging.warning("settings_filter isn't satisfied for the new payload") + else: + self.lts_payloads = lts_payloads + + + def get_name(self): + return "UndefinedRegressionIndicator" + + def analyze(self) -> list[models.Regression]: + + if not self.new_payload or not self.lts_payloads: + logging.info("Missing a new payload or lts payloads") + return [ + models.Regression( + kpi="", + metric="" if not self.x_var else self.x_var, + indicator=self.get_name(), + status=0 + ) + ] + + regression_results = [] + + kpis_to_test = vars(self.new_payload.results.lts.kpis).keys() if not self.kpis else self.kpis + for kpi in kpis_to_test: + + curr_values = [] + if type(self.new_payload.results) is list: + for result in self.new_payload.results: + 
curr_values.append(vars(result.lts.kpis)[kpi].value) + else: + curr_values.append(vars(self.new_payload.results.lts.kpis)[kpi].value) + + lts_values = [] + for payload in self.lts_payloads: + if type(payload.results) is list: + lts_values += list(map(lambda x: vars(x.results.kpis)[kpi].value, payload.results)) + else: + lts_values.append(vars(payload.results.kpis)[kpi].value) + + + if any(map(lambda x: type(x) is list, curr_values + lts_values)): + if kpi in self.combine_funcs: + curr_values = [self.combine_funcs[kpi](v) for v in curr_values] + lts_values = [self.combine_funcs[kpi](v) for v in lts_values] + else: + logging.warning(f"Skipping KPI with list of values, consider filtering KPIs or providing a combine_func for {kpi}") + continue + + for curr_value in curr_values: + raw_results: RegressionStatus = self.regression_test(curr_value, lts_values) + result = models.Regression( + kpi=kpi, + metric="" if not self.x_var else self.x_var, + indicator=self.get_name(), + status=raw_results.status, + direction=raw_results.direction, + explanation=raw_results.explanation, + details=raw_results.details + ) + regression_results.append(result) + + return regression_results + + def regression_test(self, new_result: float, lts_result: np.array) -> RegressionStatus: + return RegressionStatus(0, explanation="Default return status") diff --git a/matrix_benchmarking/regression/zscore.py b/matrix_benchmarking/regression/zscore.py new file mode 100644 index 00000000..60c2da99 --- /dev/null +++ b/matrix_benchmarking/regression/zscore.py @@ -0,0 +1,41 @@ +from matrix_benchmarking import regression + +import numpy as np + +class ZScoreIndicator(regression.RegressionIndicator): + """ + Example regression indicator that uses the Z score as a metric + to determine if the recent test was an outlier + """ + def __init__(self, *args, threshold=3, **kwargs): + super().__init__(*args, **kwargs) + self.threshold = threshold + + def get_name(self): + return 
f"ZScoreIndicator(threshold={self.threshold})" + + def regression_test(self, new_result: float, lts_results: np.array) -> regression.RegressionStatus: + """ + Determine if the curr_result is more/less than threshold + standard deviations away from the previous_results + """ + mean = np.mean(lts_results) + std = np.std(lts_results) + z_score = (new_result - mean) / std + + status = 0 + direction = 0 + explanation = "z-score not greater than threshold" + details = { + "new_result": new_result, + "threshold": self.threshold, + "zscore": z_score, + "mean": mean, + "stddev": std + } + if abs(z_score) > self.threshold: + status = 1 + direction = 1 if z_score > 0 else -1 + explanation="z-score greater than threshold" + + return regression.RegressionStatus(status, direction=direction, explanation=explanation, details=details) diff --git a/matrix_benchmarking/store/simple.py b/matrix_benchmarking/store/simple.py index afe3d591..aec9b0f9 100644 --- a/matrix_benchmarking/store/simple.py +++ b/matrix_benchmarking/store/simple.py @@ -185,6 +185,7 @@ def has_lts_anchor(files): with open(filepath) as f: document = json.load(f) + try: lts_payload = store.lts_schema.parse_obj(document) except pydantic.error_wrappers.ValidationError as e: