Skip to content

Commit

Permalink
Locking in libratemon_interp
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed May 2, 2024
1 parent c711254 commit d351665
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions ratemon/runtime/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@

#include "ratemon.h"

volatile unsigned int max_active_flows = 0;
volatile unsigned int epoch_us = 0;
volatile bool setup = false;
volatile bool skipped_first = false;
std::mutex lock_control;
unsigned int max_active_flows = 0;
unsigned int epoch_us = 0;
bool setup = false;
bool skipped_first = false;
// FD for the flow_to_rwnd map.
int flow_to_rwnd_fd = 0;

// Protects active_fds_queue and paused_fds_queue.
std::mutex lock;
std::mutex lock_scheduler;
std::queue<int> active_fds_queue;
std::queue<int> paused_fds_queue;
// Maps file descriptor to rm_flow struct.
std::unordered_map<int, struct rm_flow> fd_to_flow;

// FD for the flow_to_rwnd map.
int flow_to_rwnd_fd = 0;

// Used to set entries in flow_to_rwnd.
int zero = 0;

Expand All @@ -54,21 +54,28 @@ inline void trigger_ack(int fd) {
}

void thread_func() {
RM_PRINTF("INFO: libratemon_interp scheduling thread started\n");
// If setup has not been performed yet, then we cannot perform scheduling.
while (!setup) {
// RM_PRINTF("WARNING setup not completed, skipping scheduling\n");
usleep(10000);
}
// At this point, max_active_flows, epoch_us, and flow_to_rwnd_fd should be.
if (max_active_flows == 0 || epoch_us == 0 || flow_to_rwnd_fd == 0) {
lock_control.unlock();
RM_PRINTF(
"ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, or "
"flow_to_rwnd_fd=%d\n",
max_active_flows, epoch_us, flow_to_rwnd_fd);
return;
}

// Previously paused flows that will be activated.
std::vector<int> new_active_fds;
// Preallocate suffient space.
new_active_fds.reserve(max_active_flows);

RM_PRINTF("INFO: libratemon_interp scheduling thread started\n");

while (true) {
// If setup has not been performed yet, then we cannot perform scheduling.
if (!setup) {
// RM_PRINTF("WARNING setup not completed, skipping scheduling\n");
usleep(10000);
continue;
}

usleep(epoch_us);

// If fewer than the max number of flows exist and they are all active, then
Expand All @@ -81,7 +88,7 @@ void thread_func() {

RM_PRINTF("Performing scheduling\n");
new_active_fds.clear();
lock.lock();
lock_scheduler.lock();

// Try to find max_active_flows FDs to unpause.
while (!paused_fds_queue.empty() and
Expand Down Expand Up @@ -122,10 +129,8 @@ void thread_func() {
}
RM_PRINTF("\n");

lock.unlock();
lock_scheduler.unlock();
new_active_fds.clear();

fflush(stdout);
}
}

Expand Down Expand Up @@ -170,6 +175,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
}

// Perform BPF setup (only once for all flows in this process).
lock_control.lock();
if (!setup) {
if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) {
return new_fd;
Expand All @@ -190,15 +196,19 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
RM_PRINTF("INFO: max_active_flows=%u epoch_us=%u\n", max_active_flows,
epoch_us);
}
lock_control.unlock();

// Hack for iperf3. The first flow is a control flow that should not be
// scheduled. Note that for this hack to work, libratemon_interp must be
// restarted between tests.
lock_control.lock();
if (fd_to_flow.size() == 0 && !skipped_first) {
RM_PRINTF("WARNING: skipping first flow\n");
skipped_first = true;
lock_control.unlock();
return new_fd;
}
lock_control.unlock();

// Set the CCA and make sure it was set correctly.
if (setsockopt(new_fd, SOL_TCP, TCP_CONGESTION, RM_BPF_CUBIC,
Expand Down Expand Up @@ -244,10 +254,9 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
.remote_port = ntohs(remote_addr.sin_port)};
// RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port,
// flow.local_addr, flow.local_port);
fd_to_flow[new_fd] = flow;

lock.lock();

lock_scheduler.lock();
fd_to_flow[new_fd] = flow;
// Should this flow be active or paused?
if (active_fds_queue.size() < max_active_flows) {
// Less than the max number of flows are active, so make this one active.
Expand All @@ -259,7 +268,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY);
}

lock.unlock();
lock_scheduler.unlock();

RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd);
return new_fd;
Expand Down

0 comments on commit d351665

Please sign in to comment.