Skip to content

Commit

Permalink
Refactor scheduling for efficiency, drop num_to_schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed Jun 2, 2024
1 parent 7fa2a5a commit 2179295
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 130 deletions.
250 changes: 128 additions & 122 deletions ratemon/runtime/c/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
#include <mutex>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "ratemon.h"

// Protects writes only to max_active_flows, epoch_us, idle_timeout_ns,
// num_to_schedule, monitor_port_start, monitor_port_end, flow_to_rwnd_fd,
// flow_to_win_scale_fd, oldact, and setup. Reads are unprotected.
// monitor_port_start, monitor_port_end, flow_to_rwnd_fd, flow_to_win_scale_fd,
// oldact, and setup. Reads are unprotected.
std::mutex lock_setup;
// Whether setup has been performed.
bool setup_done = false;
Expand Down Expand Up @@ -66,8 +67,7 @@ std::unordered_map<int, struct rm_flow> fd_to_flow;
// parameter documentation.
unsigned int max_active_flows = 5;
unsigned int epoch_us = 10000;
unsigned int idle_timeout_ns = 1000;
unsigned int num_to_schedule = 1;
unsigned long idle_timeout_ns = 1000;
unsigned short monitor_port_start = 9000;
unsigned short monitor_port_end = 9999;

Expand Down Expand Up @@ -143,14 +143,12 @@ void timer_callback(const boost::system::error_code &error) {
}
// Check that relevant parameters have been set. Otherwise, revert to slow
// check mode.
if (max_active_flows == 0 || epoch_us == 0 || num_to_schedule == 0 ||
flow_to_rwnd_fd == 0 || flow_to_last_data_time_fd == 0) {
if (max_active_flows == 0 || epoch_us == 0 || flow_to_rwnd_fd == 0 ||
flow_to_last_data_time_fd == 0) {
RM_PRINTF(
"ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, "
"num_to_schedule=%u, flow_to_rwnd_fd=%d, or "
"flow_to_last_data_time_fd=%d\n",
max_active_flows, epoch_us, num_to_schedule, flow_to_rwnd_fd,
flow_to_last_data_time_fd);
"flow_to_rwnd_fd=%d, or flow_to_last_data_time_fd=%d\n",
max_active_flows, epoch_us, flow_to_rwnd_fd, flow_to_last_data_time_fd);
if (timer.expires_from_now(one_sec)) {
RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
}
Expand All @@ -162,127 +160,139 @@ void timer_callback(const boost::system::error_code &error) {
lock_scheduler.lock();
RM_PRINTF("INFO: performing scheduling\n");

// 1. If there are no flows, then revert to slow check mode.
// This case is handled in 6. Make sure that all of the following code still
// works even if there are no flows.

// 2. Remove closed flows from both queues.
// active_fds_queue:
unsigned long s = active_fds_queue.size();
for (unsigned long i = 0; i < s; ++i) {
auto a = active_fds_queue.front();
active_fds_queue.pop();
if (fd_to_flow.contains(a.first))
active_fds_queue.push(a);
else {
RM_PRINTF(
"INFO: removed FD=%d from active queue because it is not in "
"fd_to_flow\n",
a.first);
}
}
// paused_fds_queue:
s = paused_fds_queue.size();
for (unsigned long i = 0; i < s; ++i) {
int p = paused_fds_queue.front();
paused_fds_queue.pop();
if (fd_to_flow.contains(p))
paused_fds_queue.push(p);
else {
RM_PRINTF(
"INFO: removed FD=%d from paused queue because it is not in "
"fd_to_flow\n",
p);
}
}

// 3. Idle timeout. Look through all active flows and pause any that have been
// idle for longer than idle_timeout_ns. Only do this if there are paused
// flows.
// Get the time in a form that we can compare to bpf_ktime_get_ns().
// Temporary variable for storing the front of active_fds_queue.
std::pair<int, boost::posix_time::ptime> a;
// Temporary variable for storing the front of paused_fds_queue.
int p;
// Size of active_fds_queue.
unsigned long s;
// Vector of active flows that we plan to pause.
std::vector<int> to_pause;
// Current kernel time (since boot).
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
unsigned long last_data_time_ns, idle_ns;
unsigned long ktime_now_ns = ts.tv_sec * 1000000000ull + ts.tv_nsec;
if (idle_timeout_ns > 0 && !paused_fds_queue.empty()) {
s = active_fds_queue.size();
for (unsigned long i = 0; i < s; ++i) {
auto a = active_fds_queue.front();
active_fds_queue.pop();
// Look up last active time.
if (bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
&last_data_time_ns)) {
active_fds_queue.push(a);
continue;
// For measuring idle time.
unsigned long last_data_time_ns, idle_ns;
// Current time (absolute).
boost::posix_time::ptime now =
boost::posix_time::microsec_clock::local_time();
// New epoch time.
boost::posix_time::ptime now_plus_epoch =
now + boost::posix_time::microseconds(epoch_us);

// Typically, active_fds_queue will be small and paused_fds_queue will be
// large. Therefore, it is alright for us to iterate through the entire
// active_fds_queue (multiple times), but we must iterate through as few
// elements of paused_fds_queue as possible.

// 1) Perform a status check on all active flows. It is alright to iterate
// through all of active_fds_queue.
s = active_fds_queue.size();
for (unsigned long i = 0; i < s; ++i) {
a = active_fds_queue.front();
active_fds_queue.pop();
// 1.1) If this flow has been closed, remove it.
if (!fd_to_flow.contains(a.first)) continue;
// 1.2) If idle timeout mode is enabled, then check if this flow is
// past its idle timeout. Skip this check if there are no paused
// flows.
if (idle_timeout_ns > 0 && !paused_fds_queue.empty()) {
// Look up this flow's last active time.
if (!bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
&last_data_time_ns)) {
idle_ns = ktime_now_ns - last_data_time_ns;
RM_PRINTF("INFO: FD=%d idle has been idle for %lu ns\n", a.first,
idle_ns);
// If the flow has been idle for longer than the idle timeout, then
// pause it. We pause the flow *before* activating a replacement flow
// because it is by definition not sending data, so we do not risk
// causing a drop in utilization by pausing it immediately.
if (idle_ns > idle_timeout_ns) {
paused_fds_queue.push(a.first);
pause_flow(a.first);
continue;
}
}
// If the flow has been active within the idle timeout, then keep it.
idle_ns = ktime_now_ns - last_data_time_ns;
RM_PRINTF("INFO: FD=%d idle has been idle for %lu ns\n", a.first,
idle_ns);
if (idle_ns < (long)idle_timeout_ns * 1000) {
active_fds_queue.push(a);
}
// 1.3) If the flow has been active for longer than its epoch, then plan to
// pause it.
if (now > a.second) {
if (paused_fds_queue.empty()) {
// If there are no paused flows, then immediately reactivate this flow.
// Randomly jitter the epoch time by +/- 12.5%.
active_fds_queue.push(
{a.first, now_plus_epoch +
boost::posix_time::microseconds(jitter(epoch_us))});
RM_PRINTF("INFO: reactivated FD=%d\n", a.first);
continue;
} else {
// Plan to pause this flow.
to_pause.push_back(a.first);
}
// Otherwise, pause the flow.
paused_fds_queue.push(a.first);
pause_flow(a.first);
}
active_fds_queue.push(a);
}

// 3. Calculate how many flows to activate.
// Start with the number of free slots.
unsigned long num_to_activate = max_active_flows - active_fds_queue.size();
// If the next flow should be scheduled now, then activate at least
// num_to_schedule flows.
boost::posix_time::ptime now =
boost::posix_time::microsec_clock::local_time();
if (now > active_fds_queue.front().second) {
num_to_activate = std::max(num_to_activate, (unsigned long)num_to_schedule);
}
// Finally, do not activate more flows than are paused.
num_to_activate = std::min(num_to_activate, paused_fds_queue.size());

// 4. Activate the prescribed number of flows.
boost::posix_time::ptime now_plus_epoch =
now + boost::posix_time::microseconds(epoch_us);
// 2) Activate flows. Now we can calculate how many flows to activate to reach
// full capacity. This value is the existing free capacity plus the number of
// flows we intend to pause. The important part here is that we only look at
// as many entries in paused_fds_queue as needed.
unsigned long num_to_activate =
max_active_flows - active_fds_queue.size() + to_pause.size();
for (unsigned long i = 0; i < num_to_activate; ++i) {
int p = paused_fds_queue.front();
paused_fds_queue.pop();
// Randomly jitter the epoch time by +/- 12.5%.
active_fds_queue.push({p, now_plus_epoch + boost::posix_time::microseconds(
jitter(epoch_us))});
activate_flow(p);
}

// 4. Pause excessive flows.
while (active_fds_queue.size() > max_active_flows) {
int a = active_fds_queue.front().first;
active_fds_queue.pop();
if (active_fds_queue.size() < max_active_flows &&
paused_fds_queue.empty()) {
// If there are fewer than the limit flows active and there are no
// waiting flows, then schedule this flow again. Randomly jitter the epoch
// time by +/- 12.5%.
// Loop until we find a paused flow that is valid (not closed).
while (!paused_fds_queue.empty()) {
p = paused_fds_queue.front();
paused_fds_queue.pop();
// If this flow has been closed, then skip it.
if (!fd_to_flow.contains(p)) continue;
// Randomly jitter the epoch time by +/- 12.5%.
active_fds_queue.push(
{a,
{p,
now_plus_epoch + boost::posix_time::microseconds(jitter(epoch_us))});
RM_PRINTF("INFO: reactivated FD=%d\n", a);
} else {
// Pause this flow.
paused_fds_queue.push(a);
pause_flow(a);
activate_flow(p);
break;
}
}

// 3) Pause flows. We need to recalculate the number of flows to pause because
// we may not have been able to activate as many flows as planned. Recall that
// it is alright to iterate through all of active_fds_queue.
unsigned long num_to_pause = (unsigned long)std::max(
0L, (long)active_fds_queue.size() - (long)max_active_flows);
#ifdef RM_VERBOSE
assert(num_to_pause <= to_pause.size());
#endif
// For each flow that we are supposed to pause, advance through
// active_fds_queue until we find it.
s = active_fds_queue.size();
unsigned long j = 0;
for (unsigned long i = 0; i < num_to_pause; ++i) {
while (j < s) {
a = active_fds_queue.front();
active_fds_queue.pop();
if (a.first == to_pause[i]) {
// Pause this flow.
paused_fds_queue.push(a.first);
pause_flow(a.first);
break;
}
// Examine the next flow in active_fds_queue.
active_fds_queue.push(a);
++j;
}
}

// 5. Check invariants.
// 4) Check invariants.
#ifdef RM_VERBOSE
// Cannot have more than the max number of active flows.
assert(active_fds_queue.size() <= max_active_flows);
// If there are no active flows, then there should also be no paused flows.
assert(!active_fds_queue.empty() || paused_fds_queue.empty());
#endif

// 6. Calculate when the next timer should expire.
// 5) Calculate when the next timer should expire.
boost::posix_time::time_duration when;
if (active_fds_queue.empty()) {
// If there are no flows, revert to slow check mode.
Expand All @@ -291,16 +301,16 @@ void timer_callback(const boost::system::error_code &error) {
} else if (idle_timeout_ns == 0) {
// If we are not using idle timeout mode...
when = active_fds_queue.front().second - now;
RM_PRINTF("INFO: scheduling time for next epoch end\n");
RM_PRINTF("INFO: scheduling timer for next epoch end\n");
} else {
// If we are using idle timeout mode...
when = boost::posix_time::microsec(
std::min((long)idle_timeout_ns,
(active_fds_queue.front().second - now).total_microseconds()));
RM_PRINTF("INFO: scheduling time for next epoch end or idle timeout\n");
RM_PRINTF("INFO: scheduling timer for next epoch end or idle timeout\n");
}

// 7. Start the next timer.
// 6) Start the next timer.
if (timer.expires_from_now(when)) {
RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
}
Expand Down Expand Up @@ -390,14 +400,11 @@ bool setup() {
// Read environment variables with parameters.
if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) return false;
if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) return false;
if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_ns,
unsigned int idle_timeout_us;
if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us,
true /* allow_zero */))
return false;
idle_timeout_ns *= 1000; // Convert from us to ns.
// Cannot schedule more than max_active_flows at once.
if (!read_env_uint(RM_NUM_TO_SCHEDULE_KEY, &num_to_schedule) ||
num_to_schedule > max_active_flows)
return false;
idle_timeout_ns = (unsigned long)idle_timeout_us * 1000UL;
unsigned int monitor_port_start_;
if (!read_env_uint(RM_MONITOR_PORT_START_KEY, &monitor_port_start_) ||
monitor_port_start_ >= 65536)
Expand Down Expand Up @@ -454,10 +461,9 @@ bool setup() {

RM_PRINTF(
"INFO: setup complete! max_active_flows=%u, epoch_us=%u, "
"idle_timeout_ns=%u, "
"num_to_schedule=%u, monitor_port_start=%u, monitor_port_end=%u\n",
max_active_flows, epoch_us, idle_timeout_ns, num_to_schedule,
monitor_port_start, monitor_port_end);
"idle_timeout_ns=%lu, monitor_port_start=%u, monitor_port_end=%u\n",
max_active_flows, epoch_us, idle_timeout_ns, monitor_port_start,
monitor_port_end);
return true;
}

Expand Down
8 changes: 0 additions & 8 deletions ratemon/runtime/c/ratemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@
// Duration after which an idle flow will be forcibly paused. 0 disables this
// feature.
#define RM_IDLE_TIMEOUT_US_KEY "RM_IDLE_TIMEOUT_US"
// Environment variable that specifies the number of flows to schedule per
// epoch. "Schedule" can mean either activate or pause. Note that setting this
// greater than 1 effectively switches scheduled RWND tuning from a strict
// timeslice mode (where each flow is active for exactly the scheduling epoch)
// to a loose mode (where flows are batched and the entire batch is active for
// at most the oldest flow's epoch, even if other flows in the batch arrived
// more recently).
#define RM_NUM_TO_SCHEDULE_KEY "RM_NUM_TO_SCHEDULE"
// Environment variable that specifies the start range of REMOTE ports to manage
// using scheduled RWND tuning.
#define RM_MONITOR_PORT_START_KEY "RM_MONITOR_PORT_START"
Expand Down

0 comments on commit 2179295

Please sign in to comment.