diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp
index 99695dd..b0cdadb 100644
--- a/ratemon/runtime/c/libratemon_interp.cpp
+++ b/ratemon/runtime/c/libratemon_interp.cpp
@@ -86,32 +86,38 @@ inline void trigger_ack(int fd) {
       &placeholder_cc_info_length);
 }
 
+// Call this when a flow should be paused. If there are waiting flows and
+// available capacity, then one will be activated. Flows are paused and
+// activated in round-robin order. Each flow is allowed to be active for
+// at most epoch_us microseconds.
 void timer_callback(const boost::system::error_code &error) {
-  // Call this when a flow should be paused. If there are waiting flows and
-  // available capacity, then one will be activated. Flows are paused and
-  // activated in round-robin order. Each flow is allowed to be active for
-  // at most epoch_us microseconds.
   RM_PRINTF("INFO: in timer_callback\n");
+
+  // 0. Perform validity checks.
+  // If an error (such as a cancellation) triggered this callback, then abort
+  // immediately.
   if (error) {
     RM_PRINTF("ERROR: timer_callback error: %s\n", error.message().c_str());
     return;
   }
+  // If the program has been signalled to stop, then exit.
   if (!run) {
     RM_PRINTF("INFO: exiting\n");
     return;
   }
-  // If setup has not been performed yet, then we cannot perform scheduling.
   if (!setup_done) {
-    if (timer.expires_from_now(one_sec))
+    if (timer.expires_from_now(one_sec)) {
       RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+    }
     timer.async_wait(&timer_callback);
     RM_PRINTF("INFO: not set up (flag)\n");
     return;
   }
   // At this point, max_active_flows, epoch_us, num_to_schedule,
-  // flow_to_rwnd_fd, and flow_to_last_data_time_fd should be set.
-  if (max_active_flows == 0 || epoch_us == 0 || idle_timeout_us ||
+  // flow_to_rwnd_fd, and flow_to_last_data_time_fd should be set. If not,
+  // revert to slow check mode. Note that idle_timeout_us is not checked,
+  // because zero is a valid value that disables the idle timeout.
+  if (max_active_flows == 0 || epoch_us == 0 ||
       num_to_schedule == 0 || flow_to_rwnd_fd == 0 ||
       flow_to_last_data_time_fd == 0) {
@@ -121,133 +127,155 @@ void timer_callback(const boost::system::error_code &error) {
         "flow_to_last_data_time_fd=%d\n",
         max_active_flows, epoch_us, idle_timeout_us, num_to_schedule,
         flow_to_rwnd_fd, flow_to_last_data_time_fd);
-    if (timer.expires_from_now(one_sec))
+    if (timer.expires_from_now(one_sec)) {
       RM_PRINTF("ERROR: cancelled timer when should not have!\n");
-
+    }
     timer.async_wait(&timer_callback);
     RM_PRINTF("INFO: not set up (params)\n");
     return;
   }
 
+  // It is safe to perform scheduling.
   lock_scheduler.lock();
   RM_PRINTF("INFO: performing scheduling\n");
 
+  // 1. If there are no flows, then revert to slow check mode.
   if (active_fds_queue.empty() && paused_fds_queue.empty()) {
-    if (timer.expires_from_now(one_sec))
+    if (timer.expires_from_now(one_sec)) {
       RM_PRINTF("ERROR: cancelled timer when should not have!\n");
-
+    }
     timer.async_wait(&timer_callback);
     lock_scheduler.unlock();
     RM_PRINTF("INFO: no flows\n");
     return;
   }
 
+  // 2. Remove closed flows from both queues. Cache each queue's size before
+  // draining it, because pop() inside the loop changes size() and would
+  // otherwise leave some entries unchecked.
+  // active_fds_queue
+  unsigned long num_active = active_fds_queue.size();
+  for (unsigned long i = 0; i < num_active; ++i) {
+    auto a = active_fds_queue.front();
+    active_fds_queue.pop();
+    if (fd_to_flow.contains(a.first)) active_fds_queue.push(a);
+  }
+  // paused_fds_queue
+  unsigned long num_paused = paused_fds_queue.size();
+  for (unsigned long i = 0; i < num_paused; ++i) {
+    auto p = paused_fds_queue.front();
+    paused_fds_queue.pop();
+    if (fd_to_flow.contains(p)) paused_fds_queue.push(p);
+  }
+
+  // 3. Idle timeout. Look through all active flows and pause any that have
+  // been idle for longer than idle_timeout_us. Only do this if the idle
+  // timeout is enabled.
   boost::posix_time::ptime now =
       boost::posix_time::microsec_clock::local_time();
-  if (now < active_fds_queue.front().second) {
-    // The next flow should not be scheduled yet.
-    if (timer.expires_from_now(active_fds_queue.front().second - now))
-      RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+  if (idle_timeout_us > 0 && !active_fds_queue.empty()) {
+    // Cache the size: the loop pops each entry and re-pushes the survivors.
+    unsigned long num_to_check = active_fds_queue.size();
+    for (unsigned long i = 0; i < num_to_check; ++i) {
+      auto a = active_fds_queue.front();
+      active_fds_queue.pop();
+      // Look up the flow's last active time.
+      unsigned long last_data_time_ns;
+      if (bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
+                              &last_data_time_ns)) {
+        active_fds_queue.push(a);
+        continue;
+      }
+      // time_duration's fractional seconds are microseconds at Boost's
+      // default resolution, so convert the nanosecond timestamp first.
+      boost::posix_time::ptime last_data_time = {
+          {1970, 1, 1}, {0, 0, 0, (long)(last_data_time_ns / 1000)}};
+      // If the flow has been active within the idle timeout, then keep it.
+      if ((now - last_data_time).total_microseconds() <
+          (long)idle_timeout_us) {
+        active_fds_queue.push(a);
+        continue;
+      }
+      // Otherwise, pause the flow.
+      paused_fds_queue.push(a.first);
+      bpf_map_update_elem(flow_to_rwnd_fd, &fd_to_flow[a.first], &zero,
+                          BPF_ANY);
+      trigger_ack(a.first);
+      RM_PRINTF("INFO: paused FD=%d due to idle timeout\n", a.first);
+    }
+  }
 
-    timer.async_wait(&timer_callback);
-    lock_scheduler.unlock();
-    RM_PRINTF("INFO: early by %ld us\n",
-              (active_fds_queue.front().second - now).total_microseconds());
-    return;
+  // 4. Calculate how many flows to activate.
+  // Start with the number of free slots, guarding against underflow in case
+  // more flows are currently active than the limit allows.
+  unsigned long num_to_activate =
+      active_fds_queue.size() < max_active_flows
+          ? max_active_flows - active_fds_queue.size()
+          : 0;
+  // If the next flow should be scheduled now, then activate at least
+  // num_to_schedule flows. Guard against an empty queue, since the idle
+  // timeout above may have paused every active flow.
+  if (!active_fds_queue.empty() && now > active_fds_queue.front().second) {
+    num_to_activate =
+        std::max(num_to_activate, (unsigned long)num_to_schedule);
   }
+  // Finally, do not activate more flows than are paused.
+  num_to_activate = std::min(num_to_activate, paused_fds_queue.size());
 
+  // 5. Activate the prescribed number of flows.
   boost::posix_time::ptime now_plus_epoch =
       now + boost::posix_time::microseconds(epoch_us);
+  for (unsigned long i = 0; i < num_to_activate; ++i) {
+    int to_activate = paused_fds_queue.front();
+    paused_fds_queue.pop();
+    // Randomly jitter the epoch time by +/- 12.5%.
+    int rand =
+        std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
+        (int)std::roundl(epoch_us * 0.125);
+    active_fds_queue.push(
+        {to_activate, now_plus_epoch + boost::posix_time::microseconds(rand)});
+    bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate]));
+    trigger_ack(to_activate);
+    RM_PRINTF("INFO: activated FD=%d\n", to_activate);
+  }
 
-  // Need to keep track of the number of active flows before we do any
-  // scheduling so that we do not accidentally pause flows that we just
-  // activated.
-  auto num_active = active_fds_queue.size();
-
-  // Schedule up to num_to_schedule flows, one at a time. But if there are not
-  // enough flows, then schedule the max number in either queue. One iteration
-  // of this loop can service one flow from each queue.
-  for (unsigned int i = 0;
-       i < std::min((long unsigned int)num_to_schedule,
-                    std::max(active_fds_queue.size(), paused_fds_queue.size()));
-       ++i) {
-    // If there are paused flows, then activate one. Loop until we find a paused
-    // flow that is valid, meaning that it has not been removed from fd_to_flow
-    // (i.e., has not been close()'ed).
-    while (!paused_fds_queue.empty()) {
-      int to_activate = paused_fds_queue.front();
-      paused_fds_queue.pop();
-      if (fd_to_flow.contains(to_activate)) {
-        // This flow is valid, so activate it. Record the time at which its
-        // epoch is over.
-        // Randomly jitter the activation time by +/- 12.5% of the epoch.
-        int rand =
-            std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
-            (int)std::roundl(epoch_us * 0.125);
-        boost::posix_time::ptime now_plus_epoch_plus_rand =
-            now_plus_epoch + boost::posix_time::microseconds(rand);
-        active_fds_queue.push({to_activate, now_plus_epoch_plus_rand});
-        bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate]));
-        trigger_ack(to_activate);
-        RM_PRINTF("INFO: activated FD=%d\n", to_activate);
-        break;
-      }
-    }
-
-    // Do not pause more active flows than were active at the start of this
-    // epoch, otherwise we will be pausing flows that we just activated.
-    if (i < num_active) {
-      // We always need to pop the front of the active_fds_queue because it was
-      // that flow's timer which triggered the current scheduling event. We
-      // either schedule that flow again or pause it.
-      int to_pause = active_fds_queue.front().first;
-      active_fds_queue.pop();
-      if (fd_to_flow.contains(to_pause)) {
-        if (active_fds_queue.size() < max_active_flows &&
-            paused_fds_queue.empty()) {
-          // If there are fewer than the limit flows active and there are no
-          // waiting flows, then scheduling this flow again. But first, check to
-          // make sure that it is still valid (see above).
-          int rand =
-              std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
-              (int)std::roundl(epoch_us * 0.125);
-          boost::posix_time::ptime now_plus_epoch_plus_rand =
-              now_plus_epoch + boost::posix_time::microseconds(rand);
-          active_fds_queue.push({to_pause, now_plus_epoch_plus_rand});
-          RM_PRINTF("INFO: reactivated FD=%d\n", to_pause);
-        } else {
-          // Pause this flow.
-          paused_fds_queue.push(to_pause);
-          bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
-                              BPF_ANY);
-          trigger_ack(to_pause);
-          RM_PRINTF("INFO: paused FD=%d\n", to_pause);
-        }
-      } else
-        RM_PRINTF("INFO: cleaned up FD=%d\n", to_pause);
-    }
+  // 6. Pause excess flows, starting with the flow whose epoch ends earliest.
+  while (active_fds_queue.size() > max_active_flows) {
+    int to_pause = active_fds_queue.front().first;
+    active_fds_queue.pop();
+    paused_fds_queue.push(to_pause);
+    bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
+                        BPF_ANY);
+    trigger_ack(to_pause);
+    RM_PRINTF("INFO: paused FD=%d\n", to_pause);
+  }
+  // If a flow's epoch has expired but no flows are waiting to take its
+  // place, then schedule the flow again with a fresh epoch. Otherwise, the
+  // timer below would be set to expire immediately, over and over. Randomly
+  // jitter the epoch time by +/- 12.5%.
+  while (paused_fds_queue.empty() && !active_fds_queue.empty() &&
+         now > active_fds_queue.front().second) {
+    int to_reactivate = active_fds_queue.front().first;
+    active_fds_queue.pop();
+    int rand =
+        std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
+        (int)std::roundl(epoch_us * 0.125);
+    boost::posix_time::ptime now_plus_epoch_plus_rand =
+        now_plus_epoch + boost::posix_time::microseconds(rand);
+    active_fds_queue.push({to_reactivate, now_plus_epoch_plus_rand});
+    RM_PRINTF("INFO: reactivated FD=%d\n", to_reactivate);
   }
 
-  // Invariants.
+  // 7. Check invariants.
   // Cannot have more than the max number of active flows.
   assert(active_fds_queue.size() <= max_active_flows);
   // If there are no active flows, then there should also be no paused flows.
   assert(!active_fds_queue.empty() || paused_fds_queue.empty());
 
-  // Schedule the next timer callback for when the next flow should be paused.
+  // 8. Calculate when the next timer should expire.
+  boost::posix_time::time_duration when;
   if (active_fds_queue.empty()) {
-    // If we cleaned up all flows, then revert to slow check mode.
-    if (timer.expires_from_now(one_sec))
-      RM_PRINTF("ERROR: cancelled timer when should not have!\n");
-
-    timer.async_wait(&timer_callback);
-    lock_scheduler.unlock();
+    // If there are no flows, then revert to slow check mode.
     RM_PRINTF("INFO: no flows remaining\n");
-    return;
+    when = one_sec;
+  } else if (idle_timeout_us > 0) {
+    // If the idle timeout is enabled, then wake up at least every
+    // idle_timeout_us so that newly idle flows can be paused, or sooner if
+    // the next flow's epoch ends before that.
+    when = boost::posix_time::microseconds(std::min(
+        (long)idle_timeout_us,
+        (active_fds_queue.front().second - now).total_microseconds()));
+  } else {
+    // If the idle timeout is disabled, then simply wait until the next
+    // flow's epoch ends.
+    when = active_fds_queue.front().second - now;
   }
 
-  // Trigger scheduling for the next flow.
-  if (timer.expires_from_now(active_fds_queue.front().second - now))
-    RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+  // 9. Start the next timer using the duration calculated above.
+  if (timer.expires_from_now(when)) {
+    RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+  }
   timer.async_wait(&timer_callback);
   lock_scheduler.unlock();
   RM_PRINTF("INFO: sleeping until next event\n");
@@ -258,8 +286,9 @@ void thread_func() {
   // This function is designed to be run in a thread. It is responsible for
   // managing the async timers that perform scheduling.
   RM_PRINTF("INFO: scheduler thread started\n");
-  if (timer.expires_from_now(one_sec))
+  if (timer.expires_from_now(one_sec)) {
     RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+  }
   timer.async_wait(&timer_callback);
   RM_PRINTF("INFO: scheduler thread initial sleep\n");
 
@@ -280,10 +309,11 @@ void thread_func() {
   lock_scheduler.unlock();
   RM_PRINTF("INFO: scheduler thread ended\n");
 
-  if (run)
+  if (run) {
     RM_PRINTF(
-        "ERROR: scheduled thread ended before program was signalled to "
+        "ERROR: scheduler thread ended before program was signalled to "
         "stop!\n");
+  }
 }
 
 // Catch SIGINT and trigger the scheduler thread and timer to end.
@@ -465,8 +495,9 @@ void initial_scheduling(int fd, struct rm_flow *flow) {
                            boost::posix_time::microseconds(rand)});
     RM_PRINTF("INFO: allowing new flow FD=%d\n", fd);
     if (active_fds_queue.size() == 1) {
-      if (timer.expires_from_now(active_fds_queue.front().second - now) != 1)
+      if (timer.expires_from_now(active_fds_queue.front().second - now) != 1) {
         RM_PRINTF("ERROR: should have cancelled 1 timer!\n");
+      }
       timer.async_wait(&timer_callback);
       RM_PRINTF("INFO: first scheduling event\n");
 
@@ -497,8 +528,9 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
   if (addr != NULL && addr->sa_family != AF_INET) {
     RM_PRINTF("WARNING: got 'accept' for non-AF_INET sa_family=%u\n",
               addr->sa_family);
-    if (addr->sa_family == AF_INET6)
+    if (addr->sa_family == AF_INET6) {
       RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6!\n");
+    }
     return fd;
   }
 
@@ -567,8 +599,9 @@ int close(int sockfd) {
     // from scheduling.
     fd_to_flow.erase(sockfd);
     RM_PRINTF("INFO: removed FD=%d\n", sockfd);
-  } else
+  } else {
     RM_PRINTF("INFO: ignoring 'close' for FD=%d, not in fd_to_flow\n", sockfd);
+  }
 
   lock_scheduler.unlock();
 
diff --git a/ratemon/runtime/c/ratemon.h b/ratemon/runtime/c/ratemon.h
index e6b2b15..0758e92 100644
--- a/ratemon/runtime/c/ratemon.h
+++ b/ratemon/runtime/c/ratemon.h
@@ -11,7 +11,7 @@
 // #define RM_PRINTK(...) bpf_printk(__VA_ARGS__)
 #else
 #define NDEBUG // Disable assert() calls.
-#define RM_PRINTF(...) ;
+#define RM_PRINTF(...)
 // #define RM_PRINTK(...)
 #endif
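
Note on the single-timer pattern used by timer_callback: with one shared
boost::asio deadline_timer, every path through the callback must both set an
expiry and re-arm via async_wait(), and the expiry must be the duration that
was just computed -- arming with a constant such as one_sec silently discards
the schedule. The following is a minimal, self-contained sketch of that
re-arm pattern, independent of this patch; epoch_us, jittered_epoch(), and
on_timer() are illustrative names only, not part of ratemon.

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <cmath>
#include <cstdio>
#include <experimental/random>

static boost::asio::io_context io;
static boost::asio::deadline_timer timer(io);
static const unsigned int epoch_us = 10000;  // Illustrative epoch length.

// Jitter the epoch length by +/- 12.5%, mirroring the patch above.
static boost::posix_time::time_duration jittered_epoch() {
  int rand = std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
             (int)std::roundl(epoch_us * 0.125);
  return boost::posix_time::microseconds(epoch_us + rand);
}

static void on_timer(const boost::system::error_code &error) {
  // A cancelled timer reports boost::asio::error::operation_aborted; do not
  // re-arm in that case.
  if (error) return;
  // expires_from_now() returns the number of pending waits it cancelled.
  // Zero is the expected value here because the timer has already fired.
  if (timer.expires_from_now(jittered_epoch())) {
    std::printf("ERROR: cancelled timer when should not have!\n");
  }
  // Re-arm: without this, the callback chain stops.
  timer.async_wait(&on_timer);
}

int main() {
  timer.expires_from_now(boost::posix_time::seconds(1));
  timer.async_wait(&on_timer);
  io.run();  // Dispatches timer callbacks for as long as one is pending.
  return 0;
}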