From d351665eded104c688a12636625d5a04b5fb6580 Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Thu, 2 May 2024 16:06:45 +0000 Subject: [PATCH] Locking in libratemon_interp --- ratemon/runtime/libratemon_interp.cpp | 59 +++++++++++++++------------ 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/ratemon/runtime/libratemon_interp.cpp b/ratemon/runtime/libratemon_interp.cpp index a13efec..ae4d2d4 100644 --- a/ratemon/runtime/libratemon_interp.cpp +++ b/ratemon/runtime/libratemon_interp.cpp @@ -24,21 +24,21 @@ #include "ratemon.h" -volatile unsigned int max_active_flows = 0; -volatile unsigned int epoch_us = 0; -volatile bool setup = false; -volatile bool skipped_first = false; +std::mutex lock_control; +unsigned int max_active_flows = 0; +unsigned int epoch_us = 0; +bool setup = false; +bool skipped_first = false; +// FD for the flow_to_rwnd map. +int flow_to_rwnd_fd = 0; // Protects active_fds_queue and paused_fds_queue. -std::mutex lock; +std::mutex lock_scheduler; std::queue active_fds_queue; std::queue paused_fds_queue; // Maps file descriptor to rm_flow struct. std::unordered_map fd_to_flow; -// FD for the flow_to_rwnd map. -int flow_to_rwnd_fd = 0; - // Used to set entries in flow_to_rwnd. int zero = 0; @@ -54,21 +54,28 @@ inline void trigger_ack(int fd) { } void thread_func() { + RM_PRINTF("INFO: libratemon_interp scheduling thread started\n"); + // If setup has not been performed yet, then we cannot perform scheduling. + while (!setup) { + // RM_PRINTF("WARNING setup not completed, skipping scheduling\n"); + usleep(10000); + } + // At this point, max_active_flows, epoch_us, and flow_to_rwnd_fd should be. + if (max_active_flows == 0 || epoch_us == 0 || flow_to_rwnd_fd == 0) { + lock_control.unlock(); + RM_PRINTF( + "ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, or " + "flow_to_rwnd_fd=%d\n", + max_active_flows, epoch_us, flow_to_rwnd_fd); + return; + } + // Previously paused flows that will be activated. std::vector new_active_fds; // Preallocate suffient space. new_active_fds.reserve(max_active_flows); - RM_PRINTF("INFO: libratemon_interp scheduling thread started\n"); - while (true) { - // If setup has not been performed yet, then we cannot perform scheduling. - if (!setup) { - // RM_PRINTF("WARNING setup not completed, skipping scheduling\n"); - usleep(10000); - continue; - } - usleep(epoch_us); // If fewer than the max number of flows exist and they are all active, then @@ -81,7 +88,7 @@ void thread_func() { RM_PRINTF("Performing scheduling\n"); new_active_fds.clear(); - lock.lock(); + lock_scheduler.lock(); // Try to find max_active_flows FDs to unpause. while (!paused_fds_queue.empty() and @@ -122,10 +129,8 @@ void thread_func() { } RM_PRINTF("\n"); - lock.unlock(); + lock_scheduler.unlock(); new_active_fds.clear(); - - fflush(stdout); } } @@ -170,6 +175,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { } // Perform BPF setup (only once for all flows in this process). + lock_control.lock(); if (!setup) { if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) { return new_fd; @@ -190,15 +196,19 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { RM_PRINTF("INFO: max_active_flows=%u epoch_us=%u\n", max_active_flows, epoch_us); } + lock_control.unlock(); // Hack for iperf3. The first flow is a control flow that should not be // scheduled. Note that for this hack to work, libratemon_interp must be // restarted between tests. + lock_control.lock(); if (fd_to_flow.size() == 0 && !skipped_first) { RM_PRINTF("WARNING: skipping first flow\n"); skipped_first = true; + lock_control.unlock(); return new_fd; } + lock_control.unlock(); // Set the CCA and make sure it was set correctly. if (setsockopt(new_fd, SOL_TCP, TCP_CONGESTION, RM_BPF_CUBIC, @@ -244,10 +254,9 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { .remote_port = ntohs(remote_addr.sin_port)}; // RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port, // flow.local_addr, flow.local_port); - fd_to_flow[new_fd] = flow; - - lock.lock(); + lock_scheduler.lock(); + fd_to_flow[new_fd] = flow; // Should this flow be active or paused? if (active_fds_queue.size() < max_active_flows) { // Less than the max number of flows are active, so make this one active. @@ -259,7 +268,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY); } - lock.unlock(); + lock_scheduler.unlock(); RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd); return new_fd;