Skip to content

Commit

Permalink
Debugging idle timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed Jun 6, 2024
1 parent 2179295 commit e7c6394
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 21 deletions.
65 changes: 47 additions & 18 deletions ratemon/runtime/c/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ std::unordered_map<int, struct rm_flow> fd_to_flow;
// parameter documentation.
unsigned int max_active_flows = 5;
unsigned int epoch_us = 10000;
unsigned long idle_timeout_ns = 1000;
long idle_timeout_us = 0;
unsigned long idle_timeout_ns = 0;
unsigned short monitor_port_start = 9000;
unsigned short monitor_port_end = 9999;

Expand Down Expand Up @@ -201,17 +202,35 @@ void timer_callback(const boost::system::error_code &error) {
// Look up this flow's last active time.
if (!bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
&last_data_time_ns)) {
idle_ns = ktime_now_ns - last_data_time_ns;
RM_PRINTF("INFO: FD=%d idle has been idle for %lu ns\n", a.first,
idle_ns);
// If the flow has been idle for longer than the idle timeout, then
// pause it. We pause the flow *before* activating a replacement flow
// because it is by definition not sending data, so we do not risk
// causing a drop in utilization by pausing it immediately.
if (idle_ns > idle_timeout_ns) {
paused_fds_queue.push(a.first);
pause_flow(a.first);
continue;
// If last_data_time_ns is 0, then this flow has not yet been tracked.
if (last_data_time_ns) {
if (last_data_time_ns > ktime_now_ns) {
// This could be fine...perhaps a packet arrived since we captured
// the current time above?
RM_PRINTF(
"WARNING: FD=%d last data time (%lu ns) is more recent that "
"current time (%lu ns) by %lu ns\n",
a.first, last_data_time_ns, ktime_now_ns,
last_data_time_ns - ktime_now_ns);
} else {
idle_ns = ktime_now_ns - last_data_time_ns;
RM_PRINTF("INFO: FD=%d now: %lu ns, last data time: %lu ns\n",
a.first, ktime_now_ns, last_data_time_ns);
RM_PRINTF(
"INFO: FD=%d idle has been idle for %lu ns. timeout is %lu "
"ns\n",
a.first, idle_ns, idle_timeout_ns);
// If the flow has been idle for at least the idle timeout, then pause
// it. We pause the flow *before* activating a replacement flow because
// it is by definition not sending data, so we do not risk causing a
// drop in utilization by pausing it immediately.
if (idle_ns >= idle_timeout_ns) {
RM_PRINTF("INFO: Pausing FD=%d due to idle timeout\n", a.first);
paused_fds_queue.push(a.first);
pause_flow(a.first);
continue;
}
}
}
}
}
Expand Down Expand Up @@ -304,10 +323,15 @@ void timer_callback(const boost::system::error_code &error) {
RM_PRINTF("INFO: scheduling timer for next epoch end\n");
} else {
// If we are using idle timeout mode...
when = boost::posix_time::microsec(
std::min((long)idle_timeout_ns,
(active_fds_queue.front().second - now).total_microseconds()));
RM_PRINTF("INFO: scheduling timer for next epoch end or idle timeout\n");
long next_epoch_us =
(active_fds_queue.front().second - now).total_microseconds();
if (idle_timeout_us < next_epoch_us) {
RM_PRINTF("INFO: scheduling timer for next idle timeout\n");
when = boost::posix_time::microsec(idle_timeout_us);
} else {
RM_PRINTF("INFO: scheduling timer for next epoch end\n");
when = boost::posix_time::microsec(next_epoch_us);
}
}

// 6) Start the next timer.
Expand Down Expand Up @@ -400,10 +424,11 @@ bool setup() {
// Read environment variables with parameters.
if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) return false;
if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) return false;
unsigned int idle_timeout_us;
if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us,
unsigned int idle_timeout_us_;
if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us_,
true /* allow_zero */))
return false;
idle_timeout_us = (long)idle_timeout_us_;
idle_timeout_ns = (unsigned long)idle_timeout_us * 1000UL;
unsigned int monitor_port_start_;
if (!read_env_uint(RM_MONITOR_PORT_START_KEY, &monitor_port_start_) ||
Expand Down Expand Up @@ -517,6 +542,10 @@ bool set_cca(int fd, const char *cca) {

// Perform initial scheduling for this flow.
void initial_scheduling(int fd) {
// Create an entry in flow_to_last_data_time_ns for this flow so that the
// kprobe program knows to start tracking this flow.
bpf_map_update_elem(flow_to_last_data_time_fd, &fd_to_flow[fd], &zero,
BPF_ANY);
// Should this flow be active or paused?
if (active_fds_queue.size() < max_active_flows) {
// Less than the max number of flows are active, so make this one active.
Expand Down
2 changes: 1 addition & 1 deletion ratemon/runtime/c/ratemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#define __RATEMON_H

// Comment out the below line to disable verbose logging.
// #define RM_VERBOSE
#define RM_VERBOSE

#ifdef RM_VERBOSE
#define RM_PRINTF(...) printf(__VA_ARGS__)
Expand Down
13 changes: 11 additions & 2 deletions ratemon/runtime/c/ratemon_kprobe.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,17 @@ int BPF_KPROBE(tcp_rcv_established, struct sock *sk, struct sk_buff *skb) {
.remote_addr = bpf_ntohl(skc_daddr),
.local_port = skc_num,
.remote_port = bpf_ntohs(skc_dport)};
// Check if we should record the last data time for this flow.
if (bpf_map_lookup_elem(&flow_to_last_data_time_ns, &flow) == NULL) {
// This flow is not in the map, so we are not supposed to track its last
// data time.
return 0;
}
// Get the current time and store it for this flow.
unsigned int now_ns = bpf_ktime_get_ns();
bpf_map_update_elem(&flow_to_last_data_time_ns, &flow, &now_ns, BPF_ANY);
unsigned long now_ns = bpf_ktime_get_ns();
if (bpf_map_update_elem(&flow_to_last_data_time_ns, &flow, &now_ns,
BPF_ANY)) {
bpf_printk("ERROR: 'tcp_rcv_established' error updating last data time");
}
return 0;
}

0 comments on commit e7c6394

Please sign in to comment.