Skip to content

Commit

Permalink
[WIP] Idle timeout, plus refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed May 31, 2024
1 parent 0fb57f7 commit f6b19ab
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 104 deletions.
239 changes: 136 additions & 103 deletions ratemon/runtime/c/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,32 +86,38 @@ inline void trigger_ack(int fd) {
&placeholder_cc_info_length);
}

// Call this when a flow should be paused. If there are waiting flows and
// available capacity, then one will be activated. Flows are paused and
// activated in round-robin order. Each flow is allowed to be active for
// at most epoch_us microseconds.
void timer_callback(const boost::system::error_code &error) {
// Call this when a flow should be paused. If there are waiting flows and
// available capacity, then one will be activated. Flows are paused and
// activated in round-robin order. Each flow is allowed to be active for
// at most epoch_us microseconds.
RM_PRINTF("INFO: in timer_callback\n");

// 0. Perform validity checks.
// If an error (such as a cancellation) triggered this callback, then abort
// immediately.
if (error) {
RM_PRINTF("ERROR: timer_callback error: %s\n", error.message().c_str());
return;
}
// If the program has been signalled to stop, then exit.
if (!run) {
RM_PRINTF("INFO: exiting\n");
return;
}

// If setup has not been performed yet, then we cannot perform scheduling.
if (!setup_done) {
if (timer.expires_from_now(one_sec))
if (timer.expires_from_now(one_sec)) {
RM_PRINTF("ERROR: cancelled timer when should not have!\n");
}

timer.async_wait(&timer_callback);
RM_PRINTF("INFO: not set up (flag)\n");
return;
}
// At this point, max_active_flows, epoch_us, num_to_schedule,
// flow_to_rwnd_fd, and flow_to_last_data_time_fd should be set.
// flow_to_rwnd_fd, and flow_to_last_data_time_fd should be set. If not,
// revert to slow check mode.
if (max_active_flows == 0 || epoch_us == 0 || idle_timeout_us ||
num_to_schedule == 0 || flow_to_rwnd_fd == 0 ||
flow_to_last_data_time_fd == 0) {
Expand All @@ -121,133 +127,155 @@ void timer_callback(const boost::system::error_code &error) {
"flow_to_last_data_time_fd=%d\n",
max_active_flows, epoch_us, idle_timeout_us, num_to_schedule,
flow_to_rwnd_fd, flow_to_last_data_time_fd);
if (timer.expires_from_now(one_sec))
if (timer.expires_from_now(one_sec)) {
RM_PRINTF("ERROR: cancelled timer when should not have!\n");

}
timer.async_wait(&timer_callback);
RM_PRINTF("INFO: not set up (params)\n");
return;
}

// It is safe to perform scheduling.
lock_scheduler.lock();
RM_PRINTF("INFO: performing scheduling\n");

// 1. If there are no flows, then revert to slow check mode.
if (active_fds_queue.empty() && paused_fds_queue.empty()) {
if (timer.expires_from_now(one_sec))
if (timer.expires_from_now(one_sec)) {
RM_PRINTF("ERROR: cancelled timer when should not have!\n");

}
timer.async_wait(&timer_callback);
lock_scheduler.unlock();
RM_PRINTF("INFO: no flows\n");
return;
}

// 2. Remove closed flows from both queues.
// active_fds_queue
for (unsigned long i = 0; i < active_fds_queue.size(); ++i) {
auto a = active_fds_queue.front();
active_fds_queue.pop();
if (fd_to_flow.contains(a.first)) active_fds_queue.push(a);
}
// paused_fds_queue
for (unsigned long i = 0; i < paused_fds_queue.size(); ++i) {
auto p = paused_fds_queue.front();
paused_fds_queue.pop();
if (fd_to_flow.contains(p)) paused_fds_queue.push(p);
}

// 3. Idle timeout. Look through all active flows and pause any that have been
// idle for longer than idle_timeout_us. Only do this if there are paused
// flows.
boost::posix_time::ptime now =
boost::posix_time::microsec_clock::local_time();
if (now < active_fds_queue.front().second) {
// The next flow should not be scheduled yet.
if (timer.expires_from_now(active_fds_queue.front().second - now))
RM_PRINTF("ERROR: cancelled timer when should not have!\n");
if (idle_timeout_us > 0 && !active_fds_queue.empty()) {
for (unsigned long i = 0; i < active_fds_queue.size(); ++i) {
auto a = active_fds_queue.front();
active_fds_queue.pop();
// Look up last active time.
unsigned long last_data_time_ns;
if (bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
&last_data_time_ns)) {
active_fds_queue.push(a);
continue;
}
boost::posix_time::ptime last_data_time = {
{1970, 1, 1}, {0, 0, 0, (long)last_data_time_ns}};
// If the flow has been active within the idle timeout, then keep it.
if ((now - last_data_time).total_microseconds() < (long)idle_timeout_us) {
active_fds_queue.push(a);
continue;
}
// Otherwise, pause the flow.
paused_fds_queue.push(a.first);
bpf_map_update_elem(flow_to_rwnd_fd, &fd_to_flow[a.first], &zero,
BPF_ANY);
trigger_ack(a.first);
RM_PRINTF("INFO: paused FD=%d due to idle timeout\n", a.first);
}
}

timer.async_wait(&timer_callback);
lock_scheduler.unlock();
RM_PRINTF("INFO: early by %ld us\n",
(active_fds_queue.front().second - now).total_microseconds());
return;
// 3. Calculate how many flows to activate.
// Start with the number of free slots.
unsigned long num_to_activate = max_active_flows - active_fds_queue.size();
// If the next flow should be scheduled now, then activate at least
// num_to_schedule flows.
if (now > active_fds_queue.front().second) {
num_to_activate = std::max(num_to_activate, (unsigned long)num_to_schedule);
}
// Finally, do not activate more flows than are paused.
num_to_activate = std::min(num_to_activate, paused_fds_queue.size());

// 4. Activate the prescribed number of flows.
boost::posix_time::ptime now_plus_epoch =
now + boost::posix_time::microseconds(epoch_us);
for (unsigned long i = 0; i < num_to_activate; ++i) {
int to_activate = paused_fds_queue.front();
paused_fds_queue.pop();
// Randomly jitter the epoch time by +/- 12.5%.
int rand =
std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
(int)std::roundl(epoch_us * 0.125);
active_fds_queue.push(
{to_activate, now_plus_epoch + boost::posix_time::microseconds(rand)});
bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate]));
trigger_ack(to_activate);
RM_PRINTF("INFO: activated FD=%d\n", to_activate);
}

// Need to keep track of the number of active flows before we do any
// scheduling so that we do not accidentally pause flows that we just
// activated.
auto num_active = active_fds_queue.size();

// Schedule up to num_to_schedule flows, one at a time. But if there are not
// enough flows, then schedule the max number in either queue. One iteration
// of this loop can service one flow from each queue.
for (unsigned int i = 0;
i < std::min((long unsigned int)num_to_schedule,
std::max(active_fds_queue.size(), paused_fds_queue.size()));
++i) {
// If there are paused flows, then activate one. Loop until we find a paused
// flow that is valid, meaning that it has not been removed from fd_to_flow
// (i.e., has not been close()'ed).
while (!paused_fds_queue.empty()) {
int to_activate = paused_fds_queue.front();
paused_fds_queue.pop();
if (fd_to_flow.contains(to_activate)) {
// This flow is valid, so activate it. Record the time at which its
// epoch is over.
// Randomly jitter the activation time by +/- 12.5% of the epoch.
int rand =
std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
(int)std::roundl(epoch_us * 0.125);
boost::posix_time::ptime now_plus_epoch_plus_rand =
now_plus_epoch + boost::posix_time::microseconds(rand);
active_fds_queue.push({to_activate, now_plus_epoch_plus_rand});
bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate]));
trigger_ack(to_activate);
RM_PRINTF("INFO: activated FD=%d\n", to_activate);
break;
}
}

// Do not pause more active flows than were active at the start of this
// epoch, otherwise we will be pausing flows that we just activated.
if (i < num_active) {
// We always need to pop the front of the active_fds_queue because it was
// that flow's timer which triggered the current scheduling event. We
// either schedule that flow again or pause it.
int to_pause = active_fds_queue.front().first;
active_fds_queue.pop();
if (fd_to_flow.contains(to_pause)) {
if (active_fds_queue.size() < max_active_flows &&
paused_fds_queue.empty()) {
// If there are fewer than the limit flows active and there are no
// waiting flows, then scheduling this flow again. But first, check to
// make sure that it is still valid (see above).
int rand =
std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
(int)std::roundl(epoch_us * 0.125);
boost::posix_time::ptime now_plus_epoch_plus_rand =
now_plus_epoch + boost::posix_time::microseconds(rand);
active_fds_queue.push({to_pause, now_plus_epoch_plus_rand});
RM_PRINTF("INFO: reactivated FD=%d\n", to_pause);
} else {
// Pause this flow.
paused_fds_queue.push(to_pause);
bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
BPF_ANY);
trigger_ack(to_pause);
RM_PRINTF("INFO: paused FD=%d\n", to_pause);
}
} else
RM_PRINTF("INFO: cleaned up FD=%d\n", to_pause);
// 4. Pause excessive flows.
while (active_fds_queue.size() > max_active_flows) {
int to_pause = active_fds_queue.front().first;
active_fds_queue.pop();
if (active_fds_queue.size() < max_active_flows &&
paused_fds_queue.empty()) {
// If there are fewer than the limit flows active and there are no
// waiting flows, then schedule this flow again. Randomly jitter the epoch
// time by +/- 12.5%.
int rand =
std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
(int)std::roundl(epoch_us * 0.125);
boost::posix_time::ptime now_plus_epoch_plus_rand =
now_plus_epoch + boost::posix_time::microseconds(rand);
active_fds_queue.push({to_pause, now_plus_epoch_plus_rand});
RM_PRINTF("INFO: reactivated FD=%d\n", to_pause);
} else {
// Pause this flow.
paused_fds_queue.push(to_pause);
bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
BPF_ANY);
trigger_ack(to_pause);
RM_PRINTF("INFO: paused FD=%d\n", to_pause);
}
}

// Invariants.
// 5. Check invariants.
// Cannot have more than the max number of active flows.
assert(active_fds_queue.size() <= max_active_flows);
// If there are no active flows, then there should also be no paused flows.
assert(!active_fds_queue.empty() || paused_fds_queue.empty());

// Schedule the next timer callback for when the next flow should be paused.
// 6. Calculate when the next timer should expire.
boost::posix_time::time_duration when;
if (active_fds_queue.empty()) {
// If we cleaned up all flows, then revert to slow check mode.
if (timer.expires_from_now(one_sec))
RM_PRINTF("ERROR: cancelled timer when should not have!\n");

timer.async_wait(&timer_callback);
lock_scheduler.unlock();
// If there are no flows, revert to slow check mode.
RM_PRINTF("INFO: no flows remaining\n");
return;
when = one_sec;
} else if (idle_timeout_us == 0) {
// If we are using idle timeout mode...
when = boost::posix_time::microsec(
std::min((long)idle_timeout_us,
(active_fds_queue.front().second - now).total_microseconds()));
} else {
// If we are not using idle timeout mode...
when = active_fds_queue.front().second - now;
}
// Trigger scheduling for the next flow.
if (timer.expires_from_now(active_fds_queue.front().second - now))
RM_PRINTF("ERROR: cancelled timer when should not have!\n");

// 7. Start the next timer.
if (timer.expires_from_now(one_sec)) {
RM_PRINTF("ERROR: cancelled timer when should not have!\n");
}
timer.async_wait(&timer_callback);
lock_scheduler.unlock();
RM_PRINTF("INFO: sleeping until next event\n");
Expand All @@ -258,8 +286,9 @@ void thread_func() {
// This function is designed to be run in a thread. It is responsible for
// managing the async timers that perform scheduling.
RM_PRINTF("INFO: scheduler thread started\n");
if (timer.expires_from_now(one_sec))
if (timer.expires_from_now(one_sec)) {
RM_PRINTF("ERROR: cancelled timer when should not have!\n");
}

timer.async_wait(&timer_callback);
RM_PRINTF("INFO: scheduler thread initial sleep\n");
Expand All @@ -280,10 +309,11 @@ void thread_func() {
lock_scheduler.unlock();
RM_PRINTF("INFO: scheduler thread ended\n");

if (run)
if (run) {
RM_PRINTF(
"ERROR: scheduled thread ended before program was signalled to "
"stop!\n");
}
}

// Catch SIGINT and trigger the scheduler thread and timer to end.
Expand Down Expand Up @@ -465,8 +495,9 @@ void initial_scheduling(int fd, struct rm_flow *flow) {
boost::posix_time::microseconds(rand)});
RM_PRINTF("INFO: allowing new flow FD=%d\n", fd);
if (active_fds_queue.size() == 1) {
if (timer.expires_from_now(active_fds_queue.front().second - now) != 1)
if (timer.expires_from_now(active_fds_queue.front().second - now) != 1) {
RM_PRINTF("ERROR: should have cancelled 1 timer!\n");
}

timer.async_wait(&timer_callback);
RM_PRINTF("INFO: first scheduling event\n");
Expand Down Expand Up @@ -497,8 +528,9 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
if (addr != NULL && addr->sa_family != AF_INET) {
RM_PRINTF("WARNING: got 'accept' for non-AF_INET sa_family=%u\n",
addr->sa_family);
if (addr->sa_family == AF_INET6)
if (addr->sa_family == AF_INET6) {
RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6!\n");
}

return fd;
}
Expand Down Expand Up @@ -567,8 +599,9 @@ int close(int sockfd) {
// from scheduling.
fd_to_flow.erase(sockfd);
RM_PRINTF("INFO: removed FD=%d\n", sockfd);
} else
} else {
RM_PRINTF("INFO: ignoring 'close' for FD=%d, not in fd_to_flow\n", sockfd);
}

lock_scheduler.unlock();

Expand Down
2 changes: 1 addition & 1 deletion ratemon/runtime/c/ratemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// #define RM_PRINTK(...) bpf_printk(__VA_ARGS__)
#else
#define NDEBUG // Disable assert() calls.
#define RM_PRINTF(...) ;
#define RM_PRINTF(...)
// #define RM_PRINTK(...)
#endif

Expand Down

0 comments on commit f6b19ab

Please sign in to comment.