Skip to content

Commit

Permalink
Cleanup; Move eBPF loading to separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed Mar 1, 2024
1 parent 8b1aa45 commit 8eb3694
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 111 deletions.
107 changes: 107 additions & 0 deletions ratemon/runtime/ebpf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import logging
import os
from os import path
import time

from bcc import BPF, BPFAttachType
import numpy as np
from pyroute2 import IPRoute, protocols
from pyroute2.netlink.exceptions import NetlinkError


def load_ebpf():
"""Load the corresponding eBPF program."""
# Load BPF text.
bpf_flp = path.join(
path.abspath(path.dirname(__file__)),
"ratemon_runtime.c",
)
if not path.isfile(bpf_flp):
logging.error("Could not find BPF program: %s", bpf_flp)
return 1
logging.info("Loading BPF program: %s", bpf_flp)
with open(bpf_flp, "r", encoding="utf-8") as fil:
bpf_text = fil.read()
# Load BPF program.
return BPF(text=bpf_text)


def configure_ebpf(args):
"""Set up eBPF hooks."""
if min(args.listen_ports) >= 50000:
# Use the listen ports to determine the wait time, so that multiple
# instances of this program do not try to configure themselves at the same
# time.
rand_sleep = min(args.listen_ports) - 50000
logging.info("Waiting %f seconds to prevent race conditions...", rand_sleep)
time.sleep(rand_sleep)

try:
bpf = load_ebpf()
except:
logging.exception("Error loading BPF program!")
return None, None
flow_to_rwnd = bpf["flow_to_rwnd"]

# Set up a TC egress qdisc, specify a filter the accepts all packets, and attach
# our egress function as the action on that filter.
ipr = IPRoute()
ifindex = ipr.link_lookup(ifname=args.interface)
assert (
len(ifindex) == 1
), f'Trouble looking up index for interface "{args.interface}": {ifindex}'
ifindex = ifindex[0]

logging.info("Attempting to create central qdisc")
handle = 0x10000
default = 0x200000
responsible_for_central_tc = False
try:
ipr.tc("add", "htb", ifindex, handle, default=default)
except NetlinkError:
logging.warning("Unable to create central qdisc. It probably already exists.")
else:
logging.info("Responsible for central TC")
responsible_for_central_tc = True

if not responsible_for_central_tc:
# If someone else is responsible for the egress action, then we will just let
# them do the work.
logging.warning("Not configuring TC")
return flow_to_rwnd, None

# Read the TCP window scale on outgoing SYN-ACK packets.
func_sock_ops = bpf.load_func("read_win_scale", bpf.SOCK_OPS) # sock_stuff
filedesc = os.open(args.cgroup, os.O_RDONLY)
bpf.attach_func(func_sock_ops, filedesc, BPFAttachType.CGROUP_SOCK_OPS)

# Overwrite advertised window size in outgoing packets.
egress_fn = bpf.load_func("do_rwnd_at_egress", BPF.SCHED_ACT)
action = dict(kind="bpf", fd=egress_fn.fd, name=egress_fn.name, action="ok")

try:
# Add the action to a u32 match-all filter
ipr.tc(
"add-filter",
"u32",
ifindex,
parent=handle,
prio=10,
protocol=protocols.ETH_P_ALL, # Every packet
target=0x10020,
keys=["0x0/0x0+0"],
action=action,
)
except:
logging.exception("Error: Unable to configure TC.")
return None, None

def ebpf_cleanup():
"""Clean attached eBPF programs."""
logging.info("Detaching sock_ops hook...")
bpf.detach_func(func_sock_ops, filedesc, BPFAttachType.CGROUP_SOCK_OPS)
logging.info("Removing egress TC...")
ipr.tc("del", "htb", ifindex, handle, default=default)

logging.info("Configured TC and BPF!")
return flow_to_rwnd, ebpf_cleanup
111 changes: 3 additions & 108 deletions ratemon/runtime/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@
import collections
import ctypes
import logging
import os
from os import path
import queue
import signal
import time
import traceback

from bcc import BPF, BPFAttachType
import numpy as np
from pyroute2 import IPRoute, protocols
from pyroute2.netlink.exceptions import NetlinkError

from ratemon.model import data, defaults, features, gen_features, models, utils
from ratemon.runtime import flow_utils, reaction_strategy
from ratemon.runtime import ebpf, flow_utils, reaction_strategy
from ratemon.runtime.reaction_strategy import ReactionStrategy


Expand Down Expand Up @@ -126,9 +121,7 @@ def make_decision_servicepolicy(
]

lr = fets[-1][
features.make_win_metric(
features.LOSS_RATE_FET, models.MathisFairness.win_size
)
features.make_win_metric(features.LOSS_RATE_FET, models.MathisFairness.win_size)
]

avg_rtt_us = fets[-1][
Expand Down Expand Up @@ -412,104 +405,6 @@ def packets_to_ndarray(pkts, dtype, packets_lost, win_to_loss_event_rate):
return fets


def load_bpf():
"""Load the corresponding eBPF program."""
# Load BPF text.
bpf_flp = path.join(
path.abspath(path.dirname(__file__)),
"ratemon_runtime.c",
)
if not path.isfile(bpf_flp):
logging.error("Could not find BPF program: %s", bpf_flp)
return 1
logging.info("Loading BPF program: %s", bpf_flp)
with open(bpf_flp, "r", encoding="utf-8") as fil:
bpf_text = fil.read()
# Load BPF program.
return BPF(text=bpf_text)


def configure_ebpf(args):
"""Set up eBPF hooks."""
if min(args.listen_ports) >= 50000:
# Use the listen ports to determine the wait time, so that multiple
# instances of this program do not try to configure themselves at the same
# time.
rand_sleep = min(args.listen_ports) - 50000
logging.info("Waiting %f seconds to prevent race conditions...", rand_sleep)
time.sleep(rand_sleep)

try:
bpf = load_bpf()
except:
logging.exception("Error loading BPF program!")
return None, None
flow_to_rwnd = bpf["flow_to_rwnd"]

# Set up a TC egress qdisc, specify a filter the accepts all packets, and attach
# our egress function as the action on that filter.
ipr = IPRoute()
ifindex = ipr.link_lookup(ifname=args.interface)
assert (
len(ifindex) == 1
), f'Trouble looking up index for interface "{args.interface}": {ifindex}'
ifindex = ifindex[0]

logging.info("Attempting to create central qdisc")
handle = 0x10000
default = 0x200000
responsible_for_central_tc = False
try:
ipr.tc("add", "htb", ifindex, handle, default=default)
except NetlinkError:
logging.warning("Unable to create central qdisc. It probably already exists.")
else:
logging.info("Responsible for central TC")
responsible_for_central_tc = True

if not responsible_for_central_tc:
# If someone else is responsible for the egress action, then we will just let
# them do the work.
logging.warning("Not configuring TC")
return flow_to_rwnd, None

# Read the TCP window scale on outgoing SYN-ACK packets.
func_sock_ops = bpf.load_func("read_win_scale", bpf.SOCK_OPS) # sock_stuff
filedesc = os.open(args.cgroup, os.O_RDONLY)
bpf.attach_func(func_sock_ops, filedesc, BPFAttachType.CGROUP_SOCK_OPS)

# Overwrite advertised window size in outgoing packets.
egress_fn = bpf.load_func("handle_egress", BPF.SCHED_ACT)
action = dict(kind="bpf", fd=egress_fn.fd, name=egress_fn.name, action="ok")

try:
# Add the action to a u32 match-all filter
ipr.tc(
"add-filter",
"u32",
ifindex,
parent=handle,
prio=10,
protocol=protocols.ETH_P_ALL, # Every packet
target=0x10020,
keys=["0x0/0x0+0"],
action=action,
)
except:
logging.exception("Error: Unable to configure TC.")
return None, None

def ebpf_cleanup():
"""Clean attached eBPF programs."""
logging.info("Detaching sock_ops hook...")
bpf.detach_func(func_sock_ops, filedesc, BPFAttachType.CGROUP_SOCK_OPS)
logging.info("Removing egress TC...")
ipr.tc("del", "htb", ifindex, handle, default=default)

logging.info("Configured TC and BPF!")
return flow_to_rwnd, ebpf_cleanup


def parse_from_inference_queue(
val, flow_to_rwnd, flow_to_decisions, flow_to_prev_features
):
Expand Down Expand Up @@ -1173,7 +1068,7 @@ def signal_handler(sig, frame):

cleanup = None
try:
flow_to_rwnd, cleanup = configure_ebpf(args)
flow_to_rwnd, cleanup = ebpf.configure_ebpf(args)
if flow_to_rwnd is None:
return
inference_loop(args, flow_to_rwnd, que, inference_flags, done)
Expand Down
4 changes: 2 additions & 2 deletions ratemon/runtime/ratemon_runtime.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ BPF_TABLE_PINNED("hash", struct flow_t, u8, flow_to_win_scale, 1024, "/sys/fs/bp
// BPF_TABLE_PINNED(_table_type, _key_type, _leaf_type, _name, _max_entries, "/sys/fs/bpf/xyz");

// Inspired by: https://stackoverflow.com/questions/65762365/ebpf-printing-udp-payload-and-source-ip-as-hex
int handle_egress(struct __sk_buff *skb)
int do_rwnd_at_egress(struct __sk_buff *skb)
{
if (skb == NULL)
{
Expand Down Expand Up @@ -186,7 +186,7 @@ static inline int handle_write_hdr_opt(struct bpf_sock_ops *skops)
// This is not an IPv4 packet. We only support IPv4 packets because the struct
// we use as a map key stores IP addresses as 32 bits. This is purely an
// implementation detail.
bpf_trace_printk("Warning: Not using IPv4 for flow on local port %u\n", skops->local_port);
bpf_trace_printk("Warning: Not using IPv4 for flow on local port %u: family=%u\n", skops->local_port, skops->family);
return SOCKOPS_OK;
}

Expand Down
7 changes: 6 additions & 1 deletion ratemon/runtime/ratemon_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
import pcapy

from ratemon.model import features, models, utils
from ratemon.runtime import flow_utils, inference, mitigation_strategy, reaction_strategy
from ratemon.runtime import (
flow_utils,
inference,
mitigation_strategy,
reaction_strategy,
)
from ratemon.runtime.mitigation_strategy import MitigationStrategy
from ratemon.runtime.reaction_strategy import ReactionStrategy

Expand Down

0 comments on commit 8eb3694

Please sign in to comment.