diff --git a/unfair/scripts/eval.py b/unfair/scripts/eval.py index 861219d..e354734 100755 --- a/unfair/scripts/eval.py +++ b/unfair/scripts/eval.py @@ -559,107 +559,16 @@ def parse_opened_exp( jfi = get_jfi(flw_to_pkts, sender_fairness, flw_to_sender) # Calculate class-based utilization numbers. - # jfi_by_bottleneck_and_sender = {} if exp.use_bess: # Determine a mapping from class to flows in that class. class_to_flws = collections.defaultdict(list) category = params["category"] classifier = CLASSIFIERS[CATEGORIES[category][0]] - # # If the category is multibottleneck, then we need to break down time into - # # regions based on the bottleneck. - # if category == "multibottleneck": - # # Dict mapping time to a list of bottleneck events at that time. - # bottleneck_events = collections.defaultdict(dict) - # # Add all bottleneck events to unified list. Replace rates of 0 - # # (no bottleneck) with the BESS bandwidth. - # for sender, bottleneck_schedule in params["sender_bottlenecks"].items(): - # for bottleneck_event in bottleneck_schedule: - # rate = bottleneck_event["rate_Mbps"] - # bottleneck_events[bottleneck_event["time_s"]][sender] = ( - # rate if rate != 0 else params["bess_bw_Mbps"] - # ) - # # Go through the list of events and populate the rate values of any senders - # # that did not change. - # times_s = sorted(bottleneck_events.keys()) - # for idx, time_s in enumerate(times_s[:-1]): - # for sender, rate in bottleneck_events[time_s].items(): - # if sender not in bottleneck_events[times_s[idx + 1]]: - # bottleneck_events[times_s[idx + 1]][sender] = rate - # # Sort bottleneck events by time. - # bottleneck_events = sorted(bottleneck_events.items(), key=lambda x: x[0]) - - # # Create range-based bottleneck situations. 
- # end_time_s = max(flowset[4] for flowset in params["flowsets"]) - # bottleneck_situations = [] - # for idx, bottleneck in enumerate(bottleneck_events): - # time_s, configs = bottleneck - # if idx == len(bottleneck_events) - 1: - # next_time_s = end_time_s - # else: - # next_time_s = bottleneck_events[idx + 1][0] - # bottleneck_situations.append((time_s, next_time_s, configs)) - # # Num bottleneck events - # num_bottlenecks_situations = len(bottleneck_situations) - # assert ( - # num_bottlenecks_situations == 3 - # ), f"Expected 3 bottleneck situations, but found {num_bottlenecks_situations}." - - # # TODO: Consider refactoring this to be a mapping from sender to that sender's bottleneck events (the way it is in the params file). - # # TODO: Need to track this per flow. - # last_cutoff_idx = 0 - # for bneck_start_s, bneck_end_s, bneck_config in bottleneck_situations: - # flw_to_pkts_during_bneck = {} - # for flw, pkts in flw_to_pkts.items(): - # cutoff_idx = utils.find_bound( - # pkts[features.ARRIVAL_TIME_FET], - # bneck_end_s, - # last_cutoff_idx, - # len(pkts), - # "before", - # ) - # flw_to_pkts_during_bneck[flw] = pkts[ - # last_cutoff_idx : cutoff_idx + 1 - # ] - # last_cutoff_idx = cutoff_idx + 1 - - # jfi_by_bottleneck_and_sender[(bneck_start_s, bneck_end_s)] = { - # "all": get_jfi( - # flw_to_pkts_during_bneck, sender_fairness, flw_to_sender - # ) - # } - - # # Plot all flows (shared bottleneck). - # plot_flows_over_time( - # exp, - # out_flp[:-4] + "_bneckbess.pdf", - # flw_to_pkts, - # flw_to_cca, - # sender_fairness, - # flw_to_sender, - # xlim=(bneck_start_s, bneck_end_s), - # bottleneck_Mbps=params["bess_bw_Mbps"], - # ) - # # Plot each sender-side bottleneck separately. 
- # for sender, flws in sender_to_flws.items(): - # plot_flows_over_time( - # exp, - # out_flp[:-4] + f"_bneck{sender}.pdf", - # {flw: flw_to_pkts[flw] for flw in flws}, - # flw_to_cca, - # xlim=(bneck_start_s, bneck_end_s), - # bottleneck_Mbps=bneck_config[sender], - # ) - # jfi_by_bottleneck_and_sender[(bneck_start_s, bneck_end_s)] = { - # sender: get_jfi( - # { - # flw: flw_to_pkts_during_bneck[flw] - # for flw in sender_to_flws[sender] - # }, - # sender_fairness, - # flw_to_sender, - # ) - # } + # If the category is multibottleneck, then we need to break down time into + # regions based on the bottleneck. + if category == "multibottleneck": + identify_bottlenecks() for flw in params["flowsets"]: flow_class = classifier(flw) @@ -679,7 +588,13 @@ def parse_opened_exp( overall_util = 0 class_to_util = {} - out = (exp, params, jfi, overall_util, class_to_util) # , jfi_by_bottleneck_and_sender) + out = ( + exp, + params, + jfi, + overall_util, + class_to_util, + ) # , jfi_by_bottleneck_and_sender) # Save the results. logging.info("\tSaving: %s", out_flp) @@ -689,6 +604,239 @@ def parse_opened_exp( return out +# def identify_bottlenecks(params, flw_to_pkts, sender_fairness, flw_to_sender, sender_to_flws, flw_to_cca, out_flp, exp, jfi_by_bottleneck_and_sender): +# bottleneck_to_sender_to_jfi = {} + +# # Dict mapping time to a list of bottleneck events at that time. +# bottleneck_events = collections.defaultdict(dict) +# # Add all bottleneck events to unified list. Replace rates of 0 +# # (no bottleneck) with the BESS bandwidth. +# for sender, bottleneck_schedule in params["sender_bottlenecks"].items(): +# for bottleneck_event in bottleneck_schedule: +# rate = bottleneck_event["rate_Mbps"] +# bottleneck_events[bottleneck_event["time_s"]][sender] = ( +# rate if rate != 0 else params["bess_bw_Mbps"] +# ) +# # Go through the list of events and populate the rate values of any senders +# # that did not change. 
+# times_s = sorted(bottleneck_events.keys()) +# for idx, time_s in enumerate(times_s[:-1]): +# for sender, rate in bottleneck_events[time_s].items(): +# if sender not in bottleneck_events[times_s[idx + 1]]: +# bottleneck_events[times_s[idx + 1]][sender] = rate +# # Sort bottleneck events by time. +# bottleneck_events = sorted(bottleneck_events.items(), key=lambda x: x[0]) + +# # Create range-based bottleneck situations. +# end_time_s = max(flowset[4] for flowset in params["flowsets"]) +# bottleneck_situations = [] +# for idx, bottleneck in enumerate(bottleneck_events): +# time_s, configs = bottleneck +# if idx == len(bottleneck_events) - 1: +# next_time_s = end_time_s +# else: +# next_time_s = bottleneck_events[idx + 1][0] +# bottleneck_situations.append((time_s, next_time_s, configs)) +# # Num bottleneck events +# num_bottlenecks_situations = len(bottleneck_situations) +# assert ( +# num_bottlenecks_situations == 3 +# ), f"Expected 3 bottleneck situations, but found {num_bottlenecks_situations}." + +# # TODO: Consider refactoring this to be a mapping from sender to that sender's bottleneck events (the way it is in the params file). +# # TODO: Need to track this per flow. +# last_cutoff_idx = 0 +# for bneck_start_s, bneck_end_s, bneck_config in bottleneck_situations: +# flw_to_pkts_during_bneck = {} +# for flw, pkts in flw_to_pkts.items(): +# cutoff_idx = utils.find_bound( +# pkts[features.ARRIVAL_TIME_FET], +# bneck_end_s, +# last_cutoff_idx, +# len(pkts), +# "before", +# ) +# flw_to_pkts_during_bneck[flw] = pkts[ +# last_cutoff_idx : cutoff_idx + 1 +# ] +# last_cutoff_idx = cutoff_idx + 1 + +# bottleneck_to_sender_to_jfi[(bneck_start_s, bneck_end_s)] = { +# "shared": get_jfi( +# flw_to_pkts_during_bneck, sender_fairness, flw_to_sender +# ) +# } + +# # Plot all flows (shared bottleneck). 
def maxmin_ratios(
    params,
    flw_to_pkts,
    sender_fairness,
    flw_to_sender,
    sender_to_flws,
    flw_to_cca,
    out_flp,
    exp,
    jfi_by_bottleneck_and_sender,
):
    """Average the throughput / maxmin-fair-rate ratio per bottleneck situation.

    A "bottleneck situation" is a time range ``(start_s, end_s)`` during which
    every sender's bottleneck rate is constant. For each situation, each flow's
    measured throughput is divided by that flow's maxmin fair rate, and the
    ratios of all flows are averaged.

    Args:
        params: Experiment parameters. Reads ``params["sender_bottlenecks"]``
            (sender -> list of events with ``"time_s"`` and ``"rate_Mbps"``; a
            rate of 0 means no sender-side bottleneck), ``params["flowsets"]``
            (element 4 of each flowset is used as an end time), and
            ``params["bess_bw_Mbps"]`` (the shared bottleneck bandwidth).
        flw_to_pkts: Mapping from flow to that flow's packets.
        sender_fairness: Unused; kept so existing callers keep working.
        flw_to_sender: Mapping from flow to the sender it belongs to.
        sender_to_flws: Mapping from sender to its flows. Exactly two senders
            are supported, and each must have at least one flow.
        flw_to_cca: Unused; kept so existing callers keep working.
        out_flp: Unused; kept so existing callers keep working.
        exp: Unused; kept so existing callers keep working.
        jfi_by_bottleneck_and_sender: Unused; kept so existing callers keep
            working.

    Returns:
        Dict mapping ``(start_s, end_s)`` to the average ratio for that
        situation, in chronological order. The value is NaN for a situation
        when ``flw_to_pkts`` is empty.

    Raises:
        AssertionError: If there are not exactly three bottleneck situations,
            or not exactly two senders.
        KeyError: If a sender in ``sender_to_flws`` has no known bottleneck
            rate in some situation.
    """
    unlimited = float("inf")

    # Map each event time to {sender: rate in bps}. A configured rate of 0
    # means "no sender-side bottleneck", which is represented as infinity.
    bneckstart_to_sender_to_rate = collections.defaultdict(dict)
    for sender, bottleneck_schedule in params["sender_bottlenecks"].items():
        for bottleneck_event in bottleneck_schedule:
            rate_bps = bottleneck_event["rate_Mbps"] * 1e6
            bneckstart_to_sender_to_rate[bottleneck_event["time_s"]][sender] = (
                rate_bps if rate_bps != 0 else unlimited
            )

    # Carry forward the rate of any sender that did not change at an event
    # time. Walking the times in order makes the carry-forward transitive.
    times_s = sorted(bneckstart_to_sender_to_rate)
    for time_s, next_time_s in zip(times_s, times_s[1:]):
        for sender, rate_bps in bneckstart_to_sender_to_rate[time_s].items():
            bneckstart_to_sender_to_rate[next_time_s].setdefault(sender, rate_bps)

    # Turn the event times into (start, end, config) ranges. The last range
    # ends when the last flowset ends.
    end_time_s = max(flowset[4] for flowset in params["flowsets"])
    bottleneck_situations = [
        (start_s, stop_s, bneckstart_to_sender_to_rate[start_s])
        for start_s, stop_s in zip(times_s, times_s[1:] + [end_time_s])
    ]
    num_bottlenecks_situations = len(bottleneck_situations)
    assert (
        num_bottlenecks_situations == 3
    ), f"Expected 3 bottleneck situations, but found {num_bottlenecks_situations}."

    # Determine the maxmin fair rate of each sender's flows in each situation.
    shared_bps = params["bess_bw_Mbps"] * 1e6
    bneck_to_sender_to_maxmin_bps = {}
    for bneck_start_s, bneck_end_s, bneck_config in bottleneck_situations:
        # Start from the rate implied by the sender-side bottlenecks alone:
        # each sender's bottleneck is split equally among that sender's flows.
        sender_to_maxmin_bps = {
            sender: bneck_config[sender] / len(flws)
            for sender, flws in sender_to_flws.items()
        }
        assert len(sender_to_maxmin_bps) == 2

        # Order the two senders by per-flow rate. sorted() is stable, so a tie
        # (e.g., both unlimited) still yields two *distinct* senders, which
        # argmin/argmax would not.
        min_sender, max_sender = sorted(
            sender_to_maxmin_bps, key=sender_to_maxmin_bps.get
        )
        min_rate_bps = sender_to_maxmin_bps[min_sender]
        max_rate_bps = sender_to_maxmin_bps[max_sender]

        if min_rate_bps == unlimited:
            # Case 1: No sender bottlenecks. The shared bottleneck is split
            # equally among all flows.
            num_flws = len(sender_to_flws[min_sender]) + len(
                sender_to_flws[max_sender]
            )
            sender_to_maxmin_bps[min_sender] = shared_bps / num_flws
            sender_to_maxmin_bps[max_sender] = shared_bps / num_flws
        elif max_rate_bps == unlimited:
            # Case 2: One sender bottleneck. The unconstrained sender's flows
            # split whatever the constrained sender leaves of the shared
            # bottleneck.
            # NOTE(review): Assumes the sender bottleneck is below the shared
            # bottleneck's capacity; confirm.
            remainder_bps = shared_bps - bneck_config[min_sender]
            sender_to_maxmin_bps[max_sender] = remainder_bps / len(
                sender_to_flws[max_sender]
            )
        # Case 3: Two sender bottlenecks. Nothing to do, as the rates are
        # already set by the sender bottlenecks and the shared bottleneck is
        # assumed not to matter.

        bneck_to_sender_to_maxmin_bps[(bneck_start_s, bneck_end_s)] = (
            sender_to_maxmin_bps
        )

    # For each bottleneck situation, compare each flow's actual throughput to
    # its maxmin fair rate. Each flow keeps its own cursor into its packets so
    # that consecutive situations cover consecutive packet ranges.
    flw_to_last_cutoff_idx = collections.defaultdict(int)
    bneck_to_avg_maxmin_ratio = {}
    for bneck_start_s, bneck_end_s, _ in bottleneck_situations:
        bneck = (bneck_start_s, bneck_end_s)
        ratios = []
        for flw, pkts in flw_to_pkts.items():
            first_idx = flw_to_last_cutoff_idx[flw]
            # Presumably the index of the last packet arriving before the end
            # of this situation; verify against utils.find_bound().
            cutoff_idx = utils.find_bound(
                pkts[features.ARRIVAL_TIME_FET],
                bneck_end_s,
                first_idx,
                len(pkts),
                "before",
            )
            tput_bps = utils.safe_tput_bps(pkts, first_idx, cutoff_idx)
            maxmin_rate_bps = bneck_to_sender_to_maxmin_bps[bneck][flw_to_sender[flw]]
            ratios.append(tput_bps / maxmin_rate_bps)
            flw_to_last_cutoff_idx[flw] = cutoff_idx + 1
        # Averaging an empty list would only produce NaN plus a warning, so
        # return NaN directly.
        bneck_to_avg_maxmin_ratio[bneck] = (
            np.average(ratios) if ratios else float("nan")
        )
    return bneck_to_avg_maxmin_ratio