From 5d78c15391e50e995abae1f60fe36b211766e8b2 Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Fri, 3 May 2024 21:26:30 +0000 Subject: [PATCH] [WIP] Refactor to use timers --- ratemon/runtime/c/libratemon_interp.cpp | 191 +++++++++++++----------- 1 file changed, 105 insertions(+), 86 deletions(-) diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp index ae4d2d4..93c1a8d 100644 --- a/ratemon/runtime/c/libratemon_interp.cpp +++ b/ratemon/runtime/c/libratemon_interp.cpp @@ -15,32 +15,36 @@ #include // for socket APIs #include +#include +#include #include #include #include #include #include -#include #include "ratemon.h" +boost::asio::io_service io; +boost::asio::deadline_timer timer(io); + std::mutex lock_control; unsigned int max_active_flows = 0; unsigned int epoch_us = 0; bool setup = false; -bool skipped_first = false; // FD for the flow_to_rwnd map. int flow_to_rwnd_fd = 0; // Protects active_fds_queue and paused_fds_queue. std::mutex lock_scheduler; -std::queue active_fds_queue; +std::queue> active_fds_queue; std::queue paused_fds_queue; // Maps file descriptor to rm_flow struct. std::unordered_map fd_to_flow; // Used to set entries in flow_to_rwnd. int zero = 0; +boost::posix_time::seconds one_sec = boost::posix_time::seconds(1); // As an optimization, reuse the same tcp_cc_info struct and size. union tcp_cc_info placeholder_cc_info; @@ -53,88 +57,105 @@ inline void trigger_ack(int fd) { &placeholder_cc_info_length); } -void thread_func() { - RM_PRINTF("INFO: libratemon_interp scheduling thread started\n"); +void timer_callback(const boost::system::error_code &error) { + // Call this when a flow should be paused. Only one flow will be paused at a + // time. If there are waiting flows, one will be activated. Flows are paused + // and activated in round-robin order. Each flow is allowed to be active for + // epoch_us microseconds. 
+ if (error) { + RM_PRINTF("ERROR: timer_callback error: %s\n", error.message().c_str()); + return; + } + // If setup has not been performed yet, then we cannot perform scheduling. - while (!setup) { + if (!setup) { // RM_PRINTF("WARNING setup not completed, skipping scheduling\n"); - usleep(10000); + timer.expires_from_now(one_sec); + timer.async_wait(&timer_callback); + return; } - // At this point, max_active_flows, epoch_us, and flow_to_rwnd_fd should be. + // At this point, max_active_flows, epoch_us, and flow_to_rwnd_fd should be + // set. if (max_active_flows == 0 || epoch_us == 0 || flow_to_rwnd_fd == 0) { - lock_control.unlock(); RM_PRINTF( "ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, or " "flow_to_rwnd_fd=%d\n", max_active_flows, epoch_us, flow_to_rwnd_fd); + timer.expires_from_now(one_sec); + timer.async_wait(&timer_callback); return; } - // Previously paused flows that will be activated. - std::vector new_active_fds; - // Preallocate suffient space. - new_active_fds.reserve(max_active_flows); + lock_scheduler.lock(); + RM_PRINTF("Performing scheduling\n"); - while (true) { - usleep(epoch_us); + if (active_fds_queue.empty()) { + timer.expires_from_now(one_sec); + timer.async_wait(&timer_callback); + lock_scheduler.unlock(); + return; + } - // If fewer than the max number of flows exist and they are all active, then - // there is no need for scheduling. - if (active_fds_queue.size() < max_active_flows && - paused_fds_queue.size() == 0) { - // RM_PRINTF("WARNING insufficient flows, skipping scheduling\n"); - continue; + boost::posix_time::ptime now_plus_epoch = + boost::posix_time::microsec_clock::local_time() + + boost::posix_time::microseconds(epoch_us); + + // if (boost::posix_time::microsec_clock::local_time() < + // active_fds_queue.front().second) { + // // The next flow should not be paused yet. 
+ // timer.expires_at(active_fds_queue.front().second); + // timer.async_wait(&timer_callback); + // lock_scheduler.unlock(); + // return; + // } + + // If there are paused flows, then unpause one and remember that we did that. + bool found_paused = false; + while (!paused_fds_queue.empty()) { + // Activate the next flow. Record the time at which it should be paused. + int to_activate = paused_fds_queue.front(); + paused_fds_queue.pop(); + if (fd_to_flow.contains(to_activate)) { + active_fds_queue.push({to_activate, now_plus_epoch}); + bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate])); + trigger_ack(to_activate); + found_paused = true; + break; } + } - RM_PRINTF("Performing scheduling\n"); - new_active_fds.clear(); - lock_scheduler.lock(); - - // Try to find max_active_flows FDs to unpause. - while (!paused_fds_queue.empty() and - new_active_fds.size() < max_active_flows) { - // If we still know about this flow, then we can activate it. - int next_fd = paused_fds_queue.front(); - if (fd_to_flow.contains(next_fd)) { - paused_fds_queue.pop(); - new_active_fds.push_back(next_fd); - } - } - auto num_prev_active = active_fds_queue.size(); - - // For each of the flows chosen to be activated, add it to the active set - // and remove it from the RWND map. Trigger an ACK to wake it up. Note that - // twice the allowable number of flows will be active briefly. - RM_PRINTF("Activating %lu flows: ", new_active_fds.size()); - for (const auto &fd : new_active_fds) { - RM_PRINTF("%d ", fd); - active_fds_queue.push(fd); - bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[fd])); - trigger_ack(fd); - } - RM_PRINTF("\n"); - - // For each fo the previously active flows, add it to the paused set, - // install an RWND mapping to actually pause it, and trigger an ACK to - // communicate the new RWND value. 
- RM_PRINTF("Pausing %lu flows: ", num_prev_active); - for (unsigned long i = 0; i < num_prev_active; i++) { - int fd = active_fds_queue.front(); - active_fds_queue.pop(); - RM_PRINTF("%d ", fd); - paused_fds_queue.push(fd); - bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[fd]), &zero, BPF_ANY); - // TODO: Do we need to send an ACK to immediately pause the flow? - trigger_ack(fd); + // Pause the next active flow. + int to_pause = active_fds_queue.front().first; + active_fds_queue.pop(); + if (found_paused) { + // If there were other flows waiting and we activated one, then pause this + // flow. + paused_fds_queue.push(to_pause); + bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero, + BPF_ANY); + trigger_ack(to_pause); + } else { + // If there were no waiting flows, then put this flow back in the active + // queue. + if (fd_to_flow.contains(to_pause)) { + active_fds_queue.push({to_pause, now_plus_epoch}); } - RM_PRINTF("\n"); - - lock_scheduler.unlock(); - new_active_fds.clear(); } + lock_scheduler.unlock(); + + // Schedule the next timer callback for when the next flow should be paused. 
+ timer.expires_at(active_fds_queue.front().second); + timer.async_wait(&timer_callback); + return; } -boost::thread t(thread_func); +void thread_func() { + timer.expires_from_now(one_sec); + timer.async_wait(&timer_callback); + io.run(); +} + +boost::thread thread(thread_func); bool read_env_uint(const char *key, volatile unsigned int *dest) { char *val_str = getenv(key); @@ -178,15 +199,18 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { lock_control.lock(); if (!setup) { if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) { + lock_control.unlock(); return new_fd; } if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) { + lock_control.unlock(); return new_fd; } int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH); if (err == -1) { RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd'\n"); + lock_control.unlock(); return new_fd; } flow_to_rwnd_fd = err; @@ -198,18 +222,6 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { } lock_control.unlock(); - // Hack for iperf3. The first flow is a control flow that should not be - // scheduled. Note that for this hack to work, libratemon_interp must be - // restarted between tests. - lock_control.lock(); - if (fd_to_flow.size() == 0 && !skipped_first) { - RM_PRINTF("WARNING: skipping first flow\n"); - skipped_first = true; - lock_control.unlock(); - return new_fd; - } - lock_control.unlock(); - // Set the CCA and make sure it was set correctly. 
if (setsockopt(new_fd, SOL_TCP, TCP_CONGESTION, RM_BPF_CUBIC, strlen(RM_BPF_CUBIC)) == -1) { @@ -252,22 +264,29 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { .remote_addr = ntohl(remote_addr.sin_addr.s_addr), .local_port = ntohs(local_addr.sin_port), .remote_port = ntohs(remote_addr.sin_port)}; - // RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port, - // flow.local_addr, flow.local_port); + RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port, + flow.local_addr, flow.local_port); lock_scheduler.lock(); fd_to_flow[new_fd] = flow; // Should this flow be active or paused? if (active_fds_queue.size() < max_active_flows) { // Less than the max number of flows are active, so make this one active. - active_fds_queue.push(new_fd); + active_fds_queue.push( + {new_fd, boost::posix_time::microsec_clock::local_time() + + boost::posix_time::microseconds(epoch_us)}); + if (active_fds_queue.size() == 1) { + timer.cancel(); + timer.expires_at(active_fds_queue.front().second); + timer.async_wait(&timer_callback); + } } else { // The max number of flows are active already, so pause this one. paused_fds_queue.push(new_fd); // Pausing a flow means retting its RWND to 0 B. bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY); } lock_scheduler.unlock(); RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd); @@ -283,10 +301,6 @@ int close(int sockfd) { return -1; } int ret = real_close(sockfd); - if (ret == -1) { - RM_PRINTF("ERROR: real 'close' failed\n"); - return ret; - } // To get the flow struct for this FD, we must use visit() to look it up // in the concurrent_flat_map. Obviously, do this before removing the FD // scheduling. fd_to_flow.erase(sockfd); + if (ret == -1) { + RM_PRINTF("ERROR: real 'close' failed\n"); + return ret; + } + RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd); return ret; }