General Regression Analysis Framework #121

Closed · wants to merge 3 commits
5 changes: 3 additions & 2 deletions matrix_benchmarking/download_lts.py
@@ -38,7 +38,8 @@ def main(opensearch_host: str = "",
     opensearch_index: the OpenSearch index where the LTS payloads are stored (Mandatory)

     lts_results_dirname: The directory to place the downloaded LTS results files.
-    filters: If provided, only download the experiments matching the filters. Eg: {"image_name": "1.2"}. (Optional.)
+    filters: If provided, only download the experiments matching the filters. Eg: {"image_name": "1.2"}.
+             If the provided value is "*", only check that the key exists. (Optional.)
     max_records: Maximum number of records to retrieve from the OpenSearch instance. 10,000 is the largest number possible without paging (Optional.)
     force: Ignore the presence of the anchor file before downloading the results (Optional.)
     clean: Delete all the existing '.json' files in the lts-results-dirname before downloading the results (Optional.)
@@ -119,7 +120,7 @@ def download(client, opensearch_index, filters, lts_results_dirname, max_records
query["query"] = {
"bool": {
"must": [
{"term": {f"{k}.keyword": v}} for k, v in filters.items()
{"term": {f"{k}.keyword": v}} if v != "*" else {"exists": {"field": k}} for k, v in filters.items()
]
}
}
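
For context, a minimal sketch (with made-up field names) of the query body this comprehension builds for a mixed filters dict:

filters = {"image_name": "1.2", "accelerator_name": "*"}
# resulting OpenSearch query body:
query["query"] = {
    "bool": {
        "must": [
            {"term": {"image_name.keyword": "1.2"}},    # exact match on the keyword sub-field
            {"exists": {"field": "accelerator_name"}},  # "*" only requires the field to exist
        ]
    }
}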
176 changes: 176 additions & 0 deletions matrix_benchmarking/regression.py
@@ -0,0 +1,176 @@
import os
import json
import logging
import types
import datetime
import numpy as np
from functools import reduce
from typing import Optional, Callable

import matrix_benchmarking.common as common

def get_from_path(d, path):
    """Walk a nested dict following a dot-separated path, e.g. "a.b.c"."""
    return reduce(dict.get, path.split("."), d)


# Check that ALL (k, v) pairs in `part` are present in `full_dict`.
def dict_part_eq(part, full_dict):
    # `all` avoids the KeyError a plain full_dict[k] lookup would raise
    # when a filter key is absent from the entry's settings.
    return all(k in full_dict and full_dict[k] == part[k] for k in part)
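
# A quick sanity check of the helpers above, on made-up data:
#   get_from_path({"a": {"b": 3}}, "a.b")      -> 3
#   dict_part_eq({"x": 1}, {"x": 1, "y": 2})   -> True
#   dict_part_eq({"x": 1}, {"y": 2})           -> False (missing key)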

class RegressionStatus(types.SimpleNamespace):
def __init__(
self,
status: int,
direction: Optional[int] = None,
explanation: Optional[str] = None,
details: Optional[dict] = None
):
self.status = status
self.direction = direction
self.explanation = explanation
self.details = details


class RegressionIndicator:
"""
Assume the matrix that is passed in contains a prefiltered combination of settings,
or pass in the desired filter with the setings_filter option
"""
def __init__(
self,
new_payload: common.MatrixEntry,
lts_payloads: list[common.MatrixEntry],
x_var = None,
x_var_key = lambda x: x.results.metadata.end.astimezone(),
kpis: Optional[list[str]] = None,
settings_filter: Optional[dict] = None,
        combine_funcs: Optional[dict] = None,  # None instead of {} avoids a shared mutable default
):
self.new_payload = new_payload
self.x_var = x_var
self.x_var_key = x_var_key
self.kpis = kpis
        self.combine_funcs = combine_funcs or {}
self.settings_filter = settings_filter

if self.settings_filter and self.x_var:
logging.warning("settings_filter and x_var set, only using settings_filter")
elif self.x_var:
settings = self.new_payload.get_settings()
settings.pop(self.x_var)
self.settings_filter = settings

if self.settings_filter:
# Only store payloads that have equivalent (k, v) pairs
# as the settings_filter
self.lts_payloads = list(
filter(
lambda x: dict_part_eq(self.settings_filter, x.get_settings()),
lts_payloads
)
)

if not dict_part_eq(self.settings_filter, self.new_payload.get_settings()):
self.new_payload = None
logging.warning("settings_filter isn't satisfied for the new payload")
else:
self.lts_payloads = lts_payloads

# This isn't strictly necessary for all analysis techniques, but
# is useful to have
self.lts_payloads.sort(key=lambda entry: self.x_var_key(entry))


def analyze(self) -> list[dict]:

if not self.new_payload:
return [{"result": None, "kpi": None, "regression": vars(RegressionStatus(0, explanation="Not enough new data"))}]

if not self.lts_payloads:
return [{"result": None, "kpi": None, "regression": vars(RegressionStatus(0, explanation="Not enough LTS data"))}]

regression_results = []

kpis_to_test = vars(self.new_payload.results.lts.kpis).keys() if not self.kpis else self.kpis
for kpi in kpis_to_test:

curr_values = vars(self.new_payload.results.lts.kpis)[kpi].value
lts_values = list(map(lambda x: vars(x.results.kpis)[kpi].value, self.lts_payloads))

            if isinstance(curr_values, list):
if kpi in self.combine_funcs:
curr_values = self.combine_funcs[kpi](curr_values)
lts_values = [self.combine_funcs[kpi](v) for v in lts_values]
else:
logging.warning(f"Skipping KPI with list of values, consider filtering KPIs or providing a combine_func for {kpi}")
continue

regression_results.append(
{
"result": self.new_payload.get_settings(),
"kpi": kpi,
"regression": vars(
self.regression_test(
curr_values,
lts_values
)
)
}
)
return regression_results

    def regression_test(self, new_result: float, lts_results: np.ndarray) -> RegressionStatus:
        return RegressionStatus(0, explanation="Default return status")


class ZScoreIndicator(RegressionIndicator):
"""
Example regression indicator that uses the Z score as a metric
to determine if the recent test was an outlier
"""
def __init__(self, *args, threshold=3, **kwargs):
super().__init__(*args, **kwargs)
self.threshold = threshold

    def regression_test(self, new_result: float, lts_results: np.ndarray) -> RegressionStatus:
        """
        Determine if new_result is more than `threshold` standard
        deviations away from the mean of lts_results.
        """
        mean = np.mean(lts_results)
        std = np.std(lts_results)
        if std == 0:
            # Guard against a division by zero when the historical values are all
            # identical: any change is then arbitrarily many deviations away.
            z_score = 0.0 if new_result == mean else float("inf") * np.sign(new_result - mean)
        else:
            z_score = (new_result - mean) / std
if abs(z_score) > self.threshold:
return RegressionStatus(
1,
direction=1 if z_score > 0 else -1,
explanation="z-score greater than threshold",
details={"threshold": self.threshold, "zscore": z_score}
)
else:
return RegressionStatus(
0,
explanation="z-score not greater than threshold",
details={"threshold": self.threshold, "zscore": z_score}
)
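
    # Worked example (made-up numbers): with lts_results = [10.0, 10.5, 9.5, 10.0],
    # the mean is 10.0 and np.std gives ~0.354, so a new_result of 12.0 yields
    # z = (12.0 - 10.0) / 0.354 ~= 5.7 > 3, flagging a regression with direction=+1.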

class PolynomialRegressionIndicator(RegressionIndicator):
"""
Placeholder for polynomial regression that we could implement
somewhere in the pipeline
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def regression_test(self, curr_result, prev_results) -> RegressionStatus:
return RegressionStatus(0, explanation="Not implemented")

class HunterWrapperIndicator(RegressionIndicator):
"""
    Some straightforward indicators are implemented above, but this class also
    provides what should be a simple way to wrap datastax/Hunter in a regression_test.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def regression_test(self, curr_result, prev_results) -> RegressionStatus:
return RegressionStatus(0, explanation="Not implemented")
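
For reviewers, a minimal usage sketch of the framework (the entry variables and the "sleep_duration" KPI name are hypothetical; ZScoreIndicator and its arguments come from this diff):

import numpy as np
import matrix_benchmarking.regression as regression

# new_entry and lts_entries are common.MatrixEntry objects, e.g. obtained
# after downloading the LTS payloads with download_lts.py and parsing them.
indicator = regression.ZScoreIndicator(
    new_entry,
    lts_entries,
    x_var="image_name",                         # dropped from the implicit settings_filter
    combine_funcs={"sleep_duration": np.mean},  # collapse a list-valued KPI to a scalar
    threshold=3,
)
for result in indicator.analyze():
    print(result["kpi"], result["regression"]["status"])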