From fe1152a4ff29d1c50c4c1ed81008a28833d21170 Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Sun, 10 Mar 2024 20:59:50 +0000 Subject: [PATCH] [WIP] Refactor RateMon for RWND project --- ratemon/model/loss_event_rate.py | 2 +- ratemon/model/models.py | 12 + ratemon/runtime/flow_utils.py | 5 +- ratemon/runtime/policies.py | 21 +- ratemon/runtime/policy_engine.py | 8 +- ratemon/runtime/ratemon_runtime.c | 436 ++++++++++++------------ ratemon/runtime/ratemon_runtime.py | 115 ++++--- ratemon/runtime/reaction_strategy.py | 4 +- test/schedules/rwnd_schedule_static.csv | 2 +- 9 files changed, 327 insertions(+), 278 deletions(-) diff --git a/ratemon/model/loss_event_rate.py b/ratemon/model/loss_event_rate.py index 2d5bb0d8..11112527 100644 --- a/ratemon/model/loss_event_rate.py +++ b/ratemon/model/loss_event_rate.py @@ -8,7 +8,7 @@ class LossTracker: def __init__(self, flow, window_sizes=[8]): - assert window_sizes, "Must specific window_sizes." + assert window_sizes, "Must specify window_sizes." self.flow = flow self.window_sizes = window_sizes self.largest_window = max(self.window_sizes) diff --git a/ratemon/model/models.py b/ratemon/model/models.py index 2658f09c..2676d240 100644 --- a/ratemon/model/models.py +++ b/ratemon/model/models.py @@ -1371,6 +1371,18 @@ def predict(self, dat_in): raise NotImplementedError("ServicePolicyModel does not support prediction.") +class VoidModel: + """An empty model. Used to maintain API compatibility.""" + + # win_size = 8 + + in_spc = () + + def predict(self, dat_in): + """Predicts the fairness of a flow.""" + raise NotImplementedError("VoidModel does not support prediction.") + + ################################################################# # Old models. Present for archival purposes only. These are not # # guaranteed to function. # diff --git a/ratemon/runtime/flow_utils.py b/ratemon/runtime/flow_utils.py index dfa3ec75..736508b8 100644 --- a/ratemon/runtime/flow_utils.py +++ b/ratemon/runtime/flow_utils.py @@ -84,7 +84,10 @@ def __init__(self, fourtuple, loss_event_windows, start_time_us): self.latest_time_sec = time.time() self.label = defaults.Class.NEAR_TARGET self.decision = (defaults.Decision.NOT_PACED, None) - self.loss_tracker = loss_event_rate.LossTracker(self, loss_event_windows) + if loss_event_windows: + self.loss_tracker = loss_event_rate.LossTracker(self, loss_event_windows) + else: + self.loss_tracker = None def __str__(self): """Create a string representation of this flow.""" diff --git a/ratemon/runtime/policies.py b/ratemon/runtime/policies.py index d1b7b8f7..8b506bfa 100644 --- a/ratemon/runtime/policies.py +++ b/ratemon/runtime/policies.py @@ -3,7 +3,7 @@ from enum import IntEnum import logging -from ratemon.model import features, utils, defaults +from ratemon.model import defaults, models, features, utils from ratemon.runtime import reaction_strategy @@ -55,6 +55,23 @@ def choices(): return [to_str(policy) for policy in POLICIES] +def get_model_for_policy(policy, model_file): + if policy == Policy.NOPOLICY: + model = models.VoidModel() + elif policy == Policy.SERVICEPOLICY: + model = models.ServicePolicyModel() + elif policy == Policy.FLOWPOLICY: + assert model_file is not None + model = models.load_model(model_file) + elif policy == Policy.STATIC_RWND: + model = models.VoidModel() + elif policy == Policy.SCHEDULED_RWND: + model = models.VoidModel() + else: + raise RuntimeError(f"Unknown policy: {to_str(policy)}") + return model + + def make_decision( policy, flowkeys, @@ -296,5 +313,5 @@ def make_decision_staticrwnd(schedule): return ( defaults.Decision.PACED, None, - reaction_strategy.get_scheduled_pacing(schedule), + reaction_strategy.get_static_rwnd(schedule), ) diff --git a/ratemon/runtime/policy_engine.py b/ratemon/runtime/policy_engine.py index 4f7e9d86..e6480695 100644 --- a/ratemon/runtime/policy_engine.py +++ b/ratemon/runtime/policy_engine.py @@ -48,11 +48,7 @@ def signal_handler(sig, frame): def main_loop(args, flow_to_rwnd, que, flags, done): """Receive packets and run evaluate the policy on them.""" logging.info("Loading model: %s", args.model_file) - net = ( - models.ServicePolicyModel() - if args.policy == Policy.SERVICEPOLICY - else models.load_model(args.model_file) - ) + net = policies.get_model_for_policy(args.policy, args.model_file) logging.info("Model features:\n\t%s", "\n\t".join(net.in_spc)) flow_to_prev_features = {} # Maps flowkey to (decision, desired throughput, corresponding RWND) @@ -378,8 +374,8 @@ def batch_eval( all_fets, smooth(flw_labels), flow_to_decisions, - args.schedule, args.reaction_strategy, + args.schedule, ) if new_decision is not None: for flowkey in flowkeys: diff --git a/ratemon/runtime/ratemon_runtime.c b/ratemon/runtime/ratemon_runtime.c index 5d3e0d2f..55b24594 100644 --- a/ratemon/runtime/ratemon_runtime.c +++ b/ratemon/runtime/ratemon_runtime.c @@ -31,255 +31,259 @@ // #define FLOW_MAX_PACKETS 1024 // Key for use in flow-based maps. -struct flow_t -{ - u32 local_addr; - u32 remote_addr; - u16 local_port; - u16 remote_port; +struct flow_t { + u32 local_addr; + u32 remote_addr; + u16 local_port; + u16 remote_port; }; -struct tcp_opt -{ - __u8 kind; - __u8 len; - __u8 data; +struct tcp_opt { + __u8 kind; + __u8 len; + __u8 data; } __attribute__((packed)); -// Read RWND limit for flow, as set by userspace. Even though the advertised window -// is only 16 bits in the TCP header, use 32 bits here because we have not taken -// window scaling into account yet. -// BPF_HASH(flow_to_rwnd, struct flow_t, u32); -BPF_TABLE_PINNED("hash", struct flow_t, u32, flow_to_rwnd, 1024, "/sys/fs/bpf/flow_to_rwnd"); +// Read RWND limit for flow, as set by userspace. Even though the advertised +// window is only 16 bits in the TCP header, use 32 bits here because we have +// not taken window scaling into account yet. BPF_HASH(flow_to_rwnd, struct +// flow_t, u32); +BPF_TABLE_PINNED("hash", struct flow_t, u32, flow_to_rwnd, 1024, + "/sys/fs/bpf/flow_to_rwnd"); // Read RWND limit for flow, as set by userspace. // BPF_HASH(flow_to_win_scale, struct flow_t, u8); -BPF_TABLE_PINNED("hash", struct flow_t, u8, flow_to_win_scale, 1024, "/sys/fs/bpf/flow_to_win_scale"); +BPF_TABLE_PINNED("hash", struct flow_t, u8, flow_to_win_scale, 1024, + "/sys/fs/bpf/flow_to_win_scale"); -// BPF_TABLE_PINNED(_table_type, _key_type, _leaf_type, _name, _max_entries, "/sys/fs/bpf/xyz"); +// BPF_TABLE_PINNED(_table_type, _key_type, _leaf_type, _name, _max_entries, +// "/sys/fs/bpf/xyz"); -// Inspired by: https://stackoverflow.com/questions/65762365/ebpf-printing-udp-payload-and-source-ip-as-hex -int do_rwnd_at_egress(struct __sk_buff *skb) -{ - if (skb == NULL) - { - return TC_ACT_OK; - } - - void *data = (void *)(long)skb->data; - void *data_end = (void *)(long)skb->data_end; - struct ethhdr *eth = data; - struct iphdr *ip; - struct tcphdr *tcp; - - // Do a sanity check to make sure that the IP header does not extend past - // the end of the packet. - if ((void *)eth + sizeof(*eth) > data_end) - { - return TC_ACT_OK; - } - - // Check that this is IP. Calculate the start of the IP header, and do a - // sanity check to make sure that the IP header does not extend past the - // end of the packet. - if (eth->h_proto != bpf_htons(ETH_P_IP)) - { - return TC_ACT_OK; - } - ip = data + sizeof(*eth); - if ((void *)ip + sizeof(*ip) > data_end) - { - return TC_ACT_OK; - } - - // Similar for TCP header. - if (ip->protocol != IPPROTO_TCP) - { - return TC_ACT_OK; - } - tcp = (void *)ip + sizeof(*ip); - if ((void *)tcp + sizeof(*tcp) > data_end) - { - return TC_ACT_OK; - } +// Inspired by: +// https://stackoverflow.com/questions/65762365/ebpf-printing-udp-payload-and-source-ip-as-hex +int do_rwnd_at_egress(struct __sk_buff *skb) { + if (skb == NULL) { + return TC_ACT_OK; + } - // Prepare the lookup key. - struct flow_t flow; - flow.local_addr = ip->saddr; - flow.remote_addr = ip->daddr; - flow.local_port = bpf_ntohs(tcp->source); - flow.remote_port = bpf_ntohs(tcp->dest); - - // Look up the RWND value for this flow. - u32 *rwnd = flow_to_rwnd.lookup(&flow); - if (rwnd == NULL) - { - // We do not know the RWND value to use for this flow. - return TC_ACT_OK; - } - if (*rwnd == 0) - { - // The RWND is configured to be 0. That does not make sense. - bpf_trace_printk("Error: Flow with local port %u, remote port %u, RWND=0D\n", flow.local_port, flow.remote_port); - return TC_ACT_OK; - } + void *data = (void *)(long)skb->data; + void *data_end = (void *)(long)skb->data_end; + struct ethhdr *eth = data; + struct iphdr *ip; + struct tcphdr *tcp; - u8 *win_scale = flow_to_win_scale.lookup(&flow); - if (win_scale == NULL) - { - // We do not know the window scale to use for this flow. - bpf_trace_printk("Error: Flow with local port %u, remote port %u, no win scale\n", flow.local_port, flow.remote_port); - return TC_ACT_OK; - } + // Do a sanity check to make sure that the IP header does not extend past + // the end of the packet. + if ((void *)eth + sizeof(*eth) > data_end) { + return TC_ACT_OK; + } - // Apply the window scale to the configured RWND value. - u16 to_set = (u16)(*rwnd >> *win_scale); - // bpf_trace_printk("Setting RWND for flow with local port %u to %u (win scale: %u)\n", flow.local_port, to_set, *win_scale); - // bpf_trace_printk("Setting RWND to %u (win scale: %u, RWND with win scale: %u)\n", *rwnd, *win_scale, to_set); + // Check that this is IP. Calculate the start of the IP header, and do a + // sanity check to make sure that the IP header does not extend past the + // end of the packet. + if (eth->h_proto != bpf_htons(ETH_P_IP)) { + return TC_ACT_OK; + } + ip = data + sizeof(*eth); + if ((void *)ip + sizeof(*ip) > data_end) { + return TC_ACT_OK; + } - // Set the RWND value in the TCP header. If the existing advertised window - // set by flow control is smaller, then use that instead so that we - // preserve flow control.. - tcp->window = min(tcp->window, bpf_htons(to_set)); - // tcp->window = bpf_htons((u16)(*rwnd >> *win_scale)); + // Similar for TCP header. + if (ip->protocol != IPPROTO_TCP) { + return TC_ACT_OK; + } + tcp = (void *)ip + sizeof(*ip); + if ((void *)tcp + sizeof(*tcp) > data_end) { + return TC_ACT_OK; + } + + // Prepare the lookup key. + struct flow_t flow; + flow.local_addr = ip->saddr; + flow.remote_addr = ip->daddr; + flow.local_port = bpf_ntohs(tcp->source); + flow.remote_port = bpf_ntohs(tcp->dest); + + // Look up the RWND value for this flow. + u32 *rwnd = flow_to_rwnd.lookup(&flow); + if (rwnd == NULL) { + // We do not know the RWND value to use for this flow. + return TC_ACT_OK; + } + if (*rwnd == 0) { + // The RWND is configured to be 0. That does not make sense. + bpf_trace_printk( + "Error: Flow with local port %u, remote port %u, RWND=0D\n", + flow.local_port, flow.remote_port); + return TC_ACT_OK; + } + u8 *win_scale = flow_to_win_scale.lookup(&flow); + if (win_scale == NULL) { + // We do not know the window scale to use for this flow. + bpf_trace_printk( + "Error: Flow with local port %u, remote port %u, no win scale\n", + flow.local_port, flow.remote_port); return TC_ACT_OK; + } + + // Apply the window scale to the configured RWND value. + u16 to_set = (u16)(*rwnd >> *win_scale); + // bpf_trace_printk("Setting RWND for flow with local port %u to %u (win + // scale: %u)\n", flow.local_port, to_set, *win_scale); + // bpf_trace_printk("Setting RWND to %u (win scale: %u, RWND with win scale: + // %u)\n", *rwnd, *win_scale, to_set); + + // Set the RWND value in the TCP header. If the existing advertised window + // set by flow control is smaller, then use that instead so that we + // preserve flow control.. + tcp->window = min(tcp->window, bpf_htons(to_set)); + // tcp->window = bpf_htons((u16)(*rwnd >> *win_scale)); + + return TC_ACT_OK; } -static inline int set_hdr_cb_flags(struct bpf_sock_ops *skops) -{ - // Set the flag enabling the BPF_SOCK_OPS_HDR_OPT_LEN_CB and - // BPF_SOCK_OPS_WRITE_HDR_OPT_CB callbacks. - if (bpf_sock_ops_cb_flags_set(skops, - skops->bpf_sock_ops_cb_flags | - BPF_SOCK_OPS_WRITE_HDR_OPT_CB_FLAG)) - return SOCKOPS_ERR; - return SOCKOPS_OK; +static inline int set_hdr_cb_flags(struct bpf_sock_ops *skops) { + // Set the flag enabling the BPF_SOCK_OPS_HDR_OPT_LEN_CB and + // BPF_SOCK_OPS_WRITE_HDR_OPT_CB callbacks. + if (bpf_sock_ops_cb_flags_set(skops, skops->bpf_sock_ops_cb_flags | + BPF_SOCK_OPS_WRITE_HDR_OPT_CB_FLAG)) + return SOCKOPS_ERR; + return SOCKOPS_OK; } -static inline int clear_hdr_cb_flags(struct bpf_sock_ops *skops) -{ - // Clear the flag enabling the BPF_SOCK_OPS_HDR_OPT_LEN_CB and - // BPF_SOCK_OPS_WRITE_HDR_OPT_CB callbacks. - if (bpf_sock_ops_cb_flags_set(skops, - skops->bpf_sock_ops_cb_flags & - ~BPF_SOCK_OPS_WRITE_HDR_OPT_CB_FLAG)) - return SOCKOPS_ERR; - return SOCKOPS_OK; +static inline int clear_hdr_cb_flags(struct bpf_sock_ops *skops) { + // Clear the flag enabling the BPF_SOCK_OPS_HDR_OPT_LEN_CB and + // BPF_SOCK_OPS_WRITE_HDR_OPT_CB callbacks. + if (bpf_sock_ops_cb_flags_set(skops, skops->bpf_sock_ops_cb_flags & + ~BPF_SOCK_OPS_WRITE_HDR_OPT_CB_FLAG)) + return SOCKOPS_ERR; + return SOCKOPS_OK; } -static inline int handle_hdr_opt_len(struct bpf_sock_ops *skops) -{ - // If this is a SYN or SYNACK, then trigger the BPF_SOCK_OPS_WRITE_HDR_OPT_CB callback by - // reserving three bytes (the minimim) for a TCP header option. These three bytes - // will never actually be used, but reserving space is the only way for that - // callback to be triggered. - if (((skops->skb_tcp_flags & TCPHDR_SYN) == TCPHDR_SYN) && - bpf_reserve_hdr_opt(skops, 3, 0)) - return SOCKOPS_ERR; - return SOCKOPS_OK; +static inline int handle_hdr_opt_len(struct bpf_sock_ops *skops) { + // If this is a SYN or SYNACK, then trigger the BPF_SOCK_OPS_WRITE_HDR_OPT_CB + // callback by reserving three bytes (the minimim) for a TCP header option. + // These three bytes will never actually be used, but reserving space is the + // only way for that callback to be triggered. + if (((skops->skb_tcp_flags & TCPHDR_SYN) == TCPHDR_SYN) && + bpf_reserve_hdr_opt(skops, 3, 0)) + return SOCKOPS_ERR; + return SOCKOPS_OK; } -static inline int handle_write_hdr_opt(struct bpf_sock_ops *skops) -{ - if (skops->family != AF_INET) - { - // This is not an IPv4 packet. We only support IPv4 packets because the struct - // we use as a map key stores IP addresses as 32 bits. This is purely an - // implementation detail. - bpf_trace_printk("Warning: Not using IPv4 for flow on local port %u: family=%u\n", skops->local_port, skops->family); - return SOCKOPS_OK; - } - - // Keep in mind that the window scale is set by the local host on - // _outgoing_ SYN and SYNACK packets. The handle_write_hdr_opt() sockops - // callback is only triggered for outgoing packets, so all we need to do - // is filter for SYN and SYNACK. - if ((skops->skb_tcp_flags & TCPHDR_SYN) != TCPHDR_SYN) - { - // This is not a SYN or SYNACK packet. - return SOCKOPS_OK; - } - - // This is an outgoing SYN or SYNACK packet. It should contain the window - // scale. Let's try to look it up. - - struct tcp_opt win_scale_opt = { - .kind = TCPOPT_WINDOW, - .len = 0, - .data = 0}; - int ret = bpf_load_hdr_opt(skops, &win_scale_opt, sizeof(win_scale_opt), 0); - if (ret != 3 || win_scale_opt.len != 3 || - win_scale_opt.kind != TCPOPT_WINDOW) - { - switch (ret) - { - case -ENOMSG: - bpf_trace_printk("Error: Failure loading window scale option for flow on local port %u: -ENOMSG\n", skops->local_port); - break; - case -EINVAL: - bpf_trace_printk("Error: Failure loading window scale option for flow on local port %u: -EINVAL\n", skops->local_port); - break; - case -ENOENT: - bpf_trace_printk("Error: Failure loading window scale option for flow on local port %u: -ENOENT\n", skops->local_port); - break; - case -ENOSPC: - bpf_trace_printk("Error: Failure loading window scale option for flow on local port %u: -ENOSPC\n", skops->local_port); - break; - case -EFAULT: - bpf_trace_printk("Error: Failure loading window scale option for flow on local port %u: -EFAULT\n", skops->local_port); - break; - case -EPERM: - bpf_trace_printk("Error: Failure loading window scale option for flow on local port %u: -EPERM\n", skops->local_port); - break; - default: - bpf_trace_printk("Error: Failure loading window scale option for flow on local port %u: failure code = %d\n", skops->local_port, ret); - } - return SOCKOPS_ERR; - } - +static inline int handle_write_hdr_opt(struct bpf_sock_ops *skops) { + if (skops->family != AF_INET) { + // This is not an IPv4 packet. We only support IPv4 packets because the + // struct we use as a map key stores IP addresses as 32 bits. This is purely + // an implementation detail. bpf_trace_printk( - "TCP window scale for flow %u -> %u = %u\n", - bpf_ntohl(skops->remote_port), skops->local_port, win_scale_opt.data); - - // Record this window scale for use when setting the RWND in the egress path. - struct flow_t flow = { - .local_addr = skops->local_ip4, - .remote_addr = skops->remote_ip4, - .local_port = (u16)skops->local_port, - .remote_port = (u16)bpf_ntohl(skops->remote_port)}; - // Use update() instead of insert() in case this port is being reused. - // TODO: Change to insert() once the flow cleanup code is implemented. - flow_to_win_scale.update(&flow, &win_scale_opt.data); - - // Clear the flag that enables the header option write callback. - return clear_hdr_cb_flags(skops); + "Warning: Not using IPv4 for flow on local port %u: family=%u\n", + skops->local_port, skops->family); + return SOCKOPS_OK; + } + + // Keep in mind that the window scale is set by the local host on + // _outgoing_ SYN and SYNACK packets. The handle_write_hdr_opt() sockops + // callback is only triggered for outgoing packets, so all we need to do + // is filter for SYN and SYNACK. + if ((skops->skb_tcp_flags & TCPHDR_SYN) != TCPHDR_SYN) { + // This is not a SYN or SYNACK packet. + return SOCKOPS_OK; + } + + // This is an outgoing SYN or SYNACK packet. It should contain the window + // scale. Let's try to look it up. + + struct tcp_opt win_scale_opt = {.kind = TCPOPT_WINDOW, .len = 0, .data = 0}; + int ret = bpf_load_hdr_opt(skops, &win_scale_opt, sizeof(win_scale_opt), 0); + if (ret != 3 || win_scale_opt.len != 3 || + win_scale_opt.kind != TCPOPT_WINDOW) { + switch (ret) { + case -ENOMSG: + bpf_trace_printk( + "Error: Failure loading window scale option for flow on local port " + "%u: -ENOMSG\n", + skops->local_port); + break; + case -EINVAL: + bpf_trace_printk( + "Error: Failure loading window scale option for flow on local port " + "%u: -EINVAL\n", + skops->local_port); + break; + case -ENOENT: + bpf_trace_printk( + "Error: Failure loading window scale option for flow on local port " + "%u: -ENOENT\n", + skops->local_port); + break; + case -ENOSPC: + bpf_trace_printk( + "Error: Failure loading window scale option for flow on local port " + "%u: -ENOSPC\n", + skops->local_port); + break; + case -EFAULT: + bpf_trace_printk( + "Error: Failure loading window scale option for flow on local port " + "%u: -EFAULT\n", + skops->local_port); + break; + case -EPERM: + bpf_trace_printk( + "Error: Failure loading window scale option for flow on local port " + "%u: -EPERM\n", + skops->local_port); + break; + default: + bpf_trace_printk( + "Error: Failure loading window scale option for flow on local port " + "%u: failure code = %d\n", + skops->local_port, ret); + } + return SOCKOPS_ERR; + } + + bpf_trace_printk("TCP window scale for flow %u -> %u = %u\n", + bpf_ntohl(skops->remote_port), skops->local_port, + win_scale_opt.data); + + // Record this window scale for use when setting the RWND in the egress path. + struct flow_t flow = {.local_addr = skops->local_ip4, + .remote_addr = skops->remote_ip4, + .local_port = (u16)skops->local_port, + .remote_port = (u16)bpf_ntohl(skops->remote_port)}; + // Use update() instead of insert() in case this port is being reused. + // TODO: Change to insert() once the flow cleanup code is implemented. + flow_to_win_scale.update(&flow, &win_scale_opt.data); + + // Clear the flag that enables the header option write callback. + return clear_hdr_cb_flags(skops); } -int read_win_scale(struct bpf_sock_ops *skops) -{ - switch (skops->op) - { +int read_win_scale(struct bpf_sock_ops *skops) { + switch (skops->op) { case BPF_SOCK_OPS_TCP_LISTEN_CB: - return set_hdr_cb_flags(skops); + return set_hdr_cb_flags(skops); case BPF_SOCK_OPS_HDR_OPT_LEN_CB: - return handle_hdr_opt_len(skops); + return handle_hdr_opt_len(skops); case BPF_SOCK_OPS_WRITE_HDR_OPT_CB: - return handle_write_hdr_opt(skops); - } - return SOCKOPS_OK; + return handle_write_hdr_opt(skops); + } + return SOCKOPS_OK; } struct bpf_iter__tcp { - __bpf_md_ptr(struct bpf_iter_meta *, meta); - __bpf_md_ptr(struct sock_common *, sk_common); - uid_t uid __aligned(8); + __bpf_md_ptr(struct bpf_iter_meta *, meta); + __bpf_md_ptr(struct sock_common *, sk_common); + uid_t uid __aligned(8); }; -BPF_ITER(tcp) -{ - struct sock_common *sk_common = ctx->sk_common; - bpf_trace_printk("Iterating\n"); - return 0; +BPF_ITER(tcp) { + struct sock_common *sk_common = ctx->sk_common; + bpf_trace_printk("Iterating\n"); + return 0; } // BPF_ITER(task) diff --git a/ratemon/runtime/ratemon_runtime.py b/ratemon/runtime/ratemon_runtime.py index 62f433c5..4af91007 100644 --- a/ratemon/runtime/ratemon_runtime.py +++ b/ratemon/runtime/ratemon_runtime.py @@ -17,7 +17,7 @@ import netifaces as ni import pcapy -from ratemon.model import features, models, utils +from ratemon.model import features, utils from ratemon.runtime import ( flow_utils, mitigation_strategy, @@ -141,6 +141,7 @@ def parse_args(): choices=policies.choices(), help="Different types of receiver policy, depending on workload.", required=True, + type=str, ) parser.add_argument( "--schedule", @@ -178,7 +179,7 @@ def parse_args(): ), "Must specify schedule file." if args.schedule is not None: assert path.isfile(args.schedule), f"File does not exist: {args.schedule}" - args.schedule = reaction_strategy.parse_pacing_schedule(args.schedule) + args.schedule = reaction_strategy.parse_static_rwnd_schedule(args.schedule) assert ( args.batch_size > 0 ), f'"--batch-size" must be greater than 0, but is: {args.batch_size}' @@ -188,8 +189,8 @@ def parse_args(): assert path.isdir(args.cgroup), f'"--cgroup={args.cgroup}" is not a directory.' assert ( - args.policy == Policy.FLOWPOLICY - ) and args.model_file is not None, ( + args.policy != Policy.FLOWPOLICY + ) or args.model_file is not None, ( f"{policies.to_str(Policy.FLOWPOLICY)} requires '--model-file'. " ) @@ -199,16 +200,14 @@ def parse_args(): def run(args): """Core logic.""" # Need to load the model to check the input features to see the longest window. - in_spc = ( - models.ServicePolicyModel() - if args.policy == Policy.SERVICEPOLICY - else models.load_model(args.model_file) - ).in_spc - + in_spc = policies.get_model_for_policy(args.policy, args.model_file).in_spc longest_window = max( - features.parse_win_metric(fet)[1] - for fet in in_spc - if "windowed" in fet and "minRtt" in fet + ( + features.parse_win_metric(fet)[1] + for fet in in_spc + if "windowed" in fet and "minRtt" in fet + ), + default=0, ) logging.info("Longest minRTT window: %d", longest_window) @@ -254,34 +253,36 @@ def run(args): ) check_thread.start() - original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + # original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) # Create the process that will run the policy engine. policy_proc = multiprocessing.Process( target=policy_engine.run, args=(args, que, flags, done) ) policy_proc.start() - signal.signal(signal.SIGINT, original_sigint_handler) + # signal.signal(signal.SIGINT, original_sigint_handler) - # Look up my IP address to use when filtering packets. - global MY_IP - MY_IP = utils.ip_str_to_int(ni.ifaddresses(args.interface)[ni.AF_INET][0]["addr"]) + # Create the thread that will sniff packets from the network interface. + sniff_thread = threading.Thread(target=pcapy_sniff_main, args=(args, done)) + sniff_thread.start() + + logging.info("Running...press Control-C to end") + print("Running...press Control-C to end") def signal_handler(sig, frame): logging.info("Main process: You pressed Ctrl+C!") done.set() + print( + "RateMon stopping. If RateMon does not stop, then it is likely blocked " + "waiting to receive a packet. Send a packet to unblock RateMon, " + "or send SIGKILL." + ) # raise RuntimeError() signal.signal(signal.SIGINT, signal_handler) - # The current thread will sniff packets. - try: - pcapy_sniff(args, done) - except KeyboardInterrupt: - logging.info("Cancelled.") - done.set() - check_thread.join() policy_proc.join() + sniff_thread.join() return 0 @@ -430,28 +431,32 @@ def check_flow(fourtuple, args, longest_window, que, flags, epoch=0): if flags[fourtuple].value == 0: flags[fourtuple].value = 1 - # Calculate packets lost and loss event rate. Do this on all packets - # because the loss event rate is based on current RTT, not minRTT, so - # just the packets we send to the policy engine will not be enough. Note - # that the loss event rate results are just for the last packet. - ( - packets_lost, - win_to_loss_event_rate, - ) = flow.loss_tracker.loss_event_rate(flow.incoming_packets) - logging.info("win_to_loss_event_rate: %s", win_to_loss_event_rate) - - # Discard all but the minimum number of packets required to calculate - # the longest window's features, and the number of packets required - # for the smoothing window. - end_time_us = flow.incoming_packets[-args.smoothing_window][4] - for idx in range(1, len(flow.incoming_packets) - args.smoothing_window): - if ( - end_time_us - flow.incoming_packets[idx][4] - < flow.min_rtt_us * longest_window - ): - break - flow.incoming_packets = flow.incoming_packets[idx - 1 :] - packets_lost = packets_lost[idx - 1 :] + if flow.loss_tracker is not None: + # Calculate packets lost and loss event rate. Do this on all packets + # because the loss event rate is based on current RTT, not minRTT, so + # just the packets we send to the policy engine will not be enough. Note + # that the loss event rate results are just for the last packet. + ( + packets_lost, + win_to_loss_event_rate, + ) = flow.loss_tracker.loss_event_rate(flow.incoming_packets) + logging.info("win_to_loss_event_rate: %s", win_to_loss_event_rate) + + # Discard all but the minimum number of packets required to calculate + # the longest window's features, and the number of packets required + # for the smoothing window. + end_time_us = flow.incoming_packets[-args.smoothing_window][4] + for idx in range(1, len(flow.incoming_packets) - args.smoothing_window): + if ( + end_time_us - flow.incoming_packets[idx][4] + < flow.min_rtt_us * longest_window + ): + break + flow.incoming_packets = flow.incoming_packets[idx - 1 :] + packets_lost = packets_lost[idx - 1 :] + else: + packets_lost = 0 + win_to_loss_event_rate = {} logging.info( "Sending to policy engine the most recent %d packets for flow: %s", @@ -494,6 +499,20 @@ def check_flow(fourtuple, args, longest_window, que, flags, epoch=0): ) +def pcapy_sniff_main(args, done): + # The current thread will sniff packets. + + # Look up my IP address to use when filtering packets. + global MY_IP + MY_IP = utils.ip_str_to_int(ni.ifaddresses(args.interface)[ni.AF_INET][0]["addr"]) + + try: + pcapy_sniff(args, done) + except KeyboardInterrupt: + logging.info("Cancelled.") + done.set() + + def pcapy_sniff(args, done): """Use pcapy to sniff packets from a specific interface.""" # Set the snapshot length to the maximum size of the Ethernet, IPv4, and TCP @@ -524,8 +543,6 @@ def pcapy_cleanup(): logging.info("Using tcpdump filter: %s", filt) pcap.setfilter(filt) - logging.info("Running...press Control-C to end") - print("Running...press Control-C to end") last_time_s = time.time() last_exit_check_s = time.time() num_packets = 0 diff --git a/ratemon/runtime/reaction_strategy.py b/ratemon/runtime/reaction_strategy.py index 6e6d1edc..7f9a3ffe 100644 --- a/ratemon/runtime/reaction_strategy.py +++ b/ratemon/runtime/reaction_strategy.py @@ -61,7 +61,7 @@ def react_down(strategy, current): return new -def parse_pacing_schedule(flp): +def parse_static_rwnd_schedule(flp): """Parse a pacing schedule file into a list. The file must be a CSV file where each line is of the form: @@ -92,7 +92,7 @@ def parse_pacing_schedule(flp): return sorted(schedule, key=lambda p: p[0]) -def get_scheduled_pacing(schedule): +def get_static_rwnd(schedule): """Extract the scheduled RWND value for the current time. The schedule is a list as described in parse_pacing_schedule(). diff --git a/test/schedules/rwnd_schedule_static.csv b/test/schedules/rwnd_schedule_static.csv index fdffc86e..ecb9e63f 100644 --- a/test/schedules/rwnd_schedule_static.csv +++ b/test/schedules/rwnd_schedule_static.csv @@ -1 +1 @@ -0,1000000 \ No newline at end of file +0,10000 \ No newline at end of file