Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

General Regression Analysis Wrapper #125

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion matrix_benchmarking/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def __init__(self, location, results,
self.results = results

self.settings.__dict__.update(processed_settings)


self.import_key = import_key
self.processed_key = processed_key
self.import_settings = processed_settings

Expand Down
5 changes: 3 additions & 2 deletions matrix_benchmarking/download_lts.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def main(opensearch_host: str = "",
opensearch_index: the OpenSearch index where the LTS payloads are stored (Mandatory)

lts_results_dirname: The directory to place the downloaded LTS results files.
filters: If provided, only download the experiments matching the filters. Eg: {"image_name": "1.2"}. (Optional.)
filters: If provided, only download the experiments matching the filters. Eg: {"image_name": "1.2"}.
       If the provided value is "*", then we just check to ensure the key's existence. (Optional.)
max_records: Maximum number of records to retrieve from the OpenSearch instance. 10,000 is the largest number possible without paging (Optional.)
force: Ignore the presence of the anchor file before downloading the results (Optional.)
clean: Delete all the existing '.json' files in the lts-results-dirname before downloading the results (Optional.)
Expand Down Expand Up @@ -121,7 +122,7 @@ def download(client, opensearch_index, filters, lts_results_dirname, max_records
query["query"] = {
"bool": {
"must": [
{"term": {f"{k}.keyword": v}} for k, v in filters.items()
{"term": {f"{k}.keyword": v}} if v != "*" else {"exists": {"field": k}} for k, v in filters.items()
]
}
}
Expand Down
9 changes: 9 additions & 0 deletions matrix_benchmarking/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,12 @@ def tostr(self):
model.tostr = tostr

return model

class Regression(ExclusiveModel):
    """Outcome of a single regression-indicator check for one KPI."""
    kpi: str  # name of the KPI that was tested (empty when no test could be run)
    metric: str  # the x-axis variable the comparison was made against ("" when none)
    indicator: str  # name of the indicator that produced this result
    status: int  # 0 when no regression was flagged, non-zero otherwise
    direction: Optional[int] = Field(default=None)  # sign of the deviation (+1 above the reference, -1 below)
    explanation: Optional[str] = Field(default=None)  # human-readable reason for the status
    details: Optional[dict[str, str]] = Field(default=None)  # raw numbers backing the verdict
142 changes: 142 additions & 0 deletions matrix_benchmarking/regression/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import os
import json
import logging
import types
import datetime
import numpy as np
from functools import reduce
from typing import Optional, Callable
import copy

import matrix_benchmarking.common as common
import matrix_benchmarking.models as models

def get_from_path(d, path):
    """Resolve a dotted key path (e.g. "a.b.c") against a nested dict."""
    node = d
    for part in path.split("."):
        node = dict.get(node, part)
    return node

def dict_part_eq(part, full_dict):
    """Check if ALL (k, v) pairs in `part` are present in `full_dict`.

    A key of `part` that is missing from `full_dict` counts as a mismatch
    (returns False) instead of raising KeyError.
    """
    return all(key in full_dict and part[key] == full_dict[key] for key in part)

class RegressionStatus(types.SimpleNamespace):
    """Plain result container produced by a regression test.

    Attributes:
        status: 0 when no regression was flagged, non-zero otherwise.
        direction: sign of the deviation, when applicable.
        explanation: human-readable reason for the status.
        details: raw numbers backing the verdict.
    """
    def __init__(
        self,
        status: int,
        direction: Optional[int] = None,
        explanation: Optional[str] = None,
        details: Optional[dict] = None
    ):
        super().__init__(
            status=status,
            direction=direction,
            explanation=explanation,
            details=details,
        )

class RegressionIndicator:
    """
    Base class comparing a new payload against historical LTS payloads.

    Assume the matrix that is passed in contains a prefiltered combination of
    settings, or pass in the desired filter with the settings_filter option.
    Subclasses override regression_test() to supply an actual statistical test.
    """
    def __init__(
        self,
        new_payload: common.MatrixEntry,
        lts_payloads: list[common.MatrixEntry],
        x_var: str,
        x_var_key = lambda x: x.metadata.end.astimezone(),
        kpis: Optional[list[str]] = None,
        settings_filter: Optional[dict] = None,
        combine_funcs: Optional[dict] = None,
        use_x_var = False # Automatically determine the settings for the x_var
    ):
        self.new_payload = new_payload
        self.x_var = x_var
        self.x_var_key = x_var_key
        self.kpis = kpis
        # Fresh dict per instance: a mutable `{}` default would be shared
        # across every RegressionIndicator instance.
        self.combine_funcs = {} if combine_funcs is None else combine_funcs
        self.settings_filter = settings_filter
        self.use_x_var = use_x_var

        if self.settings_filter and self.x_var:
            logging.warning("settings_filter and x_var set, only using settings_filter")
        elif self.x_var and self.use_x_var:
            # Derive the filter automatically: match every setting except x_var.
            settings = dict(self.new_payload.get_settings())
            settings.pop(self.x_var, None)  # tolerate x_var absent from the settings
            self.settings_filter = settings

        if self.settings_filter:
            # Only store payloads that have equivalent (k, v) pairs
            # as the settings_filter
            self.lts_payloads = [
                payload for payload in lts_payloads
                if dict_part_eq(self.settings_filter, payload.get_settings())
            ]

            if not dict_part_eq(self.settings_filter, self.new_payload.get_settings()):
                self.new_payload = None
                logging.warning("settings_filter isn't satisfied for the new payload")
        else:
            self.lts_payloads = lts_payloads


    def get_name(self):
        """Human-readable indicator name; subclasses should override."""
        return "UndefinedRegressionIndicator"

    def _collect_new_values(self, kpi):
        """Gather the KPI value(s) from the new payload (single result or list of results)."""
        results = self.new_payload.results
        if isinstance(results, list):
            return [vars(result.lts.kpis)[kpi].value for result in results]
        return [vars(results.lts.kpis)[kpi].value]

    def _collect_lts_values(self, kpi):
        """Gather the KPI values from every historical LTS payload."""
        # NOTE(review): new-payload entries are read via `.lts.kpis` but LTS
        # entries via `.results.kpis` — confirm both schemas are intended.
        values = []
        for payload in self.lts_payloads:
            if isinstance(payload.results, list):
                values += [vars(entry.results.kpis)[kpi].value for entry in payload.results]
            else:
                values.append(vars(payload.results.kpis)[kpi].value)
        return values

    def analyze(self) -> list[models.Regression]:
        """Run regression_test() for every KPI under study.

        Returns one models.Regression per (KPI, new value) pair, or a single
        neutral entry (status=0) when the new payload or the LTS payloads
        are missing.
        """
        if not self.new_payload or not self.lts_payloads:
            logging.info("Missing a new payload or lts payloads")
            return [
                models.Regression(
                    kpi="",
                    metric="" if not self.x_var else self.x_var,
                    indicator=self.get_name(),
                    status=0
                )
            ]

        regression_results = []

        kpis_to_test = self.kpis if self.kpis else vars(self.new_payload.results.lts.kpis).keys()
        for kpi in kpis_to_test:
            curr_values = self._collect_new_values(kpi)
            lts_values = self._collect_lts_values(kpi)

            # KPIs whose value is itself a list must be reduced to a scalar
            # (via a user-provided combine_func) before they can be compared.
            if any(isinstance(v, list) for v in curr_values + lts_values):
                if kpi in self.combine_funcs:
                    curr_values = [self.combine_funcs[kpi](v) for v in curr_values]
                    lts_values = [self.combine_funcs[kpi](v) for v in lts_values]
                else:
                    logging.warning(f"Skipping KPI with list of values, consider filtering KPIs or providing a combine_func for {kpi}")
                    continue

            for curr_value in curr_values:
                raw_results: RegressionStatus = self.regression_test(curr_value, lts_values)
                regression_results.append(
                    models.Regression(
                        kpi=kpi,
                        metric="" if not self.x_var else self.x_var,
                        indicator=self.get_name(),
                        status=raw_results.status,
                        direction=raw_results.direction,
                        explanation=raw_results.explanation,
                        details=raw_results.details
                    )
                )

        return regression_results

    def regression_test(self, new_result: float, lts_result: np.ndarray) -> RegressionStatus:
        """Default no-op test (never flags a regression); subclasses override."""
        return RegressionStatus(0, explanation="Default return status")
41 changes: 41 additions & 0 deletions matrix_benchmarking/regression/zscore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from matrix_benchmarking import regression

import numpy as np

class ZScoreIndicator(regression.RegressionIndicator):
    """
    Example regression indicator that uses the Z score as a metric
    to determine if the recent test was an outlier
    """
    def __init__(self, *args, threshold=3, **kwargs):
        # threshold: number of standard deviations beyond which the new
        # result is flagged as a regression.
        super().__init__(*args, **kwargs)
        self.threshold = threshold

    def get_name(self):
        return f"ZScoreIndicator(threshold={self.threshold})"

    def regression_test(self, new_result: float, lts_results: np.ndarray) -> regression.RegressionStatus:
        """
        Determine if the curr_result is more/less than threshold
        standard deviations away from the previous_results
        """
        mean = np.mean(lts_results)
        std = np.std(lts_results)
        if std == 0:
            # All historical values are identical: dividing by zero would
            # produce nan/inf and a spurious "no regression". Treat an exact
            # match as z=0 and any deviation as infinitely many stddevs away.
            if new_result == mean:
                z_score = 0.0
            else:
                z_score = float("inf") if new_result > mean else float("-inf")
        else:
            z_score = (new_result - mean) / std

        status = 0
        direction = 0
        explanation = "z-score not greater than threshold"
        details = {
            "new_result": new_result,
            "threshold": self.threshold,
            "zscore": z_score,
            "mean": mean,
            "stddev": std
        }
        if abs(z_score) > self.threshold:
            status = 1
            direction = 1 if z_score > 0 else -1
            explanation = "z-score greater than threshold"

        return regression.RegressionStatus(status, direction=direction, explanation=explanation, details=details)
1 change: 1 addition & 0 deletions matrix_benchmarking/store/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def has_lts_anchor(files):
with open(filepath) as f:
document = json.load(f)


try:
lts_payload = store.lts_schema.parse_obj(document)
except pydantic.error_wrappers.ValidationError as e:
Expand Down