From 851e1aa408a999170f3660800c317f02dce26b88 Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Mon, 6 May 2024 19:25:50 +0000 Subject: [PATCH] Clean up flow_to_win_scale as well --- ratemon/runtime/c/libratemon_interp.cpp | 77 ++++++++++++++++++------- 1 file changed, 55 insertions(+), 22 deletions(-) diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp index 6a7775d..a0be380 100644 --- a/ratemon/runtime/c/libratemon_interp.cpp +++ b/ratemon/runtime/c/libratemon_interp.cpp @@ -36,8 +36,10 @@ unsigned int epoch_us = 10000; unsigned int num_to_schedule = 1; unsigned short monitor_port = 9000; bool setup = false; -// FD for the flow_to_rwnd map. +// FD for the BPF map "flow_to_rwnd". int flow_to_rwnd_fd = 0; +// FD for the BPF map "flow_to_win_sca" (short for "flow_to_win_scale"). +int flow_to_win_scale_fd = 0; // Protects active_fds_queue and paused_fds_queue. std::mutex lock_scheduler; @@ -139,6 +141,7 @@ void timer_callback(const boost::system::error_code &error) { active_fds_queue.push({to_activate, now_plus_epoch}); bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate])); trigger_ack(to_activate); + RM_PRINTF("INFO: activated FD=%d\n", to_activate); break; } } @@ -151,20 +154,24 @@ void timer_callback(const boost::system::error_code &error) { // either schedule that flow again or pause it. int to_pause = active_fds_queue.front().first; active_fds_queue.pop(); - if (active_fds_queue.size() < max_active_flows && - paused_fds_queue.empty()) { - // If there are fewer than the limit flows active and there are no - // waiting flows, then scheduling this flow again. But first, check to - // make sure that it is still valid (see above). - if (fd_to_flow.contains(to_pause)) { + if (fd_to_flow.contains(to_pause)) { + if (active_fds_queue.size() < max_active_flows && + paused_fds_queue.empty()) { + // If there are fewer than the limit flows active and there are no + // waiting flows, then scheduling this flow again. But first, check to + // make sure that it is still valid (see above). active_fds_queue.push({to_pause, now_plus_epoch}); + RM_PRINTF("INFO: reactivated FD=%d\n", to_pause); + } else { + // Pause this flow. + paused_fds_queue.push(to_pause); + bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero, + BPF_ANY); + trigger_ack(to_pause); + RM_PRINTF("INFO: paused FD=%d\n", to_pause); } } else { - // Pause this flow. - paused_fds_queue.push(to_pause); - bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero, - BPF_ANY); - trigger_ack(to_pause); + RM_PRINTF("INFO: cleaned up FD=%d\n", to_pause); } } } @@ -271,18 +278,32 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { // for this. int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH); if (err == -1) { - RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd'\n"); + RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd' from path '%s'\n", + RM_FLOW_TO_RWND_PIN_PATH); lock_setup.unlock(); return new_fd; } flow_to_rwnd_fd = err; RM_PRINTF("INFO: successfully looked up 'flow_to_rwnd' FD\n"); - setup = true; + + // Look up the FD for the flow_to_win_scale map. We do not need the BPF + // skeleton for this. + err = bpf_obj_get(RM_FLOW_TO_WIN_SCALE_PIN_PATH); + if (err == -1) { + RM_PRINTF( + "ERROR: failed to get FD for 'flow_to_win_scale' from path '%s'\n", + RM_FLOW_TO_WIN_SCALE_PIN_PATH); + lock_setup.unlock(); + return new_fd; + } + flow_to_win_scale_fd = err; + RM_PRINTF("INFO: successfully looked up 'flow_to_win_scale' FD\n"); RM_PRINTF( "INFO: setup complete! max_active_flows=%u, epoch_us=%u, " "num_to_schedule=%u, monitor_port=%u\n", max_active_flows, epoch_us, num_to_schedule, monitor_port); + setup = true; } lock_setup.unlock(); @@ -345,6 +366,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { active_fds_queue.push( {new_fd, boost::posix_time::microsec_clock::local_time() + boost::posix_time::microseconds(epoch_us)}); + RM_PRINTF("INFO: allowing new flow FD=%d\n", new_fd); if (active_fds_queue.size() == 1) { timer.cancel(); timer.expires_at(active_fds_queue.front().second); @@ -355,6 +377,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { paused_fds_queue.push(new_fd); // Pausing a flow means retting its RWND to 0 B. bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY); + RM_PRINTF("INFO: paused new flow FD=%d\n", new_fd); } lock_scheduler.unlock(); @@ -372,19 +395,29 @@ int close(int sockfd) { } int ret = real_close(sockfd); - // To get the flow struct for this FD, we must use visit() to look it up - // in the concurrent_flat_map. Obviously, do this before removing the FD - // from fd_to_flow. - bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd])); - // Removing the FD from fd_to_flow triggers it to be (eventually) removed from - // scheduling. - fd_to_flow.erase(sockfd); + // Remove this FD from all data structures. + lock_scheduler.lock(); + if (fd_to_flow.contains(sockfd)) { + // Obviously, do this before removing the FD from fd_to_flow. + if (flow_to_rwnd_fd != 0) { + bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd])); + } + if (flow_to_win_scale_fd != 0) { + bpf_map_delete_elem(flow_to_win_scale_fd, &(fd_to_flow[sockfd])); + } + // Removing the FD from fd_to_flow triggers it to be (eventually) removed + // from scheduling. + fd_to_flow.erase(sockfd); + RM_PRINTF("INFO: removed FD=%d\n", sockfd); + } else { + RM_PRINTF("INFO: ignoring 'close' for FD=%d, not in fd_to_flow\n", sockfd); + } + lock_scheduler.unlock(); if (ret == -1) { RM_PRINTF("ERROR: real 'close' failed\n"); return ret; } - RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd); return ret; }