Skip to content

Commit

Permalink
[WIP] Refactor to use timers
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed May 3, 2024
1 parent 2a3a0a8 commit 5d78c15
Showing 1 changed file with 105 additions and 86 deletions.
191 changes: 105 additions & 86 deletions ratemon/runtime/c/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,36 @@
#include <sys/socket.h> // for socket APIs
#include <unistd.h>

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <utility>
#include <vector>

#include "ratemon.h"

boost::asio::io_service io;
boost::asio::deadline_timer timer(io);

std::mutex lock_control;
unsigned int max_active_flows = 0;
unsigned int epoch_us = 0;
bool setup = false;
bool skipped_first = false;
// FD for the flow_to_rwnd map.
int flow_to_rwnd_fd = 0;

// Protects active_fds_queue and paused_fds_queue.
std::mutex lock_scheduler;
std::queue<int> active_fds_queue;
std::queue<std::pair<int, boost::posix_time::ptime>> active_fds_queue;
std::queue<int> paused_fds_queue;
// Maps file descriptor to rm_flow struct.
std::unordered_map<int, struct rm_flow> fd_to_flow;

// Used to set entries in flow_to_rwnd.
int zero = 0;
boost::posix_time::seconds one_sec = boost::posix_time::seconds(1);

// As an optimization, reuse the same tcp_cc_info struct and size.
union tcp_cc_info placeholder_cc_info;
Expand All @@ -53,88 +57,105 @@ inline void trigger_ack(int fd) {
&placeholder_cc_info_length);
}

void thread_func() {
RM_PRINTF("INFO: libratemon_interp scheduling thread started\n");
// Timer callback that drives round-robin scheduling of flows. Called when a
// flow's epoch has expired and it should be paused. Only one flow is paused
// per invocation; if there are waiting (paused) flows, one is activated in
// its place. Each activated flow is allowed to be active for epoch_us
// microseconds before this callback fires again for it.
void timer_callback(const boost::system::error_code &error) {
  if (error) {
    RM_PRINTF("ERROR: timer_callback error: %s\n", error.message().c_str());
    return;
  }

  // If setup has not been performed yet, then we cannot perform scheduling.
  // Poll again in one second.
  if (!setup) {
    // RM_PRINTF("WARNING setup not completed, skipping scheduling\n");
    timer.expires_from_now(one_sec);
    timer.async_wait(&timer_callback);
    return;
  }
  // At this point, max_active_flows, epoch_us, and flow_to_rwnd_fd should be
  // set. If not, something is wrong; retry in one second.
  // NOTE(review): the original had a stray lock_control.unlock() here with no
  // matching lock() — removed, since unlocking an unheld std::mutex is UB.
  if (max_active_flows == 0 || epoch_us == 0 || flow_to_rwnd_fd == 0) {
    RM_PRINTF(
        "ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, or "
        "flow_to_rwnd_fd=%d\n",
        max_active_flows, epoch_us, flow_to_rwnd_fd);
    timer.expires_from_now(one_sec);
    timer.async_wait(&timer_callback);
    return;
  }

  lock_scheduler.lock();
  RM_PRINTF("Performing scheduling\n");

  // No active flows means nothing to pause. Check again in one second.
  if (active_fds_queue.empty()) {
    timer.expires_from_now(one_sec);
    timer.async_wait(&timer_callback);
    lock_scheduler.unlock();
    return;
  }

  // Deadline for whichever flow (if any) we activate below.
  boost::posix_time::ptime const now_plus_epoch =
      boost::posix_time::microsec_clock::local_time() +
      boost::posix_time::microseconds(epoch_us);

  // If there are paused flows, then unpause one and remember that we did so.
  bool found_paused = false;
  while (!paused_fds_queue.empty()) {
    // Activate the next flow and record the time at which it should be
    // paused. Skip flows that have since been closed (no longer in
    // fd_to_flow).
    int to_activate = paused_fds_queue.front();
    paused_fds_queue.pop();
    if (fd_to_flow.contains(to_activate)) {
      active_fds_queue.push({to_activate, now_plus_epoch});
      // Remove the RWND=0 mapping so the flow can make progress again, and
      // trigger an ACK to wake it up.
      bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate]));
      trigger_ack(to_activate);
      found_paused = true;
      break;
    }
  }

  // Pause the next active flow.
  int to_pause = active_fds_queue.front().first;
  active_fds_queue.pop();
  if (found_paused) {
    // There were waiting flows and we activated one, so pause this flow:
    // install an RWND=0 mapping and trigger an ACK to communicate it.
    paused_fds_queue.push(to_pause);
    bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
                        BPF_ANY);
    // TODO: Do we need to send an ACK to immediately pause the flow?
    trigger_ack(to_pause);
  } else if (fd_to_flow.contains(to_pause)) {
    // There were no waiting flows, so give this flow another epoch (unless it
    // was closed in the meantime).
    active_fds_queue.push({to_pause, now_plus_epoch});
  }

  // Schedule the next callback for when the next flow should be paused. If no
  // flows remain active, fall back to a one-second poll. (Guard added: the
  // original called active_fds_queue.front() unconditionally — and after
  // releasing lock_scheduler — which is UB when the queue drained above.)
  if (active_fds_queue.empty()) {
    timer.expires_from_now(one_sec);
  } else {
    timer.expires_at(active_fds_queue.front().second);
  }
  timer.async_wait(&timer_callback);
  lock_scheduler.unlock();
  return;
}

boost::thread t(thread_func);
// Entry point for the scheduling thread. Arms the timer with an initial
// one-second delay, registers timer_callback to run when it fires, then runs
// the Boost.Asio event loop. io.run() blocks for as long as there is pending
// work (timer_callback re-arms the timer each time, keeping the loop alive).
void thread_func() {
  timer.expires_from_now(one_sec);
  timer.async_wait(&timer_callback);
  io.run();
}

boost::thread thread(thread_func);

bool read_env_uint(const char *key, volatile unsigned int *dest) {
char *val_str = getenv(key);
Expand Down Expand Up @@ -178,15 +199,18 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
lock_control.lock();
if (!setup) {
if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) {
lock_control.unlock();
return new_fd;
}
if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) {
lock_control.unlock();
return new_fd;
}

int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH);
if (err == -1) {
RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd'\n");
lock_control.unlock();
return new_fd;
}
flow_to_rwnd_fd = err;
Expand All @@ -198,18 +222,6 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
}
lock_control.unlock();

// Hack for iperf3. The first flow is a control flow that should not be
// scheduled. Note that for this hack to work, libratemon_interp must be
// restarted between tests.
lock_control.lock();
if (fd_to_flow.size() == 0 && !skipped_first) {
RM_PRINTF("WARNING: skipping first flow\n");
skipped_first = true;
lock_control.unlock();
return new_fd;
}
lock_control.unlock();

// Set the CCA and make sure it was set correctly.
if (setsockopt(new_fd, SOL_TCP, TCP_CONGESTION, RM_BPF_CUBIC,
strlen(RM_BPF_CUBIC)) == -1) {
Expand Down Expand Up @@ -252,22 +264,28 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
.remote_addr = ntohl(remote_addr.sin_addr.s_addr),
.local_port = ntohs(local_addr.sin_port),
.remote_port = ntohs(remote_addr.sin_port)};
// RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port,
// flow.local_addr, flow.local_port);
RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port,
flow.local_addr, flow.local_port);

lock_scheduler.lock();
fd_to_flow[new_fd] = flow;
// Should this flow be active or paused?
if (active_fds_queue.size() < max_active_flows) {
// Less than the max number of flows are active, so make this one active.
active_fds_queue.push(new_fd);
active_fds_queue.push(
{new_fd, boost::posix_time::microsec_clock::local_time() +
boost::posix_time::microseconds(epoch_us)});
if (active_fds_queue.size() == 1) {
timer.cancel();
timer.expires_at(active_fds_queue.front().second);
timer.async_wait(&timer_callback);
}
} else {
// The max number of flows are active already, so pause this one.
paused_fds_queue.push(new_fd);
// Pausing a flow means retting its RWND to 0 B.
bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY);
}

lock_scheduler.unlock();

RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd);
Expand All @@ -283,10 +301,6 @@ int close(int sockfd) {
return -1;
}
int ret = real_close(sockfd);
if (ret == -1) {
RM_PRINTF("ERROR: real 'close' failed\n");
return ret;
}

// To get the flow struct for this FD, we must use visit() to look it up
// in the concurrent_flat_map. Obviously, do this before removing the FD
Expand All @@ -296,6 +310,11 @@ int close(int sockfd) {
// scheduling.
fd_to_flow.erase(sockfd);

if (ret == -1) {
RM_PRINTF("ERROR: real 'close' failed\n");
return ret;
}

RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd);
return ret;
}
Expand Down

0 comments on commit 5d78c15

Please sign in to comment.