Skip to content

Commit

Permalink
Fix signal handler and thread management
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed May 6, 2024
1 parent 851e1aa commit 3ae6d8d
Showing 1 changed file with 59 additions and 11 deletions.
70 changes: 59 additions & 11 deletions ratemon/runtime/c/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <linux/inet_diag.h>
#include <netinet/in.h> // structure for storing address information
#include <netinet/tcp.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -27,31 +28,45 @@

#include "ratemon.h"

boost::asio::io_service io;
boost::asio::deadline_timer timer(io);

// Protects writes only to max_active_flows, epoch_us, num_to_schedule,
// monitor_port, flow_to_rwnd_fd, flow_to_win_scale_fd, oldact, and setup. Reads
// are unprotected.
std::mutex lock_setup;
unsigned int max_active_flows = 5;
unsigned int epoch_us = 10000;
unsigned int num_to_schedule = 1;
unsigned short monitor_port = 9000;
// Whether setup has been performed.
bool setup = false;
// Used to signal the scheduler thread to end.
bool run = true;
// Existing signal handler for SIGINT.
struct sigaction oldact;
// FD for the BPF map "flow_to_rwnd".
int flow_to_rwnd_fd = 0;
// FD for the BPF map "flow_to_win_sca" (short for "flow_to_win_scale").
int flow_to_win_scale_fd = 0;

// Protects active_fds_queue and paused_fds_queue.
// Manages the io_service.
boost::thread scheduler_thread;
// Runs async timers for scheduling
boost::asio::io_service io;
// Periodically performs scheduling using timer_callback().
boost::asio::deadline_timer timer(io);
// Protects writes and reads to active_fds_queue, paused_fds_queue, and
// fd_to_flow.
std::mutex lock_scheduler;
// FDs for flows thare are currently active.
std::queue<std::pair<int, boost::posix_time::ptime>> active_fds_queue;
// FDs for flows that are currently paused (RWND = 0 B);
std::queue<int> paused_fds_queue;
// Maps file descriptor to rm_flow struct.
std::unordered_map<int, struct rm_flow> fd_to_flow;
// The next four are scheduled RWND tuning parameters. See ratemon.h for
// parameter documentation.
unsigned int max_active_flows = 5;
unsigned int epoch_us = 10000;
unsigned int num_to_schedule = 1;
unsigned short monitor_port = 9000;

// Used to set entries in flow_to_rwnd.
int zero = 0;
boost::posix_time::seconds one_sec = boost::posix_time::seconds(1);

// As an optimization, reuse the same tcp_cc_info struct and size.
union tcp_cc_info placeholder_cc_info;
socklen_t placeholder_cc_info_length = (socklen_t)sizeof(placeholder_cc_info);
Expand All @@ -73,6 +88,11 @@ void timer_callback(const boost::system::error_code &error) {
return;
}

if (!run) {
RM_PRINTF("INFO: exiting...\n");
return;
}

// If setup has not been performed yet, then we cannot perform scheduling.
if (!setup) {
// RM_PRINTF("WARNING setup not completed, skipping scheduling\n");
Expand Down Expand Up @@ -200,13 +220,31 @@ void timer_callback(const boost::system::error_code &error) {
void thread_func() {
// This function is designed to be run in a thread. It is responsible for
// managing the async timers that perform scheduling.
RM_PRINTF("Scheduler thread started\n");
timer.expires_from_now(one_sec);
timer.async_wait(&timer_callback);
// Execute the configured events, until there are no more events to execute.
io.run();
RM_PRINTF("Scheduler thread ended\n");
}

boost::thread thread(thread_func);
// Catch SIGINT and trigger the main function to end.
void sigint_handler(int signum) {
switch (signum) {
case SIGINT:
RM_PRINTF("INFO: caught SIGINT\n");
run = false;
scheduler_thread.join();
RM_PRINTF("Resetting old SIGINT handler\n");
sigaction(SIGINT, &oldact, NULL);
break;
default:
RM_PRINTF("ERROR: caught signal %d\n", signum);
break;
}
RM_PRINTF("INFO: re-raising signal %d\n", signum);
raise(signum);
}

bool read_env_uint(const char *key, volatile unsigned int *dest) {
// Read an environment variable as an unsigned integer.
Expand Down Expand Up @@ -299,6 +337,16 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
flow_to_win_scale_fd = err;
RM_PRINTF("INFO: successfully looked up 'flow_to_win_scale' FD\n");

// Catch SIGINT to end the program.
struct sigaction action;
action.sa_handler = sigint_handler;
sigemptyset(&action.sa_mask);
action.sa_flags = SA_RESETHAND;
sigaction(SIGINT, &action, &oldact);

// Launch the scheduler thread.
scheduler_thread = boost::thread(thread_func);

RM_PRINTF(
"INFO: setup complete! max_active_flows=%u, epoch_us=%u, "
"num_to_schedule=%u, monitor_port=%u\n",
Expand Down

0 comments on commit 3ae6d8d

Please sign in to comment.