Skip to content

Commit

Permalink
[WIP] Maxmin ratio calculations for multibottleneck
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed Feb 18, 2024
1 parent 51dffda commit ded0ac5
Showing 1 changed file with 244 additions and 96 deletions.
340 changes: 244 additions & 96 deletions unfair/scripts/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,107 +559,16 @@ def parse_opened_exp(
jfi = get_jfi(flw_to_pkts, sender_fairness, flw_to_sender)

# Calculate class-based utilization numbers.
# jfi_by_bottleneck_and_sender = {}
if exp.use_bess:
# Determine a mapping from class to flows in that class.
class_to_flws = collections.defaultdict(list)
category = params["category"]
classifier = CLASSIFIERS[CATEGORIES[category][0]]

# # If the category is multibottleneck, then we need to break down time into
# # regions based on the bottleneck.
# if category == "multibottleneck":
# # Dict mapping time to a list of bottleneck events at that time.
# bottleneck_events = collections.defaultdict(dict)
# # Add all bottleneck events to unified list. Replace rates of 0
# # (no bottleneck) with the BESS bandwidth.
# for sender, bottleneck_schedule in params["sender_bottlenecks"].items():
# for bottleneck_event in bottleneck_schedule:
# rate = bottleneck_event["rate_Mbps"]
# bottleneck_events[bottleneck_event["time_s"]][sender] = (
# rate if rate != 0 else params["bess_bw_Mbps"]
# )
# # Go through the list of events and populate the rate values of any senders
# # that did not change.
# times_s = sorted(bottleneck_events.keys())
# for idx, time_s in enumerate(times_s[:-1]):
# for sender, rate in bottleneck_events[time_s].items():
# if sender not in bottleneck_events[times_s[idx + 1]]:
# bottleneck_events[times_s[idx + 1]][sender] = rate
# # Sort bottleneck events by time.
# bottleneck_events = sorted(bottleneck_events.items(), key=lambda x: x[0])

# # Create range-based bottleneck situations.
# end_time_s = max(flowset[4] for flowset in params["flowsets"])
# bottleneck_situations = []
# for idx, bottleneck in enumerate(bottleneck_events):
# time_s, configs = bottleneck
# if idx == len(bottleneck_events) - 1:
# next_time_s = end_time_s
# else:
# next_time_s = bottleneck_events[idx + 1][0]
# bottleneck_situations.append((time_s, next_time_s, configs))
# # Num bottleneck events
# num_bottlenecks_situations = len(bottleneck_situations)
# assert (
# num_bottlenecks_situations == 3
# ), f"Expected 3 bottleneck situations, but found {num_bottlenecks_situations}."

# # TODO: Consider refactoring this to be a mapping from sender to that sender's bottleneck events (the way it is in the params file).
# # TODO: Need to track this per flow.
# last_cutoff_idx = 0
# for bneck_start_s, bneck_end_s, bneck_config in bottleneck_situations:
# flw_to_pkts_during_bneck = {}
# for flw, pkts in flw_to_pkts.items():
# cutoff_idx = utils.find_bound(
# pkts[features.ARRIVAL_TIME_FET],
# bneck_end_s,
# last_cutoff_idx,
# len(pkts),
# "before",
# )
# flw_to_pkts_during_bneck[flw] = pkts[
# last_cutoff_idx : cutoff_idx + 1
# ]
# last_cutoff_idx = cutoff_idx + 1

# jfi_by_bottleneck_and_sender[(bneck_start_s, bneck_end_s)] = {
# "all": get_jfi(
# flw_to_pkts_during_bneck, sender_fairness, flw_to_sender
# )
# }

# # Plot all flows (shared bottleneck).
# plot_flows_over_time(
# exp,
# out_flp[:-4] + "_bneckbess.pdf",
# flw_to_pkts,
# flw_to_cca,
# sender_fairness,
# flw_to_sender,
# xlim=(bneck_start_s, bneck_end_s),
# bottleneck_Mbps=params["bess_bw_Mbps"],
# )
# # Plot each sender-side bottleneck separately.
# for sender, flws in sender_to_flws.items():
# plot_flows_over_time(
# exp,
# out_flp[:-4] + f"_bneck{sender}.pdf",
# {flw: flw_to_pkts[flw] for flw in flws},
# flw_to_cca,
# xlim=(bneck_start_s, bneck_end_s),
# bottleneck_Mbps=bneck_config[sender],
# )
# jfi_by_bottleneck_and_sender[(bneck_start_s, bneck_end_s)] = {
# sender: get_jfi(
# {
# flw: flw_to_pkts_during_bneck[flw]
# for flw in sender_to_flws[sender]
# },
# sender_fairness,
# flw_to_sender,
# )
# }
# If the category is multibottleneck, then we need to break down time into
# regions based on the bottleneck.
if category == "multibottleneck":
identify_bottlenecks()

for flw in params["flowsets"]:
flow_class = classifier(flw)
Expand All @@ -679,7 +588,13 @@ def parse_opened_exp(
overall_util = 0
class_to_util = {}

out = (exp, params, jfi, overall_util, class_to_util) # , jfi_by_bottleneck_and_sender)
out = (
exp,
params,
jfi,
overall_util,
class_to_util,
) # , jfi_by_bottleneck_and_sender)

# Save the results.
logging.info("\tSaving: %s", out_flp)
Expand All @@ -689,6 +604,239 @@ def parse_opened_exp(
return out


# def identify_bottlenecks(params, flw_to_pkts, sender_fairness, flw_to_sender, sender_to_flws, flw_to_cca, out_flp, exp, jfi_by_bottleneck_and_sender):
# bottleneck_to_sender_to_jfi = {}

# # Dict mapping time to a list of bottleneck events at that time.
# bottleneck_events = collections.defaultdict(dict)
# # Add all bottleneck events to unified list. Replace rates of 0
# # (no bottleneck) with the BESS bandwidth.
# for sender, bottleneck_schedule in params["sender_bottlenecks"].items():
# for bottleneck_event in bottleneck_schedule:
# rate = bottleneck_event["rate_Mbps"]
# bottleneck_events[bottleneck_event["time_s"]][sender] = (
# rate if rate != 0 else params["bess_bw_Mbps"]
# )
# # Go through the list of events and populate the rate values of any senders
# # that did not change.
# times_s = sorted(bottleneck_events.keys())
# for idx, time_s in enumerate(times_s[:-1]):
# for sender, rate in bottleneck_events[time_s].items():
# if sender not in bottleneck_events[times_s[idx + 1]]:
# bottleneck_events[times_s[idx + 1]][sender] = rate
# # Sort bottleneck events by time.
# bottleneck_events = sorted(bottleneck_events.items(), key=lambda x: x[0])

# # Create range-based bottleneck situations.
# end_time_s = max(flowset[4] for flowset in params["flowsets"])
# bottleneck_situations = []
# for idx, bottleneck in enumerate(bottleneck_events):
# time_s, configs = bottleneck
# if idx == len(bottleneck_events) - 1:
# next_time_s = end_time_s
# else:
# next_time_s = bottleneck_events[idx + 1][0]
# bottleneck_situations.append((time_s, next_time_s, configs))
# # Num bottleneck events
# num_bottlenecks_situations = len(bottleneck_situations)
# assert (
# num_bottlenecks_situations == 3
# ), f"Expected 3 bottleneck situations, but found {num_bottlenecks_situations}."

# # TODO: Consider refactoring this to be a mapping from sender to that sender's bottleneck events (the way it is in the params file).
# # TODO: Need to track this per flow.
# last_cutoff_idx = 0
# for bneck_start_s, bneck_end_s, bneck_config in bottleneck_situations:
# flw_to_pkts_during_bneck = {}
# for flw, pkts in flw_to_pkts.items():
# cutoff_idx = utils.find_bound(
# pkts[features.ARRIVAL_TIME_FET],
# bneck_end_s,
# last_cutoff_idx,
# len(pkts),
# "before",
# )
# flw_to_pkts_during_bneck[flw] = pkts[
# last_cutoff_idx : cutoff_idx + 1
# ]
# last_cutoff_idx = cutoff_idx + 1

# bottleneck_to_sender_to_jfi[(bneck_start_s, bneck_end_s)] = {
# "shared": get_jfi(
# flw_to_pkts_during_bneck, sender_fairness, flw_to_sender
# )
# }

# # Plot all flows (shared bottleneck).
# plot_flows_over_time(
# exp,
# out_flp[:-4] + "_bneckbess.pdf",
# flw_to_pkts,
# flw_to_cca,
# sender_fairness,
# flw_to_sender,
# xlim=(bneck_start_s, bneck_end_s),
# bottleneck_Mbps=params["bess_bw_Mbps"],
# )
# # Plot each sender-side bottleneck separately.
# for sender, flws in sender_to_flws.items():
# plot_flows_over_time(
# exp,
# out_flp[:-4] + f"_bneck{sender}.pdf",
# {flw: flw_to_pkts[flw] for flw in flws},
# flw_to_cca,
# xlim=(bneck_start_s, bneck_end_s),
# bottleneck_Mbps=bneck_config[sender],
# )
# bottleneck_to_sender_to_jfi[(bneck_start_s, bneck_end_s)] = {
# sender: get_jfi(
# {
# flw: flw_to_pkts_during_bneck[flw]
# for flw in sender_to_flws[sender]
# },
# sender_fairness,
# flw_to_sender,
# )
# }


def maxmin_ratios(
    params,
    flw_to_pkts,
    sender_fairness,
    flw_to_sender,
    sender_to_flws,
    flw_to_cca,
    out_flp,
    exp,
    jfi_by_bottleneck_and_sender,
):
    """Compute the average throughput-to-maxmin-fair-rate ratio per bottleneck situation.

    Time is split into "bottleneck situations": consecutive intervals between
    the sender bottleneck events listed in ``params["sender_bottlenecks"]``.
    For each situation, every flow's measured throughput is divided by the
    maxmin fair rate assigned to its sender, and the ratios are averaged.

    Args:
        params: Experiment parameters. Reads ``"sender_bottlenecks"`` (mapping
            from sender to a list of events, each with ``"time_s"`` and
            ``"rate_Mbps"``; a rate of 0 means "no sender bottleneck"),
            ``"flowsets"`` (element 4 of each flowset is used as a time; the
            max is the end of the last situation -- presumably the flowset end
            time, TODO confirm), and ``"bess_bw_Mbps"`` (shared bottleneck).
        flw_to_pkts: Mapping from flow to its packets, indexable by
            ``features.ARRIVAL_TIME_FET``.
        sender_fairness: Unused.
        flw_to_sender: Mapping from flow to its sender.
        sender_to_flws: Mapping from sender to the flows of that sender. Must
            contain exactly two senders.
        flw_to_cca: Unused.
        out_flp: Unused.
        exp: Unused.
        jfi_by_bottleneck_and_sender: Unused.

    Returns:
        Dict mapping ``(start_s, end_s)`` of each bottleneck situation to the
        average, over all flows, of (throughput / maxmin fair rate). The value
        is NaN for a situation when ``flw_to_pkts`` is empty.

    Raises:
        AssertionError: If there are not exactly 3 bottleneck situations or
            not exactly 2 senders.
    """
    inf = float("inf")
    shared_bps = params["bess_bw_Mbps"] * 1e6

    # Map each bottleneck event time to {sender: rate in bps}. A configured
    # rate of 0 means the sender is not bottlenecked, represented as inf.
    bneckstart_to_sender_to_rate = collections.defaultdict(dict)
    for sender, bottleneck_schedule in params["sender_bottlenecks"].items():
        for bottleneck_event in bottleneck_schedule:
            rate = bottleneck_event["rate_Mbps"] * 1e6
            bneckstart_to_sender_to_rate[bottleneck_event["time_s"]][sender] = (
                rate if rate != 0 else inf
            )

    # Carry each sender's rate forward to the next event time if that sender
    # has no event there. Going in time order lets a value propagate across
    # several consecutive event times.
    times_s = sorted(bneckstart_to_sender_to_rate)
    for time_s, next_time_s in zip(times_s, times_s[1:]):
        for sender, rate in bneckstart_to_sender_to_rate[time_s].items():
            bneckstart_to_sender_to_rate[next_time_s].setdefault(sender, rate)

    # Turn event times into (start, end, config) ranges. The last range ends
    # at the largest flowset[4] value.
    end_time_s = max(flowset[4] for flowset in params["flowsets"])
    bottleneck_situations = [
        (start_s, stop_s, bneckstart_to_sender_to_rate[start_s])
        for start_s, stop_s in zip(times_s, times_s[1:] + [end_time_s])
    ]
    num_bottlenecks_situations = len(bottleneck_situations)
    assert (
        num_bottlenecks_situations == 3
    ), f"Expected 3 bottleneck situations, but found {num_bottlenecks_situations}."

    # Determine the maxmin fair per-flow rate of each sender in each situation.
    bneck_to_sender_to_maxmin_bps = {}
    for bneck_start_s, bneck_end_s, bneck_config in bottleneck_situations:
        bneck = (bneck_start_s, bneck_end_s)
        # Start from the rate imposed by the sender bottlenecks alone. The
        # inner dict is created here; writing through a missing outer key on a
        # plain dict would raise KeyError.
        sender_to_maxmin_bps = {
            sender: bneck_config[sender] / len(flws)
            for sender, flws in sender_to_flws.items()
        }
        bneck_to_sender_to_maxmin_bps[bneck] = sender_to_maxmin_bps

        # Fold in the shared bottleneck. A flow has only one maxmin fair rate,
        # so the values above are overridden where needed. With exactly two
        # senders there are three cases:
        #   1) No sender bottlenecks: the shared bottleneck is split equally
        #      across all flows.
        #   2) One sender bottleneck: subtract that sender's bottleneck rate
        #      from the shared bottleneck and split the remainder across the
        #      other sender's flows.
        #   3) Two sender bottlenecks: keep the sender-bottleneck rates.
        # NOTE(review): cases 2 and 3 assume the sender bottlenecks are the
        # binding constraint (i.e. they fit within the shared bottleneck) --
        # confirm this holds for all experiment configurations.
        assert (
            len(sender_to_maxmin_bps) == 2
        ), f"Expected 2 senders, but found {len(sender_to_maxmin_bps)}."
        senders, maxmin_rates_bps = zip(*sender_to_maxmin_bps.items())

        min_idx = int(np.argmin(maxmin_rates_bps))
        # With two senders the max is simply the other one. Using argmax here
        # would pick the same index as argmin when both rates are equal
        # (e.g. both inf), since both return the first occurrence on ties.
        max_idx = 1 - min_idx
        min_sender = senders[min_idx]
        max_sender = senders[max_idx]
        min_rate_bps = maxmin_rates_bps[min_idx]
        max_rate_bps = maxmin_rates_bps[max_idx]

        if min_rate_bps == inf and max_rate_bps == inf:
            # Case 1.
            rate_bps = shared_bps / (
                len(sender_to_flws[min_sender]) + len(sender_to_flws[max_sender])
            )
            sender_to_maxmin_bps[min_sender] = rate_bps
            sender_to_maxmin_bps[max_sender] = rate_bps
        elif min_rate_bps != inf and max_rate_bps == inf:
            # Case 2.
            remainder = shared_bps - bneck_config[min_sender]
            sender_to_maxmin_bps[max_sender] = remainder / len(
                sender_to_flws[max_sender]
            )
        # Case 3: nothing to do.

    # For each bottleneck situation, compare each flow's actual throughput to
    # its maxmin fair rate. The per-flow cutoff index is remembered so each
    # situation starts where the previous one stopped.
    flw_to_last_cutoff_idx = collections.defaultdict(int)
    bneck_to_avg_maxmin_ratio = {}
    for bneck_start_s, bneck_end_s, _ in bottleneck_situations:
        bneck = (bneck_start_s, bneck_end_s)
        maxmin_ratios_for_bneck = []
        for flw, pkts in flw_to_pkts.items():
            start_idx = flw_to_last_cutoff_idx[flw]
            cutoff_idx = utils.find_bound(
                pkts[features.ARRIVAL_TIME_FET],
                bneck_end_s,
                start_idx,
                len(pkts),
                "before",
            )
            tput_bps = utils.safe_tput_bps(pkts, start_idx, cutoff_idx)
            maxmin_rate_bps = bneck_to_sender_to_maxmin_bps[bneck][flw_to_sender[flw]]
            maxmin_ratios_for_bneck.append(tput_bps / maxmin_rate_bps)
            flw_to_last_cutoff_idx[flw] = cutoff_idx + 1
        # Averaging an empty list would emit a RuntimeWarning; return NaN
        # explicitly instead.
        bneck_to_avg_maxmin_ratio[bneck] = (
            np.average(maxmin_ratios_for_bneck)
            if maxmin_ratios_for_bneck
            else float("nan")
        )
    return bneck_to_avg_maxmin_ratio


def get_jfi(flw_to_pkts, sender_fairness=False, flw_to_sender=None):
flw_to_tput_bps = {
flw: 0 if len(pkts) == 0 else utils.safe_tput_bps(pkts, 0, len(pkts) - 1)
Expand Down

0 comments on commit ded0ac5

Please sign in to comment.