feat(vm_metrics): Enabled vm metric use for local model training #464

Merged
2 changes: 2 additions & 0 deletions src/kepler_model/cmd/cmd_util.py
@@ -354,6 +354,7 @@ def get_pipeline(
energy_sources,
valid_feature_groups,
replace_node_type=default_node_type,
use_vm_metrics=False,
@sthaha (Contributor) commented on Sep 29, 2024:
I think we should allow passing selectors instead of hardcoding use_vm_metrics.

E.g. kepler-model train --x-selectors='job="vm"' --x-selectors='foo="bar"' --y-selectors='job="metal"'
and keep a default of none, i.e. a single Kepler instance.
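
For illustration, such selector flags could be accumulated and parsed roughly as follows. This is a minimal sketch of the suggestion, not part of this PR: the --x-selectors/--y-selectors flags and the parse_selector helper are hypothetical.

import argparse

parser = argparse.ArgumentParser(prog="kepler-model")
# Repeatable selector flags; an empty list is the suggested default (single Kepler instance).
parser.add_argument("--x-selectors", action="append", default=[],
                    help='label selector for feature metrics, e.g. job="vm"')
parser.add_argument("--y-selectors", action="append", default=[],
                    help='label selector for target power metrics, e.g. job="metal"')

def parse_selector(expr: str) -> tuple[str, str]:
    # 'job="vm"' -> ("job", "vm")
    key, _, value = expr.partition("=")
    return key, value.strip('"')

args = parser.parse_args(['--x-selectors=job="vm"', '--y-selectors=job="metal"'])
x_selectors = dict(parse_selector(s) for s in args.x_selectors)  # {"job": "vm"}
y_selectors = dict(parse_selector(s) for s in args.y_selectors)  # {"job": "metal"}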

Contributor:

@KaiyiLiu1234 could you address this in a separate PR?

):
from kepler_model.train import NewPipeline

@@ -367,5 +368,6 @@ def get_pipeline(
isolator=isolator,
target_energy_sources=energy_sources,
valid_feature_groups=valid_feature_groups,
use_vm_metrics=use_vm_metrics,
)
return pipeline
9 changes: 9 additions & 0 deletions src/kepler_model/cmd/main.py
@@ -403,6 +403,7 @@ def train_from_data(args):
- --energy-source : specify target energy sources (use comma(,) as delimiter)
- --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group)
- --id : specify machine ID
- --vm-train: specify whether to use vm feature and energy metrics for training - true: use vm feature metrics. Default is false
Contributor:
As indicated earlier, this should be replaced with a way to select the feature-group metrics and a way to select the "target" metrics.

"""


@@ -448,6 +449,8 @@ def train(args):
if args.abs_trainers == "default":
args.abs_trainers = default_trainers

use_vm_metrics = args.vm_train

abs_trainer_names = args.abs_trainers.split(",")
dyn_trainer_names = args.dyn_trainers.split(",")

@@ -468,6 +471,7 @@
dyn_trainer_names,
energy_sources,
valid_feature_groups,
use_vm_metrics=use_vm_metrics,
)
machine_spec_json = load_machine_spec(data_path, machine_id)
if machine_spec_json is not None:
@@ -1015,6 +1019,11 @@ def run():
parser.add_argument(
"--trainers", type=str, help="Specify trainer names for train_from_data command (use comma(,) as delimiter).", default="XgboostFitTrainer"
)
parser.add_argument(
"--vm-train",
action="store_true",
help="- --vm-train: specify whether to use vm feature and energy metrics for training - true: use vm feature metrics.",
)

# Validate arguments
parser.add_argument("--benchmark", type=str, help="Specify benchmark file name.")
47 changes: 36 additions & 11 deletions src/kepler_model/train/extractor/extractor.py
@@ -15,8 +15,11 @@
from kepler_model.util.prom_types import (
SOURCE_COL,
TIMESTAMP_COL,
VM_JOB_NAME,
container_id_cols,
process_id_cols,
energy_component_to_query,
vm_energy_component_to_query,
feature_to_query,
get_energy_unit,
node_info_column,
@@ -28,7 +31,10 @@


# append ratio for each unit
def append_ratio_for_pkg(feature_power_data, is_aggr, query_results, power_columns):
def append_ratio_for_pkg(feature_power_data, is_aggr, query_results, power_columns, use_vm_metrics=False):
cols_to_use = container_id_cols
if use_vm_metrics:
cols_to_use = process_id_cols
unit_vals = get_unit_vals(power_columns)
if len(unit_vals) == 0:
# not relate/not append
@@ -42,7 +48,7 @@ def append_ratio_for_pkg(feature_power_data, is_aggr, query_results, power_colum
if is_aggr:
ratio_df = ratio_df.groupby([TIMESTAMP_COL, pkg_id_column]).sum()[usage_ratio_query]
else:
ratio_df[container_id_colname] = ratio_df[container_id_cols].apply(lambda x: "/".join(x), axis=1)
ratio_df[container_id_colname] = ratio_df[cols_to_use].apply(lambda x: "/".join(x), axis=1)
ratio_df = ratio_df.groupby([TIMESTAMP_COL, pkg_id_column, container_id_colname]).sum()[usage_ratio_query]
ratio_colnames = []
for unit_val in unit_vals:
@@ -88,9 +94,9 @@ def get_name(self):
return "default"

# implement extract function
def extract(self, query_results, energy_components, feature_group, energy_source, node_level, aggr=True):
def extract(self, query_results, energy_components, feature_group, energy_source, node_level, aggr=True, use_vm_metrics=False):
# 1. compute energy different per timestamp and concat all energy component and unit
power_data = self.get_power_data(query_results, energy_components, energy_source)
power_data = self.get_power_data(query_results, energy_components, energy_source, use_vm_metrics)
if power_data is None:
return None, None, None, None
power_data = drop_zero_column(power_data, power_data.columns)
@@ -104,7 +110,7 @@ def extract(self, query_results, energy_components, feature_group, energy_source
if fg == FeatureGroup.AcceleratorOnly and node_level is not True:
return None, None, None, None
else:
feature_data, workload_features = self.get_workload_feature_data(query_results, workload_features)
feature_data, workload_features = self.get_workload_feature_data(query_results, workload_features, use_vm_metrics)

if feature_data is None:
return None, None, None, None
@@ -143,14 +149,18 @@
feature_power_data = append_ratio_for_pkg(feature_power_data, is_aggr, query_results, power_columns)
return feature_power_data, power_columns, corr, workload_features

def get_workload_feature_data(self, query_results, features):
def get_workload_feature_data(self, query_results, features, use_vm_metrics=False):
feature_data = None
container_df_map = dict()
accelerator_df_list = []
cur_accelerator_features = []
feature_to_remove = []
cols_to_use = container_id_cols
if use_vm_metrics:
cols_to_use = process_id_cols

for feature in features:
query = feature_to_query(feature)
query = feature_to_query(feature, use_vm_metrics)
if query not in query_results:
print(query, "not in", list(query_results.keys()))
return None
@@ -159,9 +169,15 @@
return None
aggr_query_data = query_results[query].copy()

if all(col in aggr_query_data.columns for col in container_id_cols):
if all(col in aggr_query_data.columns for col in cols_to_use):
if use_vm_metrics:
aggr_query_data = aggr_query_data.loc[aggr_query_data["job"] == VM_JOB_NAME]
else:
aggr_query_data = aggr_query_data.loc[aggr_query_data["job"] != VM_JOB_NAME]
Contributor:
Don't think we need this else condition. To avoid unexpected job filtering in the general case, I think we should filter only when use_vm_metrics is enabled.

Author (Collaborator):
My concern is that if the filter is not added, job=vm metrics will also be pulled by the model server alongside bare-metal metrics, which is not a desirable result, right?

Contributor:
Just thinking that job is a common label that could be relabeled, for example if we want to label the pod with the application's job. However, I understand your point that we might also want the BM metrics in the case where the VM is enabled. After some thought, I think we can keep it as you coded it.
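
To make the agreed behavior concrete, here is a standalone sketch of the filter on synthetic data (VM_JOB_NAME defaults to "vm", per prom_types.py below):

import pandas as pd

VM_JOB_NAME = "vm"
aggr_query_data = pd.DataFrame({"job": ["vm", "metal", "vm"], "value": [1.0, 2.0, 3.0]})

# use_vm_metrics=True keeps only rows exported by the in-VM Kepler instance:
vm_rows = aggr_query_data.loc[aggr_query_data["job"] == VM_JOB_NAME]  # values 1.0, 3.0
# use_vm_metrics=False drops them, preserving the pre-existing bare-metal behavior:
bm_rows = aggr_query_data.loc[aggr_query_data["job"] != VM_JOB_NAME]  # value 2.0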

print("aggr query data feature")
print(aggr_query_data.to_string())
aggr_query_data.rename(columns={query: feature}, inplace=True)
aggr_query_data[container_id_colname] = aggr_query_data[container_id_cols].apply(lambda x: "/".join([str(xi) for xi in x]), axis=1)
aggr_query_data[container_id_colname] = aggr_query_data[cols_to_use].apply(lambda x: "/".join([str(xi) for xi in x]), axis=1)
# separate for each container_id
container_id_list = pd.unique(aggr_query_data[container_id_colname])

@@ -212,6 +228,7 @@
if len(feature_to_remove) != 0:
features = self.process_feature(features, feature_to_remove, cur_accelerator_features)
# return with reset index for later aggregation
#print(feature_data.reset_index().to_string())
return feature_data.reset_index(), features

def get_system_feature_data(self, query_results, features):
Expand All @@ -229,17 +246,24 @@ def get_system_feature_data(self, query_results, features):
return feature_data

# return with timestamp index
def get_power_data(self, query_results, energy_components, source):
def get_power_data(self, query_results, energy_components, source, use_vm_metrics=False):
power_data_list = []
for component in energy_components:
unit_col = get_energy_unit(component) # such as package
query = energy_component_to_query(component)
if use_vm_metrics:
query = vm_energy_component_to_query(component)
else:
query = energy_component_to_query(component)
if query not in query_results:
print(query, "not in", query_results)
return None
aggr_query_data = query_results[query].copy()
if not use_vm_metrics:
Contributor:
Could you please add a comment explaining this line?

Author (Collaborator):
So if it is bare metal, I do not want it to use job=vm energy metric queries; we should only use bare-metal metrics. Basically, I am making sure the extractor behaves exactly as it did before my addition of VM metrics (i.e., it extracts all the metrics with kepler_node_*). The same applies in get_workload_feature_data.

Contributor:
Can we have a single splitter that splits a single query_result into vm_query_results and bm_query_results? It could apply to the feature extraction as well.
Here I think it is fine to filter only the BM query, since the VM query contains the word "vm", which is only available on BM. However, as I mentioned below, I still have doubts about using the vm value rather than the node value.

Author (Collaborator):
Good point. I will change that now.

Author (Collaborator):
Can we merge this as is? The issue is that a lot of the methods in DefaultExtractor, which IMO should be protected, are used directly in other functions (like get_workload_feature_data and get_power_data), so I can't really create a single splitter in the DefaultExtractor.extract method. I can also close this, because I am currently creating a new extractor (DefaultVMExtractor) that will include this PR's changes. @sunya-ch

Contributor:
@KaiyiLiu1234 Thank you for your contribution on this PR. In my opinion, creating a new DefaultVMExtractor is preferable.
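
For reference, the single splitter discussed above might look roughly like this. It is a hypothetical helper, not implemented in this PR, and it assumes each per-query DataFrame carries a "job" label column:

def split_query_results(query_results: dict, vm_job_name: str = "vm"):
    """Split one query_results dict into (vm_results, bm_results) by job label."""
    vm_results, bm_results = {}, {}
    for query, df in query_results.items():
        if "job" in df.columns:
            vm_results[query] = df.loc[df["job"] == vm_job_name]
            bm_results[query] = df.loc[df["job"] != vm_job_name]
        else:
            # Queries without a job label (if any) are shared by both sides.
            vm_results[query] = df
            bm_results[query] = df
    return vm_results, bm_results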

aggr_query_data = aggr_query_data.loc[aggr_query_data["job"] != VM_JOB_NAME]
# filter source
aggr_query_data = aggr_query_data[aggr_query_data[SOURCE_COL] == source]
#print("aggr query data power")
#print(aggr_query_data.to_string())
if len(aggr_query_data) == 0:
return None
if unit_col is not None:
@@ -287,6 +311,7 @@ def get_power_data(self, query_results, energy_components, source):
if len(power_data_list) == 0:
return None
power_data = pd.concat(power_data_list, axis=1).dropna()
#print(power_data.to_string())
return power_data

def get_system_category(self, query_results):
6 changes: 4 additions & 2 deletions src/kepler_model/train/extractor/smooth_extractor.py
@@ -11,8 +11,10 @@ def get_name(self):
return "smooth"

# implement extract function
def extract(self, query_results, energy_components, feature_group, energy_source, node_level, aggr=True):
feature_power_data, power_columns, _, features = super().extract(query_results, energy_components, feature_group, energy_source, node_level, aggr)
def extract(self, query_results, energy_components, feature_group, energy_source, node_level, aggr=True, use_vm_metrics=False):
feature_power_data, power_columns, _, features = super().extract(
query_results, energy_components, feature_group, energy_source, node_level, aggr, use_vm_metrics=use_vm_metrics
)

features = FeatureGroups[FeatureGroup[feature_group]]
smoothed_data = feature_power_data.copy()
4 changes: 3 additions & 1 deletion src/kepler_model/train/offline_trainer.py
@@ -52,9 +52,10 @@ def __init__(self, abs_trainers, dyn_trainers, idle_prom_response, isolator, iso


class TrainRequest:
def __init__(self, name, energy_source, trainer, prom_response):
def __init__(self, name, energy_source, trainer, prom_response, use_vm_metrics=False):
self.name = name
self.energy_source = energy_source
self.use_vm_metrics = use_vm_metrics
if trainer is not None:
self.trainer = TrainAttribute(**trainer)
self.prom_response = prom_response
@@ -92,6 +93,7 @@ def init_pipeline(self):
isolator=isolator,
target_energy_sources=[self.energy_source],
valid_feature_groups=valid_feature_groups,
use_vm_metrics=self.use_vm_metrics,
)

def get_model(self):
17 changes: 13 additions & 4 deletions src/kepler_model/train/pipeline.py
@@ -29,11 +29,12 @@ def run_train(trainer, data, power_labels, pipeline_lock):


class Pipeline:
def __init__(self, name, trainers, extractor, isolator):
def __init__(self, name, trainers, extractor, isolator, use_vm_metrics=False):
self.extractor = extractor
self.isolator = isolator
self.trainers = trainers
self.name = name
self.use_vm_metrics = use_vm_metrics
self.lock = threading.Lock()
self.path = get_pipeline_path(model_toppath=model_toppath, pipeline_name=self.name)
self.node_collection = NodeTypeIndexCollection(self.path)
@@ -43,16 +44,21 @@ def __init__(self, name, trainers, extractor, isolator):
self.metadata["extractor"] = extractor.get_name()
self.metadata["abs_trainers"] = [trainer.__class__.__name__ for trainer in trainers if trainer.node_level]
self.metadata["dyn_trainers"] = [trainer.__class__.__name__ for trainer in trainers if not trainer.node_level]
self.metadata["metric_type"] = "vm_metrics" if self.use_vm_metrics else "bm_metrics"
self.metadata["init_time"] = time_to_str(datetime.datetime.utcnow())
for trainer in trainers:
trainer.set_node_type_index(self.node_collection.node_type_index)

def get_abs_data(self, query_results, energy_components, feature_group, energy_source, aggr):
extracted_data, power_labels, _, _ = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=True, aggr=aggr)
extracted_data, power_labels, _, _ = self.extractor.extract(
query_results, energy_components, feature_group, energy_source, node_level=True, aggr=aggr, use_vm_metrics=self.use_vm_metrics
)
return extracted_data, power_labels

def get_dyn_data(self, query_results, energy_components, feature_group, energy_source):
extracted_data, power_labels, _, _ = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=False)
extracted_data, power_labels, _, _ = self.extractor.extract(
query_results, energy_components, feature_group, energy_source, node_level=False, use_vm_metrics=self.use_vm_metrics
)
if extracted_data is None or power_labels is None:
return None
isolated_data = self.isolator.isolate(extracted_data, label_cols=power_labels, energy_source=energy_source)
@@ -182,6 +188,7 @@ def print_pipeline_process_end(self, energy_source, feature_group, abs_data, dyn
"Absolute Power Modeling:",
f" Input data size: {len(abs_data)}",
f" Model Trainers: {abs_trainer_names}",
" Metric Type: {}".format(self.metadata["metric_type"]),
f" Output: {abs_group_path}",
" ",
]
@@ -199,6 +206,7 @@
f" Input data size: {len(dyn_data)}",
f" Model Trainers: {dyn_trainer_names}",
f" Output: {dyn_group_path}",
" Metric Type: {}".format(self.metadata["metric_type"]),
]
for node_type in node_types:
filtered_data = dyn_metadata_df[dyn_metadata_df[node_info_column] == node_type]
@@ -241,6 +249,7 @@ def NewPipeline(
isolator=MinIdleIsolator(),
target_energy_sources=PowerSourceMap.keys(),
valid_feature_groups=FeatureGroups.keys(),
use_vm_metrics=False,
KaiyiLiu1234 marked this conversation as resolved.
):
abs_trainers = initial_trainers(
abs_trainer_names, node_level=True, pipeline_name=pipeline_name, target_energy_sources=target_energy_sources, valid_feature_groups=valid_feature_groups
@@ -249,4 +258,4 @@
dyn_trainer_names, node_level=False, pipeline_name=pipeline_name, target_energy_sources=target_energy_sources, valid_feature_groups=valid_feature_groups
)
trainers = abs_trainers + dyn_trainers
return Pipeline(pipeline_name, trainers, extractor, isolator)
return Pipeline(pipeline_name, trainers, extractor, isolator, use_vm_metrics)
14 changes: 13 additions & 1 deletion src/kepler_model/util/prom_types.py
@@ -16,6 +16,7 @@
PROM_QUERY_STEP = get_config("PROM_QUERY_STEP", 3)

PROM_THIRDPARTY_METRICS = get_config("PROM_THIRDPARTY_METRICS", list[str]([]))
VM_JOB_NAME = get_config("VM_JOB_NAME", "vm")

metric_prefix = "kepler_"
TIMESTAMP_COL = "timestamp"
@@ -25,9 +26,13 @@

container_query_prefix = "kepler_container"
container_query_suffix = "total"
process_query_prefix = "kepler_process"
process_query_suffix = "total"

node_query_prefix = "kepler_node"
node_query_suffix = "joules_total"
vm_query_prefix = "kepler_vm"
vm_query_suffix = "joules_total"

usage_ratio_query = "kepler_container_cpu_usage_per_package_ratio"
# mostly available
@@ -36,6 +41,7 @@
cpu_frequency_info_query = "kepler_node_cpu_scaling_frequency_hertz"

container_id_cols = ["container_id", "pod_name", "container_name", "container_namespace"]
process_id_cols = ["container_id", "pid"]
node_info_column = "node_type"
pkg_id_column = "pkg_id"

@@ -46,20 +52,26 @@ def get_energy_unit(component):
return None


def feature_to_query(feature):
def feature_to_query(feature, use_process=False):
if feature in SYSTEM_FEATURES:
return f"{node_query_prefix}_{feature}"
if feature in FeatureGroups[FeatureGroup.AcceleratorOnly]:
return f"{node_query_prefix}_{feature}"
if FeatureGroup.ThirdParty in FeatureGroups is not None and feature in FeatureGroups[FeatureGroup.ThirdParty]:
return feature
if use_process:
return f"{process_query_prefix}_{feature}_{process_query_suffix}"
return f"{container_query_prefix}_{feature}_{container_query_suffix}"


def energy_component_to_query(component):
return f"{node_query_prefix}_{component}_{node_query_suffix}"


def vm_energy_component_to_query(component):
Contributor:
I don't think we should use kepler_vm for training. We should use kepler_node but with the job="metal" label (or job != "vm"). kepler_vm is not from the power meter but from the ratio, which takes fair distribution into account. In the case where we have only a single VM this might not differ, but I think we'd better use the number from the power meter.

Author (Collaborator):
@rootfs thoughts? If I were to use kepler_node, I would need to find the process ID of the virtual machine. If I recall correctly, kepler_node with the VM process ID and kepler_vm have the same, or very similar, values.

Contributor:
I don't think the VM process value will be the same if the idle power is non-zero, since idle power is distributed evenly across all running processes, not only the VM process; but kepler_vm and kepler_node should be the same if you have a single VM.

return f"{vm_query_prefix}_{component}_{vm_query_suffix}"


def update_thirdparty_metrics(metrics):
global FeatureGroups
FeatureGroups[FeatureGroup.ThirdParty] = metrics
Expand Down