diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp index e4b546e..05e93af 100644 --- a/ratemon/runtime/c/libratemon_interp.cpp +++ b/ratemon/runtime/c/libratemon_interp.cpp @@ -67,7 +67,8 @@ std::unordered_map fd_to_flow; // parameter documentation. unsigned int max_active_flows = 5; unsigned int epoch_us = 10000; -unsigned long idle_timeout_ns = 1000; +long idle_timeout_us = 0; +unsigned long idle_timeout_ns = 0; unsigned short monitor_port_start = 9000; unsigned short monitor_port_end = 9999; @@ -201,17 +202,35 @@ void timer_callback(const boost::system::error_code &error) { // Look up this flow's last active time. if (!bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first], &last_data_time_ns)) { - idle_ns = ktime_now_ns - last_data_time_ns; - RM_PRINTF("INFO: FD=%d idle has been idle for %lu ns\n", a.first, - idle_ns); - // If the flow has been idle for longer than the idle timeout, then - // pause it. We pause the flow *before* activating a replacement flow - // because it is by definition not sending data, so we do not risk - // causing a drop in utilization by pausing it immediately. - if (idle_ns > idle_timeout_ns) { - paused_fds_queue.push(a.first); - pause_flow(a.first); - continue; + // If last_data_time_ns is 0, then this flow has not yet been tracked. + if (last_data_time_ns) { + if (last_data_time_ns > ktime_now_ns) { + // This could be fine...perhaps a packet arrived since we captured + // the current time above? + RM_PRINTF( + "WARNING: FD=%d last data time (%lu ns) is more recent that " + "current time (%lu ns) by %lu ns\n", + a.first, last_data_time_ns, ktime_now_ns, + last_data_time_ns - ktime_now_ns); + } else { + idle_ns = ktime_now_ns - last_data_time_ns; + RM_PRINTF("INFO: FD=%d now: %lu ns, last data time: %lu ns\n", + a.first, ktime_now_ns, last_data_time_ns); + RM_PRINTF( + "INFO: FD=%d idle has been idle for %lu ns. timeout is %lu " + "ns\n", + a.first, idle_ns, idle_timeout_ns); + // If the flow has been idle for longer than the idle timeout, then + // pause it. We pause the flow *before* activating a replacement + // flow because it is by definition not sending data, so we do not + // risk causing a drop in utilization by pausing it immediately. + if (idle_ns >= idle_timeout_ns) { + RM_PRINTF("INFO: Pausing FD=%d due to idle timeout\n", a.first); + paused_fds_queue.push(a.first); + pause_flow(a.first); + continue; + } + } } } } @@ -304,10 +323,15 @@ void timer_callback(const boost::system::error_code &error) { RM_PRINTF("INFO: scheduling timer for next epoch end\n"); } else { // If we are using idle timeout mode... - when = boost::posix_time::microsec( - std::min((long)idle_timeout_ns, - (active_fds_queue.front().second - now).total_microseconds())); - RM_PRINTF("INFO: scheduling timer for next epoch end or idle timeout\n"); + long next_epoch_us = + (active_fds_queue.front().second - now).total_microseconds(); + if (idle_timeout_us < next_epoch_us) { + RM_PRINTF("INFO: scheduling timer for next idle timeout\n"); + when = boost::posix_time::microsec(idle_timeout_us); + } else { + RM_PRINTF("INFO: scheduling timer for next epoch end\n"); + when = boost::posix_time::microsec(next_epoch_us); + } } // 6) Start the next timer. @@ -400,10 +424,11 @@ bool setup() { // Read environment variables with parameters. if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) return false; if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) return false; - unsigned int idle_timeout_us; - if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us, + unsigned int idle_timeout_us_; + if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us_, true /* allow_zero */)) return false; + idle_timeout_us = (long)idle_timeout_us_; idle_timeout_ns = (unsigned long)idle_timeout_us * 1000UL; unsigned int monitor_port_start_; if (!read_env_uint(RM_MONITOR_PORT_START_KEY, &monitor_port_start_) || @@ -517,6 +542,10 @@ bool set_cca(int fd, const char *cca) { // Perform initial scheduling for this flow. void initial_scheduling(int fd) { + // Create an entry in flow_to_last_data_time_ns for this flow so that the + // kprobe program knows to start tracking this flow. + bpf_map_update_elem(flow_to_last_data_time_fd, &fd_to_flow[fd], &zero, + BPF_ANY); // Should this flow be active or paused? if (active_fds_queue.size() < max_active_flows) { // Less than the max number of flows are active, so make this one active. diff --git a/ratemon/runtime/c/ratemon.h b/ratemon/runtime/c/ratemon.h index 9d42d91..462739e 100644 --- a/ratemon/runtime/c/ratemon.h +++ b/ratemon/runtime/c/ratemon.h @@ -4,7 +4,7 @@ #define __RATEMON_H // Comment out the below line to disable verbose logging. -// #define RM_VERBOSE +#define RM_VERBOSE #ifdef RM_VERBOSE #define RM_PRINTF(...) printf(__VA_ARGS__) diff --git a/ratemon/runtime/c/ratemon_kprobe.bpf.c b/ratemon/runtime/c/ratemon_kprobe.bpf.c index 60f8be7..7f33176 100644 --- a/ratemon/runtime/c/ratemon_kprobe.bpf.c +++ b/ratemon/runtime/c/ratemon_kprobe.bpf.c @@ -62,8 +62,17 @@ int BPF_KPROBE(tcp_rcv_established, struct sock *sk, struct sk_buff *skb) { .remote_addr = bpf_ntohl(skc_daddr), .local_port = skc_num, .remote_port = bpf_ntohs(skc_dport)}; + // Check if we should record the last data time for this flow. + if (bpf_map_lookup_elem(&flow_to_last_data_time_ns, &flow) == NULL) { + // This flow is not in the map, so we are not supposed to track its last + // data time. + return 0; + } // Get the current time and store it for this flow. - unsigned int now_ns = bpf_ktime_get_ns(); - bpf_map_update_elem(&flow_to_last_data_time_ns, &flow, &now_ns, BPF_ANY); + unsigned long now_ns = bpf_ktime_get_ns(); + if (bpf_map_update_elem(&flow_to_last_data_time_ns, &flow, &now_ns, + BPF_ANY)) { + bpf_printk("ERROR: 'tcp_rcv_established' error updating last data time"); + } return 0; }