Skip to content

Commit

Permalink
Clean up flow_to_win_scale as well
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed May 6, 2024
1 parent 0f70a19 commit 851e1aa
Showing 1 changed file with 55 additions and 22 deletions.
77 changes: 55 additions & 22 deletions ratemon/runtime/c/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ unsigned int epoch_us = 10000;
unsigned int num_to_schedule = 1;
unsigned short monitor_port = 9000;
bool setup = false;
// FD for the flow_to_rwnd map.
// FD for the BPF map "flow_to_rwnd".
int flow_to_rwnd_fd = 0;
// FD for the BPF map "flow_to_win_sca" (short for "flow_to_win_scale").
int flow_to_win_scale_fd = 0;

// Protects active_fds_queue and paused_fds_queue.
std::mutex lock_scheduler;
Expand Down Expand Up @@ -139,6 +141,7 @@ void timer_callback(const boost::system::error_code &error) {
active_fds_queue.push({to_activate, now_plus_epoch});
bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate]));
trigger_ack(to_activate);
RM_PRINTF("INFO: activated FD=%d\n", to_activate);
break;
}
}
Expand All @@ -151,20 +154,24 @@ void timer_callback(const boost::system::error_code &error) {
// either schedule that flow again or pause it.
int to_pause = active_fds_queue.front().first;
active_fds_queue.pop();
if (active_fds_queue.size() < max_active_flows &&
paused_fds_queue.empty()) {
// If there are fewer than the limit flows active and there are no
// waiting flows, then scheduling this flow again. But first, check to
// make sure that it is still valid (see above).
if (fd_to_flow.contains(to_pause)) {
if (fd_to_flow.contains(to_pause)) {
if (active_fds_queue.size() < max_active_flows &&
paused_fds_queue.empty()) {
// If there are fewer than the limit flows active and there are no
// waiting flows, then scheduling this flow again. But first, check to
// make sure that it is still valid (see above).
active_fds_queue.push({to_pause, now_plus_epoch});
RM_PRINTF("INFO: reactivated FD=%d\n", to_pause);
} else {
// Pause this flow.
paused_fds_queue.push(to_pause);
bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
BPF_ANY);
trigger_ack(to_pause);
RM_PRINTF("INFO: paused FD=%d\n", to_pause);
}
} else {
// Pause this flow.
paused_fds_queue.push(to_pause);
bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
BPF_ANY);
trigger_ack(to_pause);
RM_PRINTF("INFO: cleaned up FD=%d\n", to_pause);
}
}
}
Expand Down Expand Up @@ -271,18 +278,32 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
// for this.
int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH);
if (err == -1) {
RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd'\n");
RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd' from path '%s'\n",
RM_FLOW_TO_RWND_PIN_PATH);
lock_setup.unlock();
return new_fd;
}
flow_to_rwnd_fd = err;
RM_PRINTF("INFO: successfully looked up 'flow_to_rwnd' FD\n");
setup = true;

// Look up the FD for the flow_to_win_scale map. We do not need the BPF
// skeleton for this.
err = bpf_obj_get(RM_FLOW_TO_WIN_SCALE_PIN_PATH);
if (err == -1) {
RM_PRINTF(
"ERROR: failed to get FD for 'flow_to_win_scale' from path '%s'\n",
RM_FLOW_TO_WIN_SCALE_PIN_PATH);
lock_setup.unlock();
return new_fd;
}
flow_to_win_scale_fd = err;
RM_PRINTF("INFO: successfully looked up 'flow_to_win_scale' FD\n");

RM_PRINTF(
"INFO: setup complete! max_active_flows=%u, epoch_us=%u, "
"num_to_schedule=%u, monitor_port=%u\n",
max_active_flows, epoch_us, num_to_schedule, monitor_port);
setup = true;
}
lock_setup.unlock();

Expand Down Expand Up @@ -345,6 +366,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
active_fds_queue.push(
{new_fd, boost::posix_time::microsec_clock::local_time() +
boost::posix_time::microseconds(epoch_us)});
RM_PRINTF("INFO: allowing new flow FD=%d\n", new_fd);
if (active_fds_queue.size() == 1) {
timer.cancel();
timer.expires_at(active_fds_queue.front().second);
Expand All @@ -355,6 +377,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
paused_fds_queue.push(new_fd);
// Pausing a flow means retting its RWND to 0 B.
bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY);
RM_PRINTF("INFO: paused new flow FD=%d\n", new_fd);
}
lock_scheduler.unlock();

Expand All @@ -372,19 +395,29 @@ int close(int sockfd) {
}
int ret = real_close(sockfd);

// To get the flow struct for this FD, we must use visit() to look it up
// in the concurrent_flat_map. Obviously, do this before removing the FD
// from fd_to_flow.
bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd]));
// Removing the FD from fd_to_flow triggers it to be (eventually) removed from
// scheduling.
fd_to_flow.erase(sockfd);
// Remove this FD from all data structures.
lock_scheduler.lock();
if (fd_to_flow.contains(sockfd)) {
// Obviously, do this before removing the FD from fd_to_flow.
if (flow_to_rwnd_fd != 0) {
bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd]));
}
if (flow_to_win_scale_fd != 0) {
bpf_map_delete_elem(flow_to_win_scale_fd, &(fd_to_flow[sockfd]));
}
// Removing the FD from fd_to_flow triggers it to be (eventually) removed
// from scheduling.
fd_to_flow.erase(sockfd);
RM_PRINTF("INFO: removed FD=%d\n", sockfd);
} else {
RM_PRINTF("INFO: ignoring 'close' for FD=%d, not in fd_to_flow\n", sockfd);
}
lock_scheduler.unlock();

if (ret == -1) {
RM_PRINTF("ERROR: real 'close' failed\n");
return ret;
}

RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd);
return ret;
}
Expand Down

0 comments on commit 851e1aa

Please sign in to comment.