From 3ae6d8dbc925ecd31d90972e9d55a39ccfbbc71e Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Mon, 6 May 2024 20:16:58 +0000 Subject: [PATCH] Fix signal handler and thread management --- ratemon/runtime/c/libratemon_interp.cpp | 70 +++++++++++++++++++++---- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp index a0be380..e07e0f1 100644 --- a/ratemon/runtime/c/libratemon_interp.cpp +++ b/ratemon/runtime/c/libratemon_interp.cpp @@ -9,6 +9,7 @@ #include #include // structure for storing address information #include +#include #include #include #include @@ -27,31 +28,45 @@ #include "ratemon.h" -boost::asio::io_service io; -boost::asio::deadline_timer timer(io); - +// Protects writes only to max_active_flows, epoch_us, num_to_schedule, +// monitor_port, flow_to_rwnd_fd, flow_to_win_scale_fd, oldact, and setup. Reads +// are unprotected. std::mutex lock_setup; -unsigned int max_active_flows = 5; -unsigned int epoch_us = 10000; -unsigned int num_to_schedule = 1; -unsigned short monitor_port = 9000; +// Whether setup has been performed. bool setup = false; +// Used to signal the scheduler thread to end. +bool run = true; +// Existing signal handler for SIGINT. +struct sigaction oldact; // FD for the BPF map "flow_to_rwnd". int flow_to_rwnd_fd = 0; // FD for the BPF map "flow_to_win_sca" (short for "flow_to_win_scale"). int flow_to_win_scale_fd = 0; - -// Protects active_fds_queue and paused_fds_queue. +// Manages the io_service. +boost::thread scheduler_thread; +// Runs async timers for scheduling +boost::asio::io_service io; +// Periodically performs scheduling using timer_callback(). +boost::asio::deadline_timer timer(io); +// Protects writes and reads to active_fds_queue, paused_fds_queue, and +// fd_to_flow. std::mutex lock_scheduler; +// FDs for flows thare are currently active. std::queue> active_fds_queue; +// FDs for flows that are currently paused (RWND = 0 B); std::queue paused_fds_queue; // Maps file descriptor to rm_flow struct. std::unordered_map fd_to_flow; +// The next four are scheduled RWND tuning parameters. See ratemon.h for +// parameter documentation. +unsigned int max_active_flows = 5; +unsigned int epoch_us = 10000; +unsigned int num_to_schedule = 1; +unsigned short monitor_port = 9000; // Used to set entries in flow_to_rwnd. int zero = 0; boost::posix_time::seconds one_sec = boost::posix_time::seconds(1); - // As an optimization, reuse the same tcp_cc_info struct and size. union tcp_cc_info placeholder_cc_info; socklen_t placeholder_cc_info_length = (socklen_t)sizeof(placeholder_cc_info); @@ -73,6 +88,11 @@ void timer_callback(const boost::system::error_code &error) { return; } + if (!run) { + RM_PRINTF("INFO: exiting...\n"); + return; + } + // If setup has not been performed yet, then we cannot perform scheduling. if (!setup) { // RM_PRINTF("WARNING setup not completed, skipping scheduling\n"); @@ -200,13 +220,31 @@ void timer_callback(const boost::system::error_code &error) { void thread_func() { // This function is designed to be run in a thread. It is responsible for // managing the async timers that perform scheduling. + RM_PRINTF("Scheduler thread started\n"); timer.expires_from_now(one_sec); timer.async_wait(&timer_callback); // Execute the configured events, until there are no more events to execute. io.run(); + RM_PRINTF("Scheduler thread ended\n"); } -boost::thread thread(thread_func); +// Catch SIGINT and trigger the main function to end. +void sigint_handler(int signum) { + switch (signum) { + case SIGINT: + RM_PRINTF("INFO: caught SIGINT\n"); + run = false; + scheduler_thread.join(); + RM_PRINTF("Resetting old SIGINT handler\n"); + sigaction(SIGINT, &oldact, NULL); + break; + default: + RM_PRINTF("ERROR: caught signal %d\n", signum); + break; + } + RM_PRINTF("INFO: re-raising signal %d\n", signum); + raise(signum); +} bool read_env_uint(const char *key, volatile unsigned int *dest) { // Read an environment variable as an unsigned integer. @@ -299,6 +337,16 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { flow_to_win_scale_fd = err; RM_PRINTF("INFO: successfully looked up 'flow_to_win_scale' FD\n"); + // Catch SIGINT to end the program. + struct sigaction action; + action.sa_handler = sigint_handler; + sigemptyset(&action.sa_mask); + action.sa_flags = SA_RESETHAND; + sigaction(SIGINT, &action, &oldact); + + // Launch the scheduler thread. + scheduler_thread = boost::thread(thread_func); + RM_PRINTF( "INFO: setup complete! max_active_flows=%u, epoch_us=%u, " "num_to_schedule=%u, monitor_port=%u\n",