diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp index ba3bc75..99695dd 100644 --- a/ratemon/runtime/c/libratemon_interp.cpp +++ b/ratemon/runtime/c/libratemon_interp.cpp @@ -30,12 +30,12 @@ #include "ratemon.h" -// Protects writes only to max_active_flows, epoch_us, num_to_schedule, -// monitor_port_start, monitor_port_end, flow_to_rwnd_fd, flow_to_win_scale_fd, -// oldact, and setup. Reads are unprotected. +// Protects writes only to max_active_flows, epoch_us, idle_timeout_us, +// num_to_schedule, monitor_port_start, monitor_port_end, flow_to_rwnd_fd, +// flow_to_win_scale_fd, oldact, and setup. Reads are unprotected. std::mutex lock_setup; // Whether setup has been performed. -bool setup = false; +bool setup_done = false; // Used to signal the scheduler thread to end. bool run = true; // Existing signal handler for SIGINT. @@ -44,6 +44,8 @@ struct sigaction oldact; int flow_to_rwnd_fd = 0; // FD for the BPF map "flow_to_win_sca" (short for "flow_to_win_scale"). int flow_to_win_scale_fd = 0; +// FD for the BPF map "flow_to_last_da" (short for "flow_to_last_data_time_ns"). +int flow_to_last_data_time_fd = 0; // Runs async timers for scheduling boost::asio::io_service io; // Periodically performs scheduling using timer_callback(). @@ -63,6 +65,7 @@ std::unordered_map fd_to_flow; // parameter documentation. unsigned int max_active_flows = 5; unsigned int epoch_us = 10000; +unsigned int idle_timeout_us = 1000; unsigned int num_to_schedule = 1; unsigned short monitor_port_start = 9000; unsigned short monitor_port_end = 9999; @@ -74,6 +77,8 @@ boost::posix_time::seconds one_sec = boost::posix_time::seconds(1); union tcp_cc_info placeholder_cc_info; socklen_t placeholder_cc_info_length = (socklen_t)sizeof(placeholder_cc_info); +// Trigger a pure ACK packet to be send on this FD by calling getsockopt() with +// TCP_CC_INFO. This only works if the flow is using the CCA BPF_CUBIC. 
inline void trigger_ack(int fd) { // Do not store the output to check for errors since there is nothing we can // do. @@ -82,43 +87,43 @@ inline void trigger_ack(int fd) { } void timer_callback(const boost::system::error_code &error) { - // Call this when a flow should be paused. Only one flow will be paused at a - // time. If there are waiting flows, one will be activated. Flows are paused - // and activated in round-robin order. Each flow is allowed to be active for - // epoch_us microseconds. + // Call this when a flow should be paused. If there are waiting flows and + // available capacity, then one will be activated. Flows are paused and + // activated in round-robin order. Each flow is allowed to be active for + // at most epoch_us microseconds. RM_PRINTF("INFO: in timer_callback\n"); - if (error) { RM_PRINTF("ERROR: timer_callback error: %s\n", error.message().c_str()); return; } - if (!run) { RM_PRINTF("INFO: exiting\n"); return; } // If setup has not been performed yet, then we cannot perform scheduling. - if (!setup) { - if (timer.expires_from_now(one_sec)) { + if (!setup_done) { + if (timer.expires_from_now(one_sec)) RM_PRINTF("ERROR: cancelled timer when should not have!\n"); - } + timer.async_wait(&timer_callback); RM_PRINTF("INFO: not set up (flag)\n"); return; } - // At this point, max_active_flows, epoch_us, num_to_schedule, and - // flow_to_rwnd_fd should be set. - if (max_active_flows == 0 || epoch_us == 0 || num_to_schedule == 0 || - flow_to_rwnd_fd == 0) { + // At this point, max_active_flows, epoch_us, num_to_schedule, + // flow_to_rwnd_fd, and flow_to_last_data_time_fd should be set. 
+  if (max_active_flows == 0 || epoch_us == 0 ||
+      num_to_schedule == 0 || flow_to_rwnd_fd == 0 ||
+      flow_to_last_data_time_fd == 0) {
     RM_PRINTF(
         "ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, "
-        "num_to_schedule=%u, or "
-        "flow_to_rwnd_fd=%d\n",
-        max_active_flows, epoch_us, num_to_schedule, flow_to_rwnd_fd);
-    if (timer.expires_from_now(one_sec)) {
+        "idle_timeout_us=%u, num_to_schedule=%u, flow_to_rwnd_fd=%d, or "
+        "flow_to_last_data_time_fd=%d\n",
+        max_active_flows, epoch_us, idle_timeout_us, num_to_schedule,
+        flow_to_rwnd_fd, flow_to_last_data_time_fd);
+    if (timer.expires_from_now(one_sec))
       RM_PRINTF("ERROR: cancelled timer when should not have!\n");
-    }
+
     timer.async_wait(&timer_callback);
     RM_PRINTF("INFO: not set up (params)\n");
     return;
@@ -128,9 +133,9 @@ void timer_callback(const boost::system::error_code &error) {
   RM_PRINTF("INFO: performing scheduling\n");
   if (active_fds_queue.empty() && paused_fds_queue.empty()) {
-    if (timer.expires_from_now(one_sec)) {
+    if (timer.expires_from_now(one_sec))
       RM_PRINTF("ERROR: cancelled timer when should not have!\n");
-    }
+
     timer.async_wait(&timer_callback);
     lock_scheduler.unlock();
     RM_PRINTF("INFO: no flows\n");
@@ -141,9 +146,9 @@ void timer_callback(const boost::system::error_code &error) {
       boost::posix_time::microsec_clock::local_time();
   if (now < active_fds_queue.front().second) {
     // The next flow should not be scheduled yet.
- if (timer.expires_from_now(active_fds_queue.front().second - now)) { + if (timer.expires_from_now(active_fds_queue.front().second - now)) RM_PRINTF("ERROR: cancelled timer when should not have!\n"); - } + timer.async_wait(&timer_callback); lock_scheduler.unlock(); RM_PRINTF("INFO: early by %ld us\n", @@ -217,9 +222,8 @@ void timer_callback(const boost::system::error_code &error) { trigger_ack(to_pause); RM_PRINTF("INFO: paused FD=%d\n", to_pause); } - } else { + } else RM_PRINTF("INFO: cleaned up FD=%d\n", to_pause); - } } } @@ -232,18 +236,18 @@ void timer_callback(const boost::system::error_code &error) { // Schedule the next timer callback for when the next flow should be paused. if (active_fds_queue.empty()) { // If we cleaned up all flows, then revert to slow check mode. - if (timer.expires_from_now(one_sec)) { + if (timer.expires_from_now(one_sec)) RM_PRINTF("ERROR: cancelled timer when should not have!\n"); - } + timer.async_wait(&timer_callback); lock_scheduler.unlock(); RM_PRINTF("INFO: no flows remaining\n"); return; } // Trigger scheduling for the next flow. - if (timer.expires_from_now(active_fds_queue.front().second - now)) { + if (timer.expires_from_now(active_fds_queue.front().second - now)) RM_PRINTF("ERROR: cancelled timer when should not have!\n"); - } + timer.async_wait(&timer_callback); lock_scheduler.unlock(); RM_PRINTF("INFO: sleeping until next event\n"); @@ -254,9 +258,9 @@ void thread_func() { // This function is designed to be run in a thread. It is responsible for // managing the async timers that perform scheduling. RM_PRINTF("INFO: scheduler thread started\n"); - if (timer.expires_from_now(one_sec)) { + if (timer.expires_from_now(one_sec)) RM_PRINTF("ERROR: cancelled timer when should not have!\n"); - } + timer.async_wait(&timer_callback); RM_PRINTF("INFO: scheduler thread initial sleep\n"); // Execute the configured events, until there are no more events to execute. 
@@ -265,21 +269,21 @@ void thread_func() {
   // Delete all flows from flow_to_rwnd and flow_to_win_scale.
   lock_scheduler.lock();
   for (const auto &p : fd_to_flow) {
-    if (flow_to_rwnd_fd != 0) {
-      bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
-    }
-    if (flow_to_win_scale_fd != 0) {
+    if (flow_to_rwnd_fd != 0) bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
+
+    if (flow_to_win_scale_fd != 0)
       bpf_map_delete_elem(flow_to_win_scale_fd, &p.second);
-    }
+
+    if (flow_to_last_data_time_fd != 0)
+      bpf_map_delete_elem(flow_to_last_data_time_fd, &p.second);
   }
   lock_scheduler.unlock();
   RM_PRINTF("INFO: scheduler thread ended\n");
-  if (run) {
+  if (run)
     RM_PRINTF(
         "ERROR: scheduled thread ended before program was signalled to "
         "stop!\n");
-  }
 }
 
 // Catch SIGINT and trigger the scheduler thread and timer to end.
@@ -300,7 +304,8 @@ void sigint_handler(int signum) {
   raise(signum);
 }
 
-bool read_env_uint(const char *key, volatile unsigned int *dest) {
+bool read_env_uint(const char *key, volatile unsigned int *dest,
+                   bool allow_zero = false) {
   // Read an environment variable as an unsigned integer.
   char *val_str = getenv(key);
   if (val_str == NULL) {
@@ -308,176 +313,146 @@ bool read_env_uint(const char *key, volatile unsigned int *dest) {
     return false;
   }
   int val_int = atoi(val_str);
-  if (val_int <= 0) {
-    RM_PRINTF("ERROR: invalid value for '%s'=%d (must be > 0)\n", key,
-              val_int);
-    return false;
+  if (allow_zero and val_int < 0) {
+    RM_PRINTF("ERROR: invalid value for '%s'=%d (must be >= 0)\n", key,
+              val_int);
+    return false;
+  } else if (!allow_zero and val_int <= 0) {
+    RM_PRINTF("ERROR: invalid value for '%s'=%d (must be > 0)\n", key,
+              val_int);
+    return false;
   }
   *dest = (unsigned int)val_int;
   return true;
 }
 
-// For some reason, C++ function name mangling does not prevent us from
-// overriding accept(), so we do not need 'extern "C"'.
-int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
-  static int (*real_accept)(int, struct sockaddr *, socklen_t *) =
-      (int (*)(int, struct sockaddr *, socklen_t *))dlsym(RTLD_NEXT, "accept");
-  if (real_accept == NULL) {
-    RM_PRINTF("ERROR: failed to query dlsym for 'accept': %s\n", dlerror());
-    return -1;
-  }
-  int new_fd = real_accept(sockfd, addr, addrlen);
-  if (new_fd == -1) {
-    RM_PRINTF("ERROR: real 'accept' failed\n");
-    return new_fd;
-  }
-  if (addr != NULL && addr->sa_family != AF_INET) {
-    RM_PRINTF("WARNING: got 'accept' for non-AF_INET sa_family=%u\n",
-              addr->sa_family);
-    if (addr->sa_family == AF_INET6) {
-      RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6!\n");
-    }
-    return new_fd;
+// Perform setup (only once for all flows in this process), such as reading
+// parameters from environment variables and looking up the BPF map
+// flow_to_rwnd.
+bool setup() {
+  // Read environment variables with parameters.
+  if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) return false;
+  if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) return false;
+  if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us,
+                     true /* allow_zero */)) return false;
+  // Cannot schedule more than max_active_flows at once.
+  if (!read_env_uint(RM_NUM_TO_SCHEDULE_KEY, &num_to_schedule) ||
+      num_to_schedule > max_active_flows)
+    return false;
+  unsigned int monitor_port_start_;
+  if (!read_env_uint(RM_MONITOR_PORT_START_KEY, &monitor_port_start_) ||
+      monitor_port_start_ >= 65536)
+    return false;
+  monitor_port_start = (unsigned short)monitor_port_start_;
+  unsigned int monitor_port_end_;
+  if (!read_env_uint(RM_MONITOR_PORT_END_KEY, &monitor_port_end_) ||
+      monitor_port_end_ >= 65536)
+    return false;
+  monitor_port_end = (unsigned short)monitor_port_end_;
+
+  // Look up the FD for the flow_to_rwnd map. We do not need the BPF skeleton
+  // for this.
+ int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH); + if (err == -1) { + RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd' from path '%s'\n", + RM_FLOW_TO_RWND_PIN_PATH); + return false; } + flow_to_rwnd_fd = err; + RM_PRINTF("INFO: successfully looked up 'flow_to_rwnd' FD\n"); - if (!run) { - // If we have been signalled to quit, then do nothing more. - return new_fd; + // Look up the FD for the flow_to_win_scale map. We do not need the BPF + // skeleton for this. + err = bpf_obj_get(RM_FLOW_TO_WIN_SCALE_PIN_PATH); + if (err == -1) { + RM_PRINTF( + "ERROR: failed to get FD for 'flow_to_win_scale' from path '%s'\n", + RM_FLOW_TO_WIN_SCALE_PIN_PATH); + return false; } + flow_to_win_scale_fd = err; + RM_PRINTF("INFO: successfully looked up 'flow_to_win_scale' FD\n"); - // Perform setup (only once for all flows in this process), such as reading - // parameters from environment variables and looking up the BPF map - // flow_to_rwnd. - lock_setup.lock(); - if (!setup) { - if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) { - lock_setup.unlock(); - return new_fd; - } - if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) { - lock_setup.unlock(); - return new_fd; - } - // Cannot schedule more than max_active_flows at once. - if (!read_env_uint(RM_NUM_TO_SCHEDULE_KEY, &num_to_schedule) || - num_to_schedule > max_active_flows) { - lock_setup.unlock(); - return new_fd; - } - unsigned int monitor_port_start_; - if (!read_env_uint(RM_MONITOR_PORT_START_KEY, &monitor_port_start_) || - monitor_port_start_ >= 65536) { - lock_setup.unlock(); - return new_fd; - } - monitor_port_start = (unsigned short)monitor_port_start_; - unsigned int monitor_port_end_; - if (!read_env_uint(RM_MONITOR_PORT_END_KEY, &monitor_port_end_) || - monitor_port_end_ >= 65536) { - lock_setup.unlock(); - return new_fd; - } - monitor_port_end = (unsigned short)monitor_port_end_; - - // Look up the FD for the flow_to_rwnd map. We do not need the BPF skeleton - // for this. 
-    int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH);
-    if (err == -1) {
-      RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd' from path '%s'\n",
-                RM_FLOW_TO_RWND_PIN_PATH);
-      lock_setup.unlock();
-      return new_fd;
-    }
-    flow_to_rwnd_fd = err;
-    RM_PRINTF("INFO: successfully looked up 'flow_to_rwnd' FD\n");
-
-    // Look up the FD for the flow_to_win_scale map. We do not need the BPF
-    // skeleton for this.
-    err = bpf_obj_get(RM_FLOW_TO_WIN_SCALE_PIN_PATH);
-    if (err == -1) {
-      RM_PRINTF(
-          "ERROR: failed to get FD for 'flow_to_win_scale' from path '%s'\n",
-          RM_FLOW_TO_WIN_SCALE_PIN_PATH);
-      lock_setup.unlock();
-      return new_fd;
-    }
-    flow_to_win_scale_fd = err;
-    RM_PRINTF("INFO: successfully looked up 'flow_to_win_scale' FD\n");
-
-    // Catch SIGINT to end the program.
-    struct sigaction action;
-    action.sa_handler = sigint_handler;
-    sigemptyset(&action.sa_mask);
-    action.sa_flags = SA_RESETHAND;
-    sigaction(SIGINT, &action, &oldact);
-
-    // Launch the scheduler thread.
-    scheduler_thread = boost::thread(thread_func);
-
+  // Look up the FD for the flow_to_last_data_time_ns map. We do not need the
+  // BPF skeleton for this.
+  err = bpf_obj_get(RM_FLOW_TO_LAST_DATA_TIME_PIN_PATH);
+  if (err == -1) {
     RM_PRINTF(
-        "INFO: setup complete! max_active_flows=%u, epoch_us=%u, "
-        "num_to_schedule=%u, monitor_port_start=%u, monitor_port_end=%u\n",
-        max_active_flows, epoch_us, num_to_schedule, monitor_port_start,
-        monitor_port_end);
-    setup = true;
+        "ERROR: failed to get FD for 'flow_to_last_data_time_ns' from path "
+        "'%s'\n",
+        RM_FLOW_TO_LAST_DATA_TIME_PIN_PATH);
+    return false;
   }
-  lock_setup.unlock();
+  flow_to_last_data_time_fd = err;
+  RM_PRINTF("INFO: successfully looked up 'flow_to_last_data_time_ns' FD\n");
+
+  // Catch SIGINT to end the program.
+  struct sigaction action;
+  action.sa_handler = sigint_handler;
+  sigemptyset(&action.sa_mask);
+  action.sa_flags = SA_RESETHAND;
+  sigaction(SIGINT, &action, &oldact);
+
+  // Launch the scheduler thread.
+ scheduler_thread = boost::thread(thread_func); + + RM_PRINTF( + "INFO: setup complete! max_active_flows=%u, epoch_us=%u, " + "idle_timeout_us=%u, " + "num_to_schedule=%u, monitor_port_start=%u, monitor_port_end=%u\n", + max_active_flows, epoch_us, idle_timeout_us, num_to_schedule, + monitor_port_start, monitor_port_end); + return true; +} +bool get_flow(int fd, struct rm_flow *flow) { // Determine the four-tuple, which we need to track because RWND tuning is // applied based on four-tuple. struct sockaddr_in local_addr; socklen_t local_addr_len = sizeof(local_addr); // Get the local IP and port. - if (getsockname(new_fd, (struct sockaddr *)&local_addr, &local_addr_len) == - -1) { + if (getsockname(fd, (struct sockaddr *)&local_addr, &local_addr_len) == -1) { RM_PRINTF("ERROR: failed to call 'getsockname'\n"); - return -1; + return false; } struct sockaddr_in remote_addr; socklen_t remote_addr_len = sizeof(remote_addr); // Get the peer's (i.e., the remote) IP and port. - if (getpeername(new_fd, (struct sockaddr *)&remote_addr, &remote_addr_len) == + if (getpeername(fd, (struct sockaddr *)&remote_addr, &remote_addr_len) == -1) { RM_PRINTF("ERROR: failed to call 'getpeername'\n"); - return -1; + return false; } // Fill in the four-tuple. 
- struct rm_flow flow = {.local_addr = ntohl(local_addr.sin_addr.s_addr), - .remote_addr = ntohl(remote_addr.sin_addr.s_addr), - .local_port = ntohs(local_addr.sin_port), - .remote_port = ntohs(remote_addr.sin_port)}; - RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port, - flow.local_addr, flow.local_port); - - if (!(flow.remote_port >= monitor_port_start && - flow.remote_port <= monitor_port_end)) { - RM_PRINTF( - "INFO: ignoring flow on remote port %u, not in monitor port range: " - "[%u, %u]\n", - flow.remote_port, monitor_port_start, monitor_port_end); - return new_fd; - } + flow->local_addr = ntohl(local_addr.sin_addr.s_addr); + flow->remote_addr = ntohl(remote_addr.sin_addr.s_addr); + flow->local_port = ntohs(local_addr.sin_port); + flow->remote_port = ntohs(remote_addr.sin_port); + return true; +} +bool set_cca(int fd, const char *cca) { // Set the CCA and make sure it was set correctly. - if (setsockopt(new_fd, SOL_TCP, TCP_CONGESTION, RM_BPF_CUBIC, - strlen(RM_BPF_CUBIC)) == -1) { + if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, cca, strlen(cca)) == -1) { RM_PRINTF("ERROR: failed to 'setsockopt' TCP_CONGESTION\n"); - return new_fd; + return false; } char retrieved_cca[32]; socklen_t retrieved_cca_len = sizeof(retrieved_cca); - if (getsockopt(new_fd, SOL_TCP, TCP_CONGESTION, retrieved_cca, + if (getsockopt(fd, SOL_TCP, TCP_CONGESTION, retrieved_cca, &retrieved_cca_len) == -1) { RM_PRINTF("ERROR: failed to 'getsockopt' TCP_CONGESTION\n"); - return new_fd; + return false; } - if (strcmp(retrieved_cca, RM_BPF_CUBIC)) { - RM_PRINTF("ERROR: failed to set CCA to %s! Actual CCA is: %s\n", - RM_BPF_CUBIC, retrieved_cca); - return new_fd; + if (strcmp(retrieved_cca, cca)) { + RM_PRINTF("ERROR: failed to set CCA to %s! Actual CCA is: %s\n", cca, + retrieved_cca); + return false; } + return true; +} - lock_scheduler.lock(); - fd_to_flow[new_fd] = flow; +// Perform initial scheduling for this flow. 
+void initial_scheduling(int fd, struct rm_flow *flow) { // Should this flow be active or paused? if (active_fds_queue.size() < max_active_flows) { // Less than the max number of flows are active, so make this one active. @@ -486,28 +461,83 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { int rand = std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) - (int)std::roundl(epoch_us * 0.125); - active_fds_queue.push( - {new_fd, now + boost::posix_time::microseconds(epoch_us) + - boost::posix_time::microseconds(rand)}); - RM_PRINTF("INFO: allowing new flow FD=%d\n", new_fd); + active_fds_queue.push({fd, now + boost::posix_time::microseconds(epoch_us) + + boost::posix_time::microseconds(rand)}); + RM_PRINTF("INFO: allowing new flow FD=%d\n", fd); if (active_fds_queue.size() == 1) { - if (timer.expires_from_now(active_fds_queue.front().second - now) != 1) { + if (timer.expires_from_now(active_fds_queue.front().second - now) != 1) RM_PRINTF("ERROR: should have cancelled 1 timer!\n"); - } + timer.async_wait(&timer_callback); RM_PRINTF("INFO: first scheduling event\n"); } } else { // The max number of flows are active already, so pause this one. - paused_fds_queue.push(new_fd); + paused_fds_queue.push(fd); // Pausing a flow means retting its RWND to 0 B. - bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY); - RM_PRINTF("INFO: paused new flow FD=%d\n", new_fd); + bpf_map_update_elem(flow_to_rwnd_fd, flow, &zero, BPF_ANY); + RM_PRINTF("INFO: paused new flow FD=%d\n", fd); } - lock_scheduler.unlock(); +} - RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd); - return new_fd; +// For some reason, C++ function name mangling does not prevent us from +// overriding accept(), so we do not need 'extern "C"'. 
+int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
+  static int (*real_accept)(int, struct sockaddr *, socklen_t *) =
+      (int (*)(int, struct sockaddr *, socklen_t *))dlsym(RTLD_NEXT, "accept");
+  if (real_accept == NULL) {
+    RM_PRINTF("ERROR: failed to query dlsym for 'accept': %s\n", dlerror());
+    return -1;
+  }
+  int fd = real_accept(sockfd, addr, addrlen);
+  if (fd == -1) {
+    RM_PRINTF("ERROR: real 'accept' failed\n");
+    return fd;
+  }
+  if (addr != NULL && addr->sa_family != AF_INET) {
+    RM_PRINTF("WARNING: got 'accept' for non-AF_INET sa_family=%u\n",
+              addr->sa_family);
+    if (addr->sa_family == AF_INET6)
+      RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6!\n");
+
+    return fd;
+  }
+
+  // If we have been signalled to quit, then do nothing more.
+  if (!run) return fd;
+  // One-time setup.
+  lock_setup.lock();
+  if (!setup_done) {
+    if (!setup()) {
+      lock_setup.unlock();
+      return fd;
+    }
+    setup_done = true;
+  }
+  lock_setup.unlock();
+  // Look up the four-tuple.
+  struct rm_flow flow;
+  if (!get_flow(fd, &flow)) return fd;
+  RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port,
+            flow.local_addr, flow.local_port);
+  // Ignore flows that are not in the monitor port range.
+  if (!(flow.remote_port >= monitor_port_start &&
+        flow.remote_port <= monitor_port_end)) {
+    RM_PRINTF(
+        "INFO: ignoring flow on remote port %u, not in monitor port range: "
+        "[%u, %u]\n",
+        flow.remote_port, monitor_port_start, monitor_port_end);
+    return fd;
+  }
+  // Change the CCA to BPF_CUBIC.
+  if (!set_cca(fd, RM_BPF_CUBIC)) return fd;
+  // Register the flow and schedule it while holding lock_scheduler.
+  lock_scheduler.lock();
+  fd_to_flow[fd] = flow;
+  initial_scheduling(fd, &flow);
+  lock_scheduler.unlock();
+  RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, fd);
+  return fd;
 }
 
 // Get around C++ function name mangling.
@@ -524,19 +554,22 @@ int close(int sockfd) { lock_scheduler.lock(); if (fd_to_flow.contains(sockfd)) { // Obviously, do this before removing the FD from fd_to_flow. - if (flow_to_rwnd_fd != 0) { + if (flow_to_rwnd_fd != 0) bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd])); - } - if (flow_to_win_scale_fd != 0) { + + if (flow_to_win_scale_fd != 0) bpf_map_delete_elem(flow_to_win_scale_fd, &(fd_to_flow[sockfd])); - } + + if (flow_to_last_data_time_fd != 0) + bpf_map_delete_elem(flow_to_last_data_time_fd, &(fd_to_flow[sockfd])); + // Removing the FD from fd_to_flow triggers it to be (eventually) removed // from scheduling. fd_to_flow.erase(sockfd); RM_PRINTF("INFO: removed FD=%d\n", sockfd); - } else { + } else RM_PRINTF("INFO: ignoring 'close' for FD=%d, not in fd_to_flow\n", sockfd); - } + lock_scheduler.unlock(); if (ret == -1) { diff --git a/ratemon/runtime/c/ratemon.h b/ratemon/runtime/c/ratemon.h index 7cd6647..e6b2b15 100644 --- a/ratemon/runtime/c/ratemon.h +++ b/ratemon/runtime/c/ratemon.h @@ -11,7 +11,7 @@ // #define RM_PRINTK(...) bpf_printk(__VA_ARGS__) #else #define NDEBUG // Disable assert() calls. -#define RM_PRINTF(...) +#define RM_PRINTF(...) ; // #define RM_PRINTK(...) #endif @@ -20,7 +20,8 @@ // Map pin paths. #define RM_FLOW_TO_RWND_PIN_PATH "/sys/fs/bpf/flow_to_rwnd" #define RM_FLOW_TO_WIN_SCALE_PIN_PATH "/sys/fs/bpf/flow_to_win_scale" -#define RM_FLOW_TO_LAST_DATA_TIME_PIN_PATH "/sys/fs/bpf/flow_to_last_data_time_ns" +#define RM_FLOW_TO_LAST_DATA_TIME_PIN_PATH \ + "/sys/fs/bpf/flow_to_last_data_time_ns" // Name of struct_ops CCA that flows must use to be woken up. #define RM_BPF_CUBIC "bpf_cubic" @@ -28,6 +29,9 @@ #define RM_MAX_ACTIVE_FLOWS_KEY "RM_MAX_ACTIVE_FLOWS" // Environment variable that specifies how often to perform flow scheduling. #define RM_EPOCH_US_KEY "RM_EPOCH_US" +// Duration after which an idle flow will be forcibly paused. 0 disables this +// feature. 
+#define RM_IDLE_TIMEOUT_US_KEY "RM_IDLE_TIMEOUT_US" // Environment variable that specifies the number of flows to schedule per // epoch. "Schedule" can mean either activate or pause. Note that setting this // greater than 1 effectively switches scheduled RWND tuning from a strict