Graphs for background flows experiment
ccanel committed Feb 6, 2024
1 parent adf5fcb commit 1ff9b03
Showing 2 changed files with 159 additions and 50 deletions.
4 changes: 2 additions & 2 deletions unfair/model/gen_features.py
@@ -1425,10 +1425,10 @@ def _main():
print(f"Num files: {len(pcaps)}")
tim_srt_s = time.time()
if defaults.SYNC:
smallest_safe_wins = {parse_exp(*pcap) for pcap in pcaps}
smallest_safe_wins = [parse_exp(*pcap) for pcap in pcaps]
else:
with multiprocessing.Pool(processes=args.parallel) as pol:
smallest_safe_wins = set(pol.starmap(parse_exp, pcaps))
smallest_safe_wins = list(pol.starmap(parse_exp, pcaps))
print(f"Done parsing - time: {time.time() - tim_srt_s:.2f} seconds")

# Remove return values from experiments that were not parsed.
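One plausible reason for the set-to-list switch above (the commit message does not say): the parsed results now embed a per-class utilization dict (see the eval.py comments below), and dicts are unhashable, so they can no longer be collected into a set. A minimal sketch with invented values:

# Minimal sketch (values invented): a result that embeds a dict cannot be
# placed in a set, because hashing the outer tuple hashes its elements.
result = ("exp-1", (0.97, 0.88, {"receiver": 0.60, "sink": 0.28}))
try:
    results = {result}
except TypeError as exc:
    print(exc)  # unhashable type: 'dict'
results = [result]  # a list imposes no hashability requirement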
205 changes: 157 additions & 48 deletions unfair/scripts/eval.py
@@ -691,6 +691,18 @@ def group_and_box_plot(
)


def get_params(exp_flp):
"""Load the params JSON file for the given experiment."""
exp = utils.Exp(exp_flp)
params_flp = path.join(exp_flp, f"{exp.name}.json")
if not path.exists(params_flp):
raise FileNotFoundError(
f"Error: Cannot find params file ({params_flp}) in: {exp_flp}"
)
with open(params_flp, "r", encoding="utf-8") as fil:
return json.load(fil)
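Hypothetical usage of the new helper (paths invented; the layout follows the path.join above, where the params file is named after the experiment directory):

params = get_params("/data/experiments/exp-0")  # loads .../exp-0/exp-0.json
print(params["category"])       # e.g. "background"
print(len(params["flowsets"]))  # flowsets are consumed by eval_background()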


def main(args):
log_flp = path.join(args.out_dir, "output.log")
logging.basicConfig(
@@ -704,21 +716,35 @@ def main(args):

our_label = "ServicePolicy" if args.sender_fairness else "FlowPolicy"

# Determine the experiment category and make sure that all experiments are from the
# same category.
exp_flps = [
path.join(args.exp_dir, exp)
for exp in sorted(os.listdir(args.exp_dir))
if exp.endswith(".tar.gz")
]
categories = set()
for exp_flp in exp_flps:
categories.add(get_params(exp_flp)["category"])
assert len(categories) == 1, f"Experiments span multiple categories: {categories}"
category = categories.pop()
# Extract the classifier and evaluation function for the given category.
classifier, eval_func = CATEGORIES[category]

# Find all experiments.
pcaps = [
(
path.join(args.exp_dir, exp),
exp_flp,
args.untar_dir,
path.join(args.out_dir, "individual_results"),
False, # skip_smoothed
args.select_tail_percent,
args.sender_fairness,
"start_time_s", # classifier
classifier,
True, # always_reparse
parse_opened_exp,
)
for exp in sorted(os.listdir(args.exp_dir))
if exp.endswith(".tar.gz")
for exp_flp in exp_flps
]
random.shuffle(pcaps)

@@ -743,10 +769,10 @@ def main(args):
)
else:
if defaults.SYNC:
results = {gen_features.parse_exp(*pcap) for pcap in pcaps}
results = [gen_features.parse_exp(*pcap) for pcap in pcaps]
else:
with multiprocessing.Pool(processes=args.parallel) as pol:
results = set(pol.starmap(gen_features.parse_exp, pcaps))
results = list(pol.starmap(gen_features.parse_exp, pcaps))
# Save raw JFI results from parsed experiments.
with open(data_flp, "wb") as fil:
pickle.dump(results, fil)
@@ -762,10 +788,12 @@ def main(args):
# Experiments in which the unfairmon was disabled.
disabled = {exp for exp in results if not exp.use_unfairmon}

# Match each enabled experiment with its corresponding disabled experiment and
# compute the JFI delta. matched is a dict mapping the name of the enabled
# experiment to a tuple of the form:
# ( disabled JFI, enabled JFI, difference in JFI from enabled to disabled )
# Match each enabled experiment with its corresponding disabled experiment.
# matched is a dict mapping the name of the experiment to a tuple of the form:
# ( params JSON, disabled results, enabled results )
# where the two results entries are tuples of the form returned by
# parse_opened_exp():
# ( jfi, overall util, map from class to util )
matched = {}
for enabled_exp in enabled:
# Find the corresponding experiment with the unfairmon disabled.
@@ -786,29 +814,14 @@ def main(args):
)
continue
matched[enabled_exp] = (
get_params(path.join(args.exp_dir, target_disabled_exp.name)),
results[target_disabled_exp],
results[enabled_exp],
)

# Determine the experiment category and make sure that all experiments are from the
# same category.
categories = set()
for exp in matched:
exp_flp = path.join(args.exp_dir, exp.name)
params_flp = path.join(exp_flp, f"{exp.name}.json")
if not path.exists(params_flp):
logging.info(
"Error: Cannot find params file (%s) in: %s", params_flp, exp_flp
)
return -1
with open(params_flp, "r", encoding="utf-8") as fil:
params = json.load(fil)
categories.add(params["category"])
assert len(categories) == 1
category = categories.pop()
logging.info("Matched experiments: %d", len(matched))

# Call category-specific evaluation function.
ret = EVALS[category](args, our_label, matched)
ret = eval_func(args, our_label, matched)

logging.info("Done analyzing - time: %.2f seconds", time.time() - start_time_s)
return ret
@@ -817,7 +830,7 @@ def main(args):
def eval_shared(args, our_label, matched):
"""Generate graphs for the simple shared bottleneck experiments."""
matched_results = {}
for enabled_exp, (disabled_results, enabled_results) in matched.items():
for enabled_exp, (_, disabled_results, enabled_results) in matched.items():
(
jfi_disabled,
overall_util_disabled,
@@ -856,21 +869,10 @@ def eval_shared(args, our_label, matched):
{exp.name: val for exp, val in matched_results.items()}, fil, indent=4
)

logging.info(
"Matched experiments: %d\n%s",
len(matched_results),
"\n\t".join(
[
f"{exp.name}: Overall util (enabled): {vals[5]:.2f} %"
for exp, vals in matched_results.items()
]
),
)

(
jfis_disabled,
jfis_enabled,
jfi_deltas,
_, # jfi_deltas,
jfi_deltas_percent,
overall_utils_disabled,
overall_utils_enabled,
@@ -928,7 +930,7 @@ def eval_shared(args, our_label, matched):
labels=["Original", our_label],
x_label="Total link utilization of incumbent flows (%)",
filename="incumbent_flows_util_hist.pdf",
# title='Histogram of "incumbent" flows link utilization,\nwith and without RateMon',
# title='Histogram of incumbent flows link utilization,\nwith and without RateMon',
)
plot_hist(
args,
@@ -988,7 +990,7 @@ def eval_shared(args, our_label, matched):
x_label="Total link utilization of incumbent flows (%)",
x_max=100,
filename="incumbent_flows_util_cdf.pdf",
# title='CDF of "incumbent" flows link utilization,\nwith and without RateMon',
# title='CDF of incumbent flows link utilization,\nwith and without RateMon',
colors=[COLORS_MAP["orange"], COLORS_MAP["red"], COLORS_MAP["blue"]],
)
plot_cdf(
Expand Down Expand Up @@ -1197,7 +1199,114 @@ def eval_multibottleneck(args, our_label, matched):


def eval_background(args, our_label, matched):
raise NotImplementedError()
"""Generate graphs for the simple background flows experiments."""
matched_results = {}
for enabled_exp, (_, disabled_results, enabled_results) in matched.items():
(
jfi_disabled,
overall_util_disabled,
class_to_util_disabled,
) = disabled_results
(
jfi_enabled,
overall_util_enabled,
class_to_util_enabled,
) = enabled_results

assert tuple(sorted(class_to_util_disabled.keys())) == ("receiver", "sink")
foreground_flows_util_disabled = class_to_util_disabled["receiver"]
background_flows_util_disabled = class_to_util_disabled["sink"]
foreground_flows_util_enabled = class_to_util_enabled["receiver"]
background_flows_util_enabled = class_to_util_enabled["sink"]

matched_results[enabled_exp] = (
jfi_disabled, # 0
jfi_enabled, # 1
jfi_enabled - jfi_disabled, # 2
(jfi_enabled - jfi_disabled) / jfi_disabled * 100, # 3
overall_util_disabled * 100, # 4
overall_util_enabled * 100, # 5
(overall_util_enabled - overall_util_disabled) * 100, # 6
foreground_flows_util_disabled * 100, # 7
foreground_flows_util_enabled * 100, # 8
(foreground_flows_util_enabled - foreground_flows_util_disabled) * 100, # 9
background_flows_util_disabled * 100, # 10
background_flows_util_enabled * 100, # 11
(background_flows_util_enabled - background_flows_util_disabled)
* 100, # 12
)
# Save JFI results.
with open(path.join(args.out_dir, "results.json"), "w", encoding="utf-8") as fil:
json.dump(
{exp.name: val for exp, val in matched_results.items()}, fil, indent=4
)
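The 13-element tuple above is addressed purely by position (the # 0 through # 12 comments). A purely illustrative alternative, not part of the commit, that names the fields:

from collections import namedtuple

# Illustrative only: naming the 13 positions would make the later
# zip/unpack self-documenting.
BackgroundResult = namedtuple("BackgroundResult", [
    "jfi_disabled", "jfi_enabled", "jfi_delta", "jfi_delta_percent",
    "overall_util_disabled", "overall_util_enabled", "overall_util_delta",
    "fg_util_disabled", "fg_util_enabled", "fg_util_delta",
    "bg_util_disabled", "bg_util_enabled", "bg_util_delta",
])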

(
_, # jfis_disabled,
_, # jfis_enabled,
_, # jfi_deltas,
_, # jfi_deltas_percent,
_, # overall_utils_disabled,
_, # overall_utils_enabled,
_, # overall_util_deltas_percent,
foreground_flows_utils_disabled,
foreground_flows_utils_enabled,
_, # foreground_flows_util_deltas_percent,
background_flows_utils_disabled,
background_flows_utils_enabled,
_, # background_flows_util_deltas_percent,
) = list(zip(*matched_results.values()))

num_flows = [
(
# Foreground flows (not to "sink").
sum(
flowset[9] for flowset in params["flowsets"] if flowset[1][0] != "sink"
),
# Background flows (to "sink").
sum(
flowset[9] for flowset in params["flowsets"] if flowset[1][0] == "sink"
),
)
for _, (params, _, _) in matched.items()
]
# Expected fair-share utilization of the foreground and background flow
# classes. Materialized as lists (not generators) so downstream plotting
# can safely iterate them more than once.
foreground_flows_fair_shares = [
fore / (fore + back) * 100 for fore, back in num_flows
]
background_flows_fair_shares = [
back / (fore + back) * 100 for fore, back in num_flows
]
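A worked example of the fair-share arithmetic (flow counts invented):

# With 4 foreground and 12 background flows, a perfectly fair allocation
# gives the foreground class 4 / 16 = 25% of the link and the background
# class the remaining 75%.
fore, back = 4, 12
assert fore / (fore + back) * 100 == 25.0
assert back / (fore + back) * 100 == 75.0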

plot_cdf(
args,
lines=[
foreground_flows_fair_shares,
foreground_flows_utils_disabled,
foreground_flows_utils_enabled,
],
labels=["Perfectly Fair", "Original", our_label],
x_label="Link utilization of all foreground flows (%)",
x_max=100,
filename="foreground_flows_util_cdf.pdf",
# title='CDF of foreground flows link utilization,\nwith and without RateMon',
colors=[COLORS_MAP["orange"], COLORS_MAP["red"], COLORS_MAP["blue"]],
)
plot_cdf(
args,
lines=[
background_flows_fair_shares,
background_flows_utils_disabled,
background_flows_utils_enabled,
],
labels=["Perfectly Fair", "Original", our_label],
x_label="Link utilization of all background flows (%)",
x_max=100,
filename="background_flows_util_cdf.pdf",
# title='CDF of background flow link utilization,\nwith and without RateMon',
colors=[COLORS_MAP["orange"], COLORS_MAP["red"], COLORS_MAP["blue"]],
)
return 0


def parse_args():
Expand Down Expand Up @@ -1270,10 +1379,10 @@ def parse_args():
}
# Each experiment category maps to a (classifier, evaluation function) pair.
# "category" is configured in the original experiment config JSON.
EVALS = {
"shared": eval_shared,
"multibottleneck": eval_multibottleneck,
"background": eval_background,
CATEGORIES = {
"shared": ("start_time_s", eval_shared),
"multibottleneck": ("sender", eval_multibottleneck),
"background": ("receiver", eval_background),
}
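How main() consumes this table (category value invented); the classifier string presumably controls how parse_opened_exp buckets flows into classes, which is why eval_background() expects exactly the keys ("receiver", "sink") under the "receiver" classifier:

classifier, eval_func = CATEGORIES["background"]
# classifier == "receiver", eval_func is eval_background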

if __name__ == "__main__":
