From c48f03694adcaf7a1da484e1a5cea823168cdc51 Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Thu, 29 Feb 2024 19:21:46 +0000 Subject: [PATCH] sender fairness -> servicepolicy --- ratemon/model/gen_features.py | 6 +++--- ratemon/runtime/inference.py | 32 ++++++++++++++-------------- ratemon/runtime/ratemon_runtime.py | 18 ++++++++-------- ratemon/scripts/eval.py | 34 +++++++++++++++--------------- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/ratemon/model/gen_features.py b/ratemon/model/gen_features.py index 268bf98..18964ca 100755 --- a/ratemon/model/gen_features.py +++ b/ratemon/model/gen_features.py @@ -93,7 +93,7 @@ def parse_opened_exp( out_flp, skip_smoothed, select_tail_percent=None, - sender_fairness=False, + servicepolicy=False, ): """Parse an experiment. Return the smallest safe window size.""" print(f"Parsing: {exp_flp}") @@ -1314,7 +1314,7 @@ def parse_exp( out_dir, skip_smoothed, select_tail_percent, - sender_fairness=False, + servicepolicy=False, always_reparse=False, parse_func=parse_opened_exp, ): @@ -1336,7 +1336,7 @@ def parse_exp( out_flp, skip_smoothed, select_tail_percent, - sender_fairness, + servicepolicy, ) except AssertionError: traceback.print_exc() diff --git a/ratemon/runtime/inference.py b/ratemon/runtime/inference.py index ee5c1dc..bc7b186 100644 --- a/ratemon/runtime/inference.py +++ b/ratemon/runtime/inference.py @@ -100,7 +100,7 @@ def smooth(labels): return label -def make_decision_sender_fairness( +def make_decision_servicepolicy( args, flowkeys, min_rtt_us, fets, label, flow_to_decisions ): """Make a fairness decision for all flows from a sender. @@ -360,8 +360,8 @@ def make_decision( Base the decision on the flow's label and existing decision. Use the flow's features to calculate any necessary flow metrics, such as the throughput. 
""" - if args.sender_fairness: - new_decision = make_decision_sender_fairness( + if args.servicepolicy: + new_decision = make_decision_servicepolicy( args, flowkeys, min_rtt_us, fets, label, flow_to_decisions ) else: @@ -514,7 +514,7 @@ def parse_from_inference_queue( val, flow_to_rwnd, flow_to_decisions, flow_to_prev_features ): """Parse a message from the inference queue.""" - sender_fairness = False + servicepolicy = False epoch = num_flows_expected = None opcode, fourtuple = val[:2] flowkey = flow_utils.FlowKey(*fourtuple) @@ -528,8 +528,8 @@ def parse_from_inference_queue( win_to_loss_event_rate, ) = val[2:] - if opcode.startswith("inference-sender-fairness"): - sender_fairness = True + if opcode.startswith("inference-servicepolicy"): + servicepolicy = True epoch, num_flows_expected = opcode.split("-")[-2:] epoch = int(epoch) num_flows_expected = int(num_flows_expected) @@ -553,7 +553,7 @@ def parse_from_inference_queue( win_to_loss_event_rate, fourtuple, flowkey, - sender_fairness, + servicepolicy, epoch, num_flows_expected, ) @@ -621,7 +621,7 @@ def build_features( def wait_or_batch( net, - sender_fairness, + servicepolicy, batch, waiting_room, fourtuple, @@ -633,8 +633,8 @@ def wait_or_batch( epoch, num_flows_expected, ): - """Decide whether a flow should wait (if doing sender fairness) or be batched.""" - if not sender_fairness: + """Decide whether a flow should wait (if doing servicepolicy) or be batched.""" + if not servicepolicy: batch.append(([fourtuple], [flowkey], min_rtt_us, all_fets, in_fets)) logging.info( "Adding %d packets from flow %s to batch.", @@ -654,7 +654,7 @@ def wait_or_batch( [], ) logging.info( - "Adding %d packets from flow %s to sender fairness waiting room.", + "Adding %d packets from flow %s to servicepolicy waiting room.", len(in_fets), flowkey, ) @@ -686,7 +686,7 @@ def wait_or_batch( ) ) logging.info( - "Sender fairness waiting room for sender %s is full. " + "Servicepolicy waiting room for sender %s is full. " "Adding %d merged packets to batch.", utils.int_to_ip_str(merged_flowkeys[0].remote_addr), len(merged_in_fets), @@ -781,7 +781,7 @@ def loop_iteration( Includes: checking if the current batch is ready and running it, pulling a message from the inference queue, computing features, and deciding if the - flow should wait (sender fairness) or be batched immediately. + flow should wait (servicepolicy) or be batched immediately. """ # First, check whether we should run inference on the current batch. 
if maybe_run_batch( @@ -847,7 +847,7 @@ def loop_iteration( win_to_loss_event_rate, fourtuple, flowkey, - sender_fairness, + servicepolicy, epoch, num_flows_expected, ) = parse_res @@ -876,7 +876,7 @@ def loop_iteration( packets_covered_by_batch += wait_or_batch( net, - sender_fairness, + servicepolicy, batch, waiting_room, fourtuple, @@ -898,7 +898,7 @@ def loop_iteration( def inference_loop(args, flow_to_rwnd, que, inference_flags, done): """Receive packets and run inference on them.""" logging.info("Loading model: %s", args.model_file) - if args.sender_fairness: + if args.servicepolicy: net = models.MathisFairness() else: net = models.load_model(args.model_file) diff --git a/ratemon/runtime/ratemon_runtime.py b/ratemon/runtime/ratemon_runtime.py index f98eac9..da23dd6 100644 --- a/ratemon/runtime/ratemon_runtime.py +++ b/ratemon/runtime/ratemon_runtime.py @@ -189,7 +189,7 @@ def check_flows(args, longest_window, que, inference_flags): with FLOWS.lock: for fourtuple, flow in FLOWS.items(): # A fourtuple might add other fourtuples to to_check if - # args.sender_fairness is True. + # args.servicepolicy is True. if fourtuple in to_check: continue @@ -216,7 +216,7 @@ def check_flows(args, longest_window, que, inference_flags): logging.info("No incoming packets for flow %s", flow) if flow.is_ready(args.smoothing_window, longest_window): - if args.sender_fairness: + if args.servicepolicy: # Only want to add this flow if all the flows from this # sender are ready. if FLOWS.sender_okay( @@ -334,11 +334,11 @@ def check_flow(fourtuple, args, longest_window, que, inference_flags, epoch=0): flow.min_rtt_us, win_to_loss_event_rate, ) - if args.sender_fairness: + if args.servicepolicy: que.put( ( - # inference-sender-fairness--- - f"inference-sender-fairness-{epoch}-{flow.flowkey.remote_addr}-{len(FLOWS.get_flows_from_sender(flow.flowkey.remote_addr))}", + # inference-servicepolicy--- + f"inference-servicepolicy-{epoch}-{flow.flowkey.remote_addr}-{len(FLOWS.get_flows_from_sender(flow.flowkey.remote_addr))}", *info, ), block=False, @@ -530,7 +530,7 @@ def parse_args(): type=int, ) parser.add_argument( - "--sender-fairness", + "--servicepolicy", action="store_true", help=( "Combine all flows from one sender and enforce fairness between " @@ -556,8 +556,8 @@ def parse_args(): assert path.isdir(args.cgroup), f'"--cgroup={args.cgroup}" is not a directory.' assert ( - args.sender_fairness or args.model_file is not None - ), "Specify one of '--model-file' or '--sender-fairness'. " + args.servicepolicy or args.model_file is not None + ), "Specify one of '--model-file' or '--servicepolicy'. " return args @@ -566,7 +566,7 @@ def run(args): # Need to load the model to check the input features to see the longest window. in_spc = ( models.MathisFairness() - if args.sender_fairness + if args.servicepolicy else models.load_model(args.model_file) ).in_spc diff --git a/ratemon/scripts/eval.py b/ratemon/scripts/eval.py index 8f95ed4..609fa23 100755 --- a/ratemon/scripts/eval.py +++ b/ratemon/scripts/eval.py @@ -176,7 +176,7 @@ def plot_lines( ys, alpha=0.75, linestyle=( - # If this is a sender fairness graph but not the first + # If this is a servicepolicy graph but not the first # sender, or a cubic flow in a flow fairness graph... 
"solid" if "Service 2" in label or label == "cubic" @@ -210,7 +210,7 @@ def plot_flows_over_time( out_flp, flw_to_pkts, flw_to_cca, - sender_fairness=False, + servicepolicy=False, flw_to_sender=None, xlim=None, bottleneck_Mbps=None, @@ -257,9 +257,9 @@ def plot_flows_over_time( lines.append((throughputs, flw)) - # If sender_fairness, then graph the total throughput of each sender instead of the + # If servicepolicy, then graph the total throughput of each sender instead of the # throughput of each flow. - if sender_fairness and flw_to_sender is not None: + if servicepolicy and flw_to_sender is not None: sender_to_tputs = dict() # Accumulate the throughput of each sender. for throughputs, flw in lines: @@ -301,7 +301,7 @@ def plot_flows_over_time( else: lines = [(throughputs, flw_to_cca[flw]) for (throughputs, flw) in lines] - colors = [COLORS_MAP["blue"], COLORS_MAP["red"]] if sender_fairness else None + colors = [COLORS_MAP["blue"], COLORS_MAP["red"]] if servicepolicy else None # If we are supposed to mark the bottleneck bandwidth, then create a horizontal # line and prepend it to the lines. @@ -326,11 +326,11 @@ def plot_flows_over_time( None, exp.bw_Mbps if exp.use_bess else None, out_flp, - legendloc=("center" if sender_fairness else "upper right"), - linewidth=(1 if sender_fairness else 1), + legendloc=("center" if servicepolicy else "upper right"), + linewidth=(1 if servicepolicy else 1), colors=colors, - bbox_to_anchor=((0.5, 1.15) if sender_fairness else None), - legend_ncol=(2 if sender_fairness else 1), + bbox_to_anchor=((0.5, 1.15) if servicepolicy else None), + legend_ncol=(2 if servicepolicy else 1), figsize=(5, 2.6), ) @@ -421,7 +421,7 @@ def parse_opened_exp( out_flp, skip_smoothed, select_tail_percent, - sender_fairness, + servicepolicy, ): # skip_smoothed is not used but is kept to maintain API compatibility # with gen_features.parse_opened_exp(). @@ -518,7 +518,7 @@ def parse_opened_exp( out_flp[:-4] + "_flows.pdf", flw_to_pkts, flw_to_cca, - sender_fairness, + servicepolicy, flw_to_sender, ) # Plot each sender separately. @@ -575,7 +575,7 @@ def parse_opened_exp( out = ( exp, params, - get_jfi(flw_to_pkts, sender_fairness, flw_to_sender), + get_jfi(flw_to_pkts, servicepolicy, flw_to_sender), overall_util, class_to_util, bneck_to_maxmin_ratios, @@ -748,12 +748,12 @@ def calculate_maxmin_ratios(params, flw_to_pkts, flw_to_sender, sender_to_flws): return bneck_to_maxmin_ratios -def get_jfi(flw_to_pkts, sender_fairness=False, flw_to_sender=None): +def get_jfi(flw_to_pkts, servicepolicy=False, flw_to_sender=None): flw_to_tput_bps = { flw: 0 if len(pkts) == 0 else utils.safe_tput_bps(pkts, 0, len(pkts) - 1) for flw, pkts in flw_to_pkts.items() } - if sender_fairness: + if servicepolicy: assert flw_to_sender is not None sender_to_tput_bps = collections.defaultdict(float) for flw, tput_bps in flw_to_tput_bps.items(): @@ -891,7 +891,7 @@ def main(args): print("Logging to:", log_flp) logging.info("Evaluating experiments in: %s", args.exp_dir) - our_label = "ServicePolicy" if args.sender_fairness else "FlowPolicy" + our_label = "ServicePolicy" if args.servicepolicy else "FlowPolicy" # Find all experiments. 
pcaps = [ @@ -901,7 +901,7 @@ def main(args): path.join(args.out_dir, "individual_results"), False, # skip_smoothed args.select_tail_percent, - args.sender_fairness, + args.servicepolicy, True, # always_reparse parse_opened_exp, ) @@ -1664,7 +1664,7 @@ def parse_args(): type=str, ) parser.add_argument( - "--sender-fairness", + "--servicepolicy", action="store_true", help="Evaluate fairness across senders instead of across flows.", )
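
For anyone trying the renamed interface, the short standalone sketch below mirrors the post-rename model-selection logic that is visible in parse_args() and run() of ratemon/runtime/ratemon_runtime.py. It is illustrative only and not part of the patch: _MathisFairnessStub and _load_model_stub are stand-ins for models.MathisFairness and models.load_model from the repository, and the argparse wiring is a simplified stand-in for the full parse_args().

# Illustrative sketch only -- mirrors the post-rename logic in
# ratemon/runtime/ratemon_runtime.py (parse_args() and run()).
# _MathisFairnessStub and _load_model_stub are stand-ins for
# models.MathisFairness and models.load_model, which are not reproduced here.
import argparse


class _MathisFairnessStub:
    """Stand-in for models.MathisFairness (analytical model, no model file needed)."""


def _load_model_stub(model_file):
    """Stand-in for models.load_model(model_file)."""
    return object()


def select_model(args):
    # After this patch, --servicepolicy (formerly --sender-fairness) selects the
    # analytical Mathis model; otherwise a trained model file must be supplied.
    assert (
        args.servicepolicy or args.model_file is not None
    ), "Specify one of '--model-file' or '--servicepolicy'."
    return (
        _MathisFairnessStub()
        if args.servicepolicy
        else _load_model_stub(args.model_file)
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-file", type=str)
    parser.add_argument("--servicepolicy", action="store_true")
    # Example: running with --servicepolicy prints "_MathisFairnessStub".
    print(type(select_model(parser.parse_args())).__name__)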