From c7112543dd4fa0a0dd842c9d8e7d7d9d0446899a Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Wed, 1 May 2024 17:06:51 +0000 Subject: [PATCH] Clean up libratemon_interp, remove unneeded ratemon_maps.bpf.c --- ratemon/runtime/Makefile | 2 +- ratemon/runtime/libratemon_interp.cpp | 133 ++++++++++++-------------- ratemon/runtime/ratemon_maps.bpf.c | 2 - 3 files changed, 60 insertions(+), 77 deletions(-) delete mode 100644 ratemon/runtime/ratemon_maps.bpf.c diff --git a/ratemon/runtime/Makefile b/ratemon/runtime/Makefile index a800c08..9bcc14e 100644 --- a/ratemon/runtime/Makefile +++ b/ratemon/runtime/Makefile @@ -124,7 +124,7 @@ $(APPS): %: $(OUTPUT)/%.o $(LIBBPF_OBJ) | $(OUTPUT) $(call msg,BINARY,$@) $(CC) $(CFLAGS) $^ $(ALL_LDFLAGS) -lelf -lz -o $@ -$(OUTPUT)/libratemon_interp.so: libratemon_interp.cpp $(OUTPUT)/ratemon_maps.skel.h ratemon.h | $(OUTPUT) +$(OUTPUT)/libratemon_interp.so: libratemon_interp.cpp ratemon.h | $(OUTPUT) $(CXX) $(CXXFLAGS) -shared -fPIC -I$(OUTPUT) -I$(BOOST_INCLUDE) $< -ldl -L${BOOST_LIB} -lboost_thread -lbpf -o $@ $(INTERPS): %: $(OUTPUT)/%.so ; diff --git a/ratemon/runtime/libratemon_interp.cpp b/ratemon/runtime/libratemon_interp.cpp index bb6385e..a13efec 100644 --- a/ratemon/runtime/libratemon_interp.cpp +++ b/ratemon/runtime/libratemon_interp.cpp @@ -16,15 +16,16 @@ #include #include -#include #include #include +#include #include #include #include "ratemon.h" -#include "ratemon_maps.skel.h" +volatile unsigned int max_active_flows = 0; +volatile unsigned int epoch_us = 0; volatile bool setup = false; volatile bool skipped_first = false; @@ -32,15 +33,12 @@ volatile bool skipped_first = false; std::mutex lock; std::queue active_fds_queue; std::queue paused_fds_queue; +// Maps file descriptor to rm_flow struct. +std::unordered_map fd_to_flow; -// BPF things. -struct ratemon_maps_bpf *skel = NULL; -// struct bpf_map *flow_to_rwnd = NULL; +// FD for the flow_to_rwnd map. int flow_to_rwnd_fd = 0; -// Maps file descriptor to flow struct. -boost::unordered::concurrent_flat_map fd_to_flow; - // Used to set entries in flow_to_rwnd. int zero = 0; @@ -48,13 +46,6 @@ int zero = 0; union tcp_cc_info placeholder_cc_info; socklen_t placeholder_cc_info_length = (socklen_t)sizeof(placeholder_cc_info); -// Look up the environment variable for max active flows. -unsigned int max_active_flows = - (unsigned int)atoi(getenv(RM_MAX_ACTIVE_FLOWS_KEY)); - -// Look up the environment variable for scheduling epoch. -unsigned int epoch_us = (unsigned int)atoi(getenv(RM_EPOCH_US_KEY)); - inline void trigger_ack(int fd) { // Do not store the output to check for errors since there is nothing we can // do. @@ -68,27 +59,18 @@ void thread_func() { // Preallocate suffient space. new_active_fds.reserve(max_active_flows); - if (max_active_flows == 0 || epoch_us == 0) { - RM_PRINTF("ERROR when querying environment variables '%s' or '%s'\n", - RM_MAX_ACTIVE_FLOWS_KEY, RM_EPOCH_US_KEY); - return; - } - - RM_PRINTF( - "libratemon_interp scheduling thread started, max flows=%u, epoch=%u " - "us\n", - max_active_flows, epoch_us); + RM_PRINTF("INFO: libratemon_interp scheduling thread started\n"); while (true) { - usleep(epoch_us); - // RM_PRINTF("Time to schedule\n"); - // If setup has not been performed yet, then we cannot perform scheduling. if (!setup) { // RM_PRINTF("WARNING setup not completed, skipping scheduling\n"); + usleep(10000); continue; } + usleep(epoch_us); + // If fewer than the max number of flows exist and they are all active, then // there is no need for scheduling. if (active_fds_queue.size() < max_active_flows && @@ -106,7 +88,7 @@ void thread_func() { new_active_fds.size() < max_active_flows) { // If we still know about this flow, then we can activate it. int next_fd = paused_fds_queue.front(); - if (fd_to_flow.visit(next_fd, [](const auto &) {})) { + if (fd_to_flow.contains(next_fd)) { paused_fds_queue.pop(); new_active_fds.push_back(next_fd); } @@ -120,9 +102,7 @@ void thread_func() { for (const auto &fd : new_active_fds) { RM_PRINTF("%d ", fd); active_fds_queue.push(fd); - fd_to_flow.visit(fd, [](const auto &p) { - bpf_map_delete_elem(flow_to_rwnd_fd, &p.second); - }); + bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[fd])); trigger_ack(fd); } RM_PRINTF("\n"); @@ -136,9 +116,7 @@ void thread_func() { active_fds_queue.pop(); RM_PRINTF("%d ", fd); paused_fds_queue.push(fd); - fd_to_flow.visit(fd, [](const auto &p) { - bpf_map_update_elem(flow_to_rwnd_fd, &p.second, &zero, BPF_ANY); - }); + bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[fd]), &zero, BPF_ANY); // TODO: Do we need to send an ACK to immediately pause the flow? trigger_ack(fd); } @@ -153,62 +131,71 @@ void thread_func() { boost::thread t(thread_func); +bool read_env_uint(const char *key, volatile unsigned int *dest) { + char *val_str = getenv(key); + if (val_str == NULL) { + RM_PRINTF("ERROR: failed to query environment variable '%s'\n", key); + return false; + } + int val_int = atoi(val_str); + if (val_int <= 0) { + RM_PRINTF("ERROR: invalid value for '%s'=%d (must be > 0)\n", key, val_int); + return false; + } + *dest = (unsigned int)val_int; + return true; +} + // For some reason, C++ function name mangling does not prevent us from // overriding accept(), so we do not need 'extern "C"'. int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { - if (max_active_flows == 0 || epoch_us == 0) { - RM_PRINTF("ERROR when querying environment variables '%s' or '%s'\n", - RM_MAX_ACTIVE_FLOWS_KEY, RM_EPOCH_US_KEY); - return -1; - } - static int (*real_accept)(int, struct sockaddr *, socklen_t *) = (int (*)(int, struct sockaddr *, socklen_t *))dlsym(RTLD_NEXT, "accept"); if (real_accept == NULL) { - RM_PRINTF("ERROR when querying dlsym for 'accept': %s\n", dlerror()); + RM_PRINTF("ERROR: failed to query dlsym for 'accept': %s\n", dlerror()); return -1; } int new_fd = real_accept(sockfd, addr, addrlen); if (new_fd == -1) { - RM_PRINTF("ERROR in real 'accept'\n"); + RM_PRINTF("ERROR: real 'accept' failed\n"); return new_fd; } if (addr != NULL && addr->sa_family != AF_INET) { - RM_PRINTF("WARNING got 'accept' for non-AF_INET: sa_family=%u\n", + RM_PRINTF("WARNING: got 'accept' for non-AF_INET sa_family=%u\n", addr->sa_family); if (addr->sa_family == AF_INET6) { - RM_PRINTF("WARNING (continued) got 'accept' for AF_INET6!\n"); + RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6!\n"); } return new_fd; } // Perform BPF setup (only once for all flows in this process). if (!setup) { - skel = ratemon_maps_bpf__open_and_load(); - if (skel == NULL) { - RM_PRINTF("ERROR: failed to open/load 'ratemon_maps' BPF skeleton\n"); + if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) { + return new_fd; + } + if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) { return new_fd; } - int pinned_map_fd = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH); - - // int err = bpf_map__reuse_fd(skel->maps.flow_to_rwnd, pinned_map_fd); - // if (err) { - // RM_PRINTF("ERROR when reusing map FD\n"); - // return new_fd; - // } - - // flow_to_rwnd = skel->maps.flow_to_rwnd; - flow_to_rwnd_fd = pinned_map_fd; - RM_PRINTF("Successfully reused map FD\n"); + int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH); + if (err == -1) { + RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd'\n"); + return new_fd; + } + flow_to_rwnd_fd = err; + RM_PRINTF("INFO: successfully looked up 'flow_to_rwnd' FD\n"); setup = true; + + RM_PRINTF("INFO: max_active_flows=%u epoch_us=%u\n", max_active_flows, + epoch_us); } // Hack for iperf3. The first flow is a control flow that should not be // scheduled. Note that for this hack to work, libratemon_interp must be // restarted between tests. if (fd_to_flow.size() == 0 && !skipped_first) { - RM_PRINTF("WARNING skipping first flow\n"); + RM_PRINTF("WARNING: skipping first flow\n"); skipped_first = true; return new_fd; } @@ -216,19 +203,19 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { // Set the CCA and make sure it was set correctly. if (setsockopt(new_fd, SOL_TCP, TCP_CONGESTION, RM_BPF_CUBIC, strlen(RM_BPF_CUBIC)) == -1) { - RM_PRINTF("ERROR in 'setsockopt' TCP_CONGESTION\n"); + RM_PRINTF("ERROR: failed to 'setsockopt' TCP_CONGESTION\n"); return new_fd; } char retrieved_cca[32]; socklen_t retrieved_cca_len = sizeof(retrieved_cca); if (getsockopt(new_fd, SOL_TCP, TCP_CONGESTION, retrieved_cca, &retrieved_cca_len) == -1) { - RM_PRINTF("ERROR in 'getsockopt' TCP_CONGESTION\n"); + RM_PRINTF("ERROR: failed to 'getsockopt' TCP_CONGESTION\n"); return new_fd; } if (strcmp(retrieved_cca, RM_BPF_CUBIC)) { - RM_PRINTF("ERROR when setting CCA to %s! Actual CCA is: %s\n", RM_BPF_CUBIC, - retrieved_cca); + RM_PRINTF("ERROR: failed to set CCA to %s! Actual CCA is: %s\n", + RM_BPF_CUBIC, retrieved_cca); return new_fd; } @@ -239,7 +226,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { // Get the local IP and port. if (getsockname(new_fd, (struct sockaddr *)&local_addr, &local_addr_len) == -1) { - RM_PRINTF("ERROR when calling 'getsockname'\n"); + RM_PRINTF("ERROR: failed to call 'getsockname'\n"); return -1; } struct sockaddr_in remote_addr; @@ -247,7 +234,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { // Get the peer's (i.e., the remote) IP and port. if (getpeername(new_fd, (struct sockaddr *)&remote_addr, &remote_addr_len) == -1) { - RM_PRINTF("ERROR when calling 'getpeername'\n"); + RM_PRINTF("ERROR: failed to call 'getpeername'\n"); return -1; } // Fill in the four-tuple. @@ -257,7 +244,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { .remote_port = ntohs(remote_addr.sin_port)}; // RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port, // flow.local_addr, flow.local_port); - fd_to_flow.insert({new_fd, flow}); + fd_to_flow[new_fd] = flow; lock.lock(); @@ -274,7 +261,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { lock.unlock(); - RM_PRINTF("Successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd); + RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd); return new_fd; } @@ -283,26 +270,24 @@ extern "C" { int close(int sockfd) { static int (*real_close)(int) = (int (*)(int))dlsym(RTLD_NEXT, "close"); if (real_close == NULL) { - RM_PRINTF("ERROR when querying dlsym for 'close': %s\n", dlerror()); + RM_PRINTF("ERROR: failed to query dlsym for 'close': %s\n", dlerror()); return -1; } int ret = real_close(sockfd); if (ret == -1) { - RM_PRINTF("ERROR in real 'close'\n"); + RM_PRINTF("ERROR: real 'close' failed\n"); return ret; } // To get the flow struct for this FD, we must use visit() to look it up // in the concurrent_flat_map. Obviously, do this before removing the FD // from fd_to_flow. - fd_to_flow.visit(sockfd, [](const auto &p) { - bpf_map_delete_elem(flow_to_rwnd_fd, &p.second); - }); + bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd])); // Removing the FD from fd_to_flow triggers it to be (eventually) removed from // scheduling. fd_to_flow.erase(sockfd); - RM_PRINTF("Successful 'close' for FD=%d\n", sockfd); + RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd); return ret; } } \ No newline at end of file diff --git a/ratemon/runtime/ratemon_maps.bpf.c b/ratemon/runtime/ratemon_maps.bpf.c deleted file mode 100644 index f0bd0cd..0000000 --- a/ratemon/runtime/ratemon_maps.bpf.c +++ /dev/null @@ -1,2 +0,0 @@ - -#include "ratemon_maps.h"