From 2179295a15551ea14f86ca0ddbf55a703bdb6692 Mon Sep 17 00:00:00 2001
From: Christopher Canel
Date: Sun, 2 Jun 2024 01:57:25 +0000
Subject: [PATCH] Refactor scheduling for efficiency, drop num_to_schedule

---
 ratemon/runtime/c/libratemon_interp.cpp | 250 ++++++++++++------------
 ratemon/runtime/c/ratemon.h             |   8 -
 2 files changed, 128 insertions(+), 130 deletions(-)

diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp
index 1e8d03f..e4b546e 100644
--- a/ratemon/runtime/c/libratemon_interp.cpp
+++ b/ratemon/runtime/c/libratemon_interp.cpp
@@ -27,13 +27,14 @@
 #include
 #include
 #include
+#include <vector>
 #include

 #include "ratemon.h"

 // Protects writes only to max_active_flows, epoch_us, idle_timeout_ns,
-// num_to_schedule, monitor_port_start, monitor_port_end, flow_to_rwnd_fd,
-// flow_to_win_scale_fd, oldact, and setup. Reads are unprotected.
+// monitor_port_start, monitor_port_end, flow_to_rwnd_fd, flow_to_win_scale_fd,
+// oldact, and setup. Reads are unprotected.
 std::mutex lock_setup;
 // Whether setup has been performed.
 bool setup_done = false;
@@ -66,8 +67,7 @@ std::unordered_map fd_to_flow;
 // parameter documentation.
 unsigned int max_active_flows = 5;
 unsigned int epoch_us = 10000;
-unsigned int idle_timeout_ns = 1000;
-unsigned int num_to_schedule = 1;
+unsigned long idle_timeout_ns = 1000;
 unsigned short monitor_port_start = 9000;
 unsigned short monitor_port_end = 9999;
@@ -143,14 +143,12 @@ void timer_callback(const boost::system::error_code &error) {
   }
   // Check that relevant parameters have been set. Otherwise, revert to slow
   // check mode.
-  if (max_active_flows == 0 || epoch_us == 0 || num_to_schedule == 0 ||
-      flow_to_rwnd_fd == 0 || flow_to_last_data_time_fd == 0) {
+  if (max_active_flows == 0 || epoch_us == 0 || flow_to_rwnd_fd == 0 ||
+      flow_to_last_data_time_fd == 0) {
     RM_PRINTF(
         "ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, "
-        "num_to_schedule=%u, flow_to_rwnd_fd=%d, or "
-        "flow_to_last_data_time_fd=%d\n",
-        max_active_flows, epoch_us, num_to_schedule, flow_to_rwnd_fd,
-        flow_to_last_data_time_fd);
+        "flow_to_rwnd_fd=%d, or flow_to_last_data_time_fd=%d\n",
+        max_active_flows, epoch_us, flow_to_rwnd_fd, flow_to_last_data_time_fd);
     if (timer.expires_from_now(one_sec)) {
       RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
     }
@@ -162,119 +160,131 @@ void timer_callback(const boost::system::error_code &error) {
   lock_scheduler.lock();
   RM_PRINTF("INFO: performing scheduling\n");

-  // 1. If there are no flows, then revert to slow check mode.
-  // This case is handled in 6. Make sure that all of the following code still
-  // works even if there are no flows.
-
-  // 2. Remove closed flows from both queues.
-  // active_fds_queue:
-  unsigned long s = active_fds_queue.size();
-  for (unsigned long i = 0; i < s; ++i) {
-    auto a = active_fds_queue.front();
-    active_fds_queue.pop();
-    if (fd_to_flow.contains(a.first))
-      active_fds_queue.push(a);
-    else {
-      RM_PRINTF(
-          "INFO: removed FD=%d from active queue because it is not in "
-          "fd_to_flow\n",
-          a.first);
-    }
-  }
-  // paused_fds_queue:
-  s = paused_fds_queue.size();
-  for (unsigned long i = 0; i < s; ++i) {
-    int p = paused_fds_queue.front();
-    paused_fds_queue.pop();
-    if (fd_to_flow.contains(p))
-      paused_fds_queue.push(p);
-    else {
-      RM_PRINTF(
-          "INFO: removed FD=%d from paused queue because it is not in "
-          "fd_to_flow\n",
-          p);
-    }
-  }
-
-  // 3. Idle timeout. Look through all active flows and pause any that have been
-  // idle for longer than idle_timeout_ns.
-  // Only do this if there are paused flows.
-  // Get the time in a form that we can compare to bpf_ktime_get_ns().
+  // Temporary variable for storing the front of active_fds_queue.
+  std::pair<int, boost::posix_time::ptime> a;
+  // Temporary variable for storing the front of paused_fds_queue.
+  int p;
+  // Size of active_fds_queue.
+  unsigned long s;
+  // Vector of active flows that we plan to pause.
+  std::vector<int> to_pause;
+  // Current kernel time (since boot).
   struct timespec ts;
   clock_gettime(CLOCK_MONOTONIC, &ts);
-  unsigned long last_data_time_ns, idle_ns;
   unsigned long ktime_now_ns = ts.tv_sec * 1000000000ull + ts.tv_nsec;
-  if (idle_timeout_ns > 0 && !paused_fds_queue.empty()) {
-    s = active_fds_queue.size();
-    for (unsigned long i = 0; i < s; ++i) {
-      auto a = active_fds_queue.front();
-      active_fds_queue.pop();
-      // Look up last active time.
-      if (bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
-                              &last_data_time_ns)) {
-        active_fds_queue.push(a);
-        continue;
+  // For measuring idle time.
+  unsigned long last_data_time_ns, idle_ns;
+  // Current time (absolute).
+  boost::posix_time::ptime now =
+      boost::posix_time::microsec_clock::local_time();
+  // New epoch time.
+  boost::posix_time::ptime now_plus_epoch =
+      now + boost::posix_time::microseconds(epoch_us);
+
+  // Typically, active_fds_queue will be small and paused_fds_queue will be
+  // large. Therefore, it is alright for us to iterate through the entire
+  // active_fds_queue (multiple times), but we must iterate through as few
+  // elements of paused_fds_queue as possible.
+
+  // 1) Perform a status check on all active flows. It is alright to iterate
+  // through all of active_fds_queue.
+  s = active_fds_queue.size();
+  for (unsigned long i = 0; i < s; ++i) {
+    a = active_fds_queue.front();
+    active_fds_queue.pop();
+    // 1.1) If this flow has been closed, remove it.
+    if (!fd_to_flow.contains(a.first)) continue;
+    // 1.2) If idle timeout mode is enabled, then check if this flow is
+    // past its idle timeout. Skip this check if there are no paused
+    // flows.
+    if (idle_timeout_ns > 0 && !paused_fds_queue.empty()) {
+      // Look up this flow's last active time.
+      if (!bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
+                               &last_data_time_ns)) {
+        idle_ns = ktime_now_ns - last_data_time_ns;
+        RM_PRINTF("INFO: FD=%d has been idle for %lu ns\n", a.first,
+                  idle_ns);
+        // If the flow has been idle for longer than the idle timeout, then
+        // pause it. We pause the flow *before* activating a replacement flow
+        // because it is by definition not sending data, so we do not risk
+        // causing a drop in utilization by pausing it immediately.
+        if (idle_ns > idle_timeout_ns) {
+          paused_fds_queue.push(a.first);
+          pause_flow(a.first);
+          continue;
+        }
       }
-      // If the flow has been active within the idle timeout, then keep it.
-      idle_ns = ktime_now_ns - last_data_time_ns;
-      RM_PRINTF("INFO: FD=%d idle has been idle for %lu ns\n", a.first,
-                idle_ns);
-      if (idle_ns < (long)idle_timeout_ns * 1000) {
-        active_fds_queue.push(a);
+    }
+    // 1.3) If the flow has been active for longer than its epoch, then plan to
+    // pause it.
+    if (now > a.second) {
+      if (paused_fds_queue.empty()) {
+        // If there are no paused flows, then immediately reactivate this flow.
+        // Randomly jitter the epoch time by +/- 12.5%.
+        active_fds_queue.push(
+            {a.first, now_plus_epoch +
+                          boost::posix_time::microseconds(jitter(epoch_us))});
+        RM_PRINTF("INFO: reactivated FD=%d\n", a.first);
         continue;
+      } else {
+        // Plan to pause this flow.
+        to_pause.push_back(a.first);
       }
-      // Otherwise, pause the flow.
-      paused_fds_queue.push(a.first);
-      pause_flow(a.first);
     }
+    active_fds_queue.push(a);
   }

-  // 3. Calculate how many flows to activate.
-  // Start with the number of free slots.
-  unsigned long num_to_activate = max_active_flows - active_fds_queue.size();
-  // If the next flow should be scheduled now, then activate at least
-  // num_to_schedule flows.
-  boost::posix_time::ptime now =
-      boost::posix_time::microsec_clock::local_time();
-  if (now > active_fds_queue.front().second) {
-    num_to_activate = std::max(num_to_activate, (unsigned long)num_to_schedule);
-  }
-  // Finally, do not activate more flows than are paused.
-  num_to_activate = std::min(num_to_activate, paused_fds_queue.size());
-
-  // 4. Activate the prescribed number of flows.
-  boost::posix_time::ptime now_plus_epoch =
-      now + boost::posix_time::microseconds(epoch_us);
+  // 2) Activate flows. Now we can calculate how many flows to activate to reach
+  // full capacity. This value is the existing free capacity plus the number of
+  // flows we intend to pause. The important part here is that we only look at
+  // as many entries in paused_fds_queue as needed.
+  unsigned long num_to_activate =
+      max_active_flows - active_fds_queue.size() + to_pause.size();
   for (unsigned long i = 0; i < num_to_activate; ++i) {
-    int p = paused_fds_queue.front();
-    paused_fds_queue.pop();
-    // Randomly jitter the epoch time by +/- 12.5%.
-    active_fds_queue.push({p, now_plus_epoch + boost::posix_time::microseconds(
-                                  jitter(epoch_us))});
-    activate_flow(p);
-  }
-
-  // 4. Pause excessive flows.
-  while (active_fds_queue.size() > max_active_flows) {
-    int a = active_fds_queue.front().first;
-    active_fds_queue.pop();
-    if (active_fds_queue.size() < max_active_flows &&
-        paused_fds_queue.empty()) {
-      // If there are fewer than the limit flows active and there are no
-      // waiting flows, then schedule this flow again. Randomly jitter the epoch
-      // time by +/- 12.5%.
+    // Loop until we find a paused flow that is valid (not closed).
+    while (!paused_fds_queue.empty()) {
+      p = paused_fds_queue.front();
+      paused_fds_queue.pop();
+      // If this flow has been closed, then skip it.
+      if (!fd_to_flow.contains(p)) continue;
+      // Randomly jitter the epoch time by +/- 12.5%.
       active_fds_queue.push(
-          {a,
+          {p,
           now_plus_epoch + boost::posix_time::microseconds(jitter(epoch_us))});
-      RM_PRINTF("INFO: reactivated FD=%d\n", a);
-    } else {
-      // Pause this flow.
-      paused_fds_queue.push(a);
-      pause_flow(a);
+      activate_flow(p);
+      break;
+    }
+  }
+
+  // 3) Pause flows. We need to recalculate the number of flows to pause because
+  // we may not have been able to activate as many flows as planned. Recall that
+  // it is alright to iterate through all of active_fds_queue.
+  unsigned long num_to_pause = (unsigned long)std::max(
+      0L, (long)active_fds_queue.size() - (long)max_active_flows);
+#ifdef RM_VERBOSE
+  assert(num_to_pause <= to_pause.size());
+#endif
+  // For each flow that we are supposed to pause, advance through
+  // active_fds_queue until we find it.
+  s = active_fds_queue.size();
+  unsigned long j = 0;
+  for (unsigned long i = 0; i < num_to_pause; ++i) {
+    while (j < s) {
+      a = active_fds_queue.front();
+      active_fds_queue.pop();
+      if (a.first == to_pause[i]) {
+        // Pause this flow.
+        paused_fds_queue.push(a.first);
+        pause_flow(a.first);
+        break;
+      }
+      // Examine the next flow in active_fds_queue.
+      active_fds_queue.push(a);
+      ++j;
    }
   }
-  // 5. Check invariants.
+  // 4) Check invariants.
 #ifdef RM_VERBOSE
   // Cannot have more than the max number of active flows.
   assert(active_fds_queue.size() <= max_active_flows);
@@ -282,7 +292,7 @@ void timer_callback(const boost::system::error_code &error) {
   assert(!active_fds_queue.empty() || paused_fds_queue.empty());
 #endif

-  // 6. Calculate when the next timer should expire.
+  // 5) Calculate when the next timer should expire.
   boost::posix_time::time_duration when;
   if (active_fds_queue.empty()) {
     // If there are no flows, revert to slow check mode.
@@ -291,16 +301,16 @@ void timer_callback(const boost::system::error_code &error) {
   } else if (idle_timeout_ns == 0) {
     // If we are not using idle timeout mode...
     when = active_fds_queue.front().second - now;
-    RM_PRINTF("INFO: scheduling time for next epoch end\n");
+    RM_PRINTF("INFO: scheduling timer for next epoch end\n");
   } else {
     // If we are using idle timeout mode...
     when = boost::posix_time::microsec(
         std::min((long)idle_timeout_ns,
                  (active_fds_queue.front().second - now).total_microseconds()));
-    RM_PRINTF("INFO: scheduling time for next epoch end or idle timeout\n");
+    RM_PRINTF("INFO: scheduling timer for next epoch end or idle timeout\n");
   }

-  // 7. Start the next timer.
+  // 6) Start the next timer.
   if (timer.expires_from_now(when)) {
     RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
   }
@@ -390,14 +400,11 @@ bool setup() {
   // Read environment variables with parameters.
   if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) return false;
   if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) return false;
-  if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_ns,
+  unsigned int idle_timeout_us;
+  if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us,
                      true /* allow_zero */))
     return false;
-  idle_timeout_ns *= 1000;  // Convert from us to ns.
-  // Cannot schedule more than max_active_flows at once.
-  if (!read_env_uint(RM_NUM_TO_SCHEDULE_KEY, &num_to_schedule) ||
-      num_to_schedule > max_active_flows)
-    return false;
+  idle_timeout_ns = (unsigned long)idle_timeout_us * 1000UL;
   unsigned int monitor_port_start_;
   if (!read_env_uint(RM_MONITOR_PORT_START_KEY, &monitor_port_start_) ||
       monitor_port_start_ >= 65536)
     return false;
@@ -454,10 +461,9 @@

   RM_PRINTF(
       "INFO: setup complete! max_active_flows=%u, epoch_us=%u, "
-      "idle_timeout_ns=%u, "
-      "num_to_schedule=%u, monitor_port_start=%u, monitor_port_end=%u\n",
-      max_active_flows, epoch_us, idle_timeout_ns, num_to_schedule,
-      monitor_port_start, monitor_port_end);
+      "idle_timeout_ns=%lu, monitor_port_start=%u, monitor_port_end=%u\n",
+      max_active_flows, epoch_us, idle_timeout_ns, monitor_port_start,
+      monitor_port_end);

   return true;
 }
diff --git a/ratemon/runtime/c/ratemon.h b/ratemon/runtime/c/ratemon.h
index 0758e92..9d42d91 100644
--- a/ratemon/runtime/c/ratemon.h
+++ b/ratemon/runtime/c/ratemon.h
@@ -32,14 +32,6 @@
 // Duration after which an idle flow will be forcibly paused. 0 disables this
 // feature.
 #define RM_IDLE_TIMEOUT_US_KEY "RM_IDLE_TIMEOUT_US"
-// Environment variable that specifies the number of flows to schedule per
-// epoch. "Schedule" can mean either activate or pause. Note that setting this
-// greater than 1 effectively switches scheduled RWND tuning from a strict
-// timeslice mode (where each flow is active for exactly the scheduling epoch)
-// to a loose mode (where flows are batched and the entire batch is active for
-// at most the oldest flow's epoch, even if other flows in the batch arrived
-// more recently).
-#define RM_NUM_TO_SCHEDULE_KEY "RM_NUM_TO_SCHEDULE"
 // Environment variable that specifies the start range of REMOTE ports to manage
 // using scheduled RWND tuning.
 #define RM_MONITOR_PORT_START_KEY "RM_MONITOR_PORT_START"
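
Reviewer note (not part of the patch): the sketch below is a simplified, self-contained rendering of the refactored scheduling round in timer_callback(), meant only to make the new three-step structure easier to follow. It drops the BPF map lookup, Boost timer, idle-timeout check, jitter, and locking, and replaces the wall-clock deadline with a plain integer; schedule_round, Fd, Deadline, open_fds, and max_active are illustrative names that do not exist in the codebase.

// Illustrative sketch only -- not the library's implementation.
#include <cstdio>
#include <queue>
#include <set>
#include <utility>
#include <vector>

using Fd = int;
using Deadline = long;  // Stand-in for boost::posix_time::ptime.

// One scheduling round: status-check the active flows, refill capacity from
// the paused queue, then pause the flows whose epoch expired.
void schedule_round(std::queue<std::pair<Fd, Deadline>> &active,
                    std::queue<Fd> &paused, const std::set<Fd> &open_fds,
                    Deadline now, Deadline epoch, unsigned long max_active) {
  std::vector<Fd> to_pause;

  // 1) One pass over the (small) active queue.
  unsigned long s = active.size();
  for (unsigned long i = 0; i < s; ++i) {
    auto a = active.front();
    active.pop();
    if (!open_fds.count(a.first)) continue;  // 1.1) Drop closed flows.
    if (now > a.second) {                    // 1.3) Epoch expired.
      if (paused.empty()) {
        // No flow is waiting, so grant this flow another epoch immediately.
        active.push({a.first, now + epoch});
        continue;
      }
      // Otherwise, plan to pause it, but keep it active until a replacement
      // has actually been activated.
      to_pause.push_back(a.first);
    }
    active.push(a);
  }

  // 2) Activate at most enough paused flows to reach full capacity, skipping
  //    any that have been closed. Only this many entries of the (potentially
  //    large) paused queue are touched.
  long num_to_activate =
      (long)max_active - (long)active.size() + (long)to_pause.size();
  for (long i = 0; i < num_to_activate; ++i) {
    while (!paused.empty()) {
      Fd p = paused.front();
      paused.pop();
      if (!open_fds.count(p)) continue;
      active.push({p, now + epoch});
      std::printf("activated FD=%d\n", p);
      break;
    }
  }

  // 3) Pause only as many planned flows as we actually found replacements
  //    for, scanning forward through the active queue to locate them.
  unsigned long num_to_pause =
      active.size() > max_active ? active.size() - max_active : 0;
  if (num_to_pause > to_pause.size())
    num_to_pause = to_pause.size();  // Defensive; mirrors the patch's assert.
  s = active.size();
  unsigned long j = 0;
  for (unsigned long i = 0; i < num_to_pause; ++i) {
    while (j < s) {
      auto a = active.front();
      active.pop();
      if (a.first == to_pause[i]) {
        paused.push(a.first);
        std::printf("paused FD=%d\n", a.first);
        break;
      }
      active.push(a);  // Not the one we are looking for; keep rotating.
      ++j;
    }
  }
}

The property the sketch preserves is the one the patch comments call out: the active queue is walked in full each round, while the paused queue is popped only as many times as there are slots to fill, and an expiring flow is never paused before its replacement has been activated.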
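
A second reviewer note on the type change from unsigned int to unsigned long for idle_timeout_ns: the microsecond-to-nanosecond conversion in setup() overflows a 32-bit value once RM_IDLE_TIMEOUT_US exceeds roughly 4.29 seconds. The snippet below is a minimal illustration, assuming an LP64 platform (where unsigned long is 64 bits) and a hypothetical 5-second timeout; it is not taken from the codebase.

// Illustrative sketch of the overflow the wider type avoids.
#include <cstdio>

int main() {
  unsigned int idle_timeout_us = 5000000;  // Hypothetical RM_IDLE_TIMEOUT_US of 5 s.
  unsigned int narrow_ns = idle_timeout_us * 1000u;                 // Wraps modulo 2^32.
  unsigned long wide_ns = (unsigned long)idle_timeout_us * 1000UL;  // As in the patch.
  std::printf("32-bit: %u ns, 64-bit: %lu ns\n", narrow_ns, wide_ns);
  return 0;
}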