Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cpu optimizations #70

Merged
merged 2 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions ebpf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,19 @@ func run() {
var isRunning_2 bool
var mu_2 = &sync.Mutex{}

pollInterval := 5 * time.Minute
pollInterval := 20 * time.Minute

trafficUtils.InitVar("UPROBE_POLL_INTERVAL", &pollInterval)

ssl.InitMaps(bpfModule)

if captureSsl == "true" || captureAll == "true" {
go func() {
for {
fmt.Printf("Starting to attach to processes in ticker start\n")
ticker := time.NewTicker(pollInterval) // Create a ticker to trigger every minute
defer ticker.Stop()
for range ticker.C {
fmt.Printf("Starting to attach to processes in ticker\n")
if !isRunning_2 {
mu_2.Lock()
if isRunning_2 {
Expand All @@ -177,8 +181,9 @@ func run() {
isRunning_2 = false
mu_2.Unlock()
}
time.Sleep(pollInterval)
fmt.Printf("Ended attaching to processes in ticker\n")
}
fmt.Printf("Ended attaching to processes in ticker end\n")
}()
}

Expand All @@ -194,6 +199,13 @@ func run() {
}
}

//ticker := time.NewTicker(15 * time.Second)
//defer ticker.Stop()
//
//for range ticker.C {
// go captureCpuProfile()
//}

sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
log.Println("Sniffer is ready")
Expand All @@ -207,3 +219,24 @@ func captureMemoryProfile() {

pprof.WriteHeapProfile(f) // Write memory profile
}

func captureCpuProfile() {
timestamp := time.Now().Format("20060102_150405")
fileName := fmt.Sprintf("cpu_%s.prof", timestamp)
f, err := os.Create(fileName)
if err != nil {
panic("could not create CPU profile: " + err.Error())
}
defer f.Close()

if err := pprof.StartCPUProfile(f); err != nil {
panic("could not start CPU profile: " + err.Error())
}
fmt.Println("CPU profiling started")

// Allow profiling for a certain duration or simulate workload
time.Sleep(12 * time.Second) // Sleep for 10 seconds to simulate CPU activity

pprof.StopCPUProfile()
fmt.Println("CPU profiling stopped")
}
43 changes: 16 additions & 27 deletions ebpf/uprobeBuilder/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"os/exec"
"regexp"
"strings"

Expand All @@ -20,39 +21,27 @@ var (
func CheckProcessCGroupBelongToKube(pid int32) ([]string, error) {
cgroupAbsPath := fmt.Sprintf("/proc/%d/cgroup", pid)
processCgroupFilePath := host.GetFileInHost(cgroupAbsPath)
cgroupFile, err := os.Open(processCgroupFilePath)
if err != nil {
return nil, err
}
defer cgroupFile.Close()

cache := make(map[string]bool)
scanner := bufio.NewScanner(cgroupFile)
for scanner.Scan() {
infos := strings.Split(scanner.Text(), ":")
if len(infos) < 3 {
continue
}
lastPath := strings.LastIndex(infos[2], "/")
if lastPath > 1 && lastPath != len(infos[2])-1 {
path := infos[2][lastPath+1:]
// ex: cri-containerd-7dae778c37bd1204677518f1032bbecf01f5c41878ea7bd370021263417cc626.scope
if kubepod := kubepodsRegex.FindStringSubmatch(path); len(kubepod) >= 1 {
path = kubepod[1]
}
cache[path] = true
}
}
if len(cache) == 0 {
output := checkKubeProcess(processCgroupFilePath)
if len(output) <= 1 {
return nil, fmt.Errorf("no k8s cgroups")
} else {
log.Printf("successfully found a kube process %s", processCgroupFilePath)
}
result := make([]string, 0)
for k := range cache {
result = append(result, k)
}
result = append(result, output)
return result, nil
}

func checkKubeProcess(exePath string) string {
cmd := exec.Command("sh", "-c", "strings "+exePath+" | grep cri-containerd | head -n 1")
output, err := cmd.CombinedOutput()
if err != nil {
fmt.Printf("Error executing command in checkKubeProcess: %v\n", err)
return ""
}
return string(output)
}

func isIgnoreModuleName(name string) bool {
return name != "" &&
(strings.HasPrefix(name, "//anon") ||
Expand Down
23 changes: 18 additions & 5 deletions ebpf/uprobeBuilder/process/processFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"strings"
"sync"
"time"

"github.com/akto-api-security/mirroring-api-logging/ebpf/uprobeBuilder/ssl"
"github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils"
Expand All @@ -29,15 +30,17 @@ type Process struct {
}

type ProcessFactory struct {
processMap map[int32]Process
mutex *sync.RWMutex
processMap map[int32]Process
mutex *sync.RWMutex
unattachedProcess map[int32]bool
}

// NewFactory creates a new instance of the factory.
func NewFactory() *ProcessFactory {
return &ProcessFactory{
processMap: make(map[int32]Process),
mutex: &sync.RWMutex{},
processMap: make(map[int32]Process),
mutex: &sync.RWMutex{},
unattachedProcess: make(map[int32]bool),
}
}

Expand Down Expand Up @@ -80,7 +83,13 @@ func (processFactory *ProcessFactory) AddNewProcessesToProbe(bpfModule *bcc.Modu
}
fmt.Printf("Attempt for %v processes\n", len(pidSet))
for pid := range pidSet {
_, ok := processFactory.processMap[pid]
time.Sleep(200 * time.Millisecond)
_, ok := processFactory.unattachedProcess[pid]
if ok {
fmt.Printf("Not attempting for %v processes\n", pid)
continue
}
_, ok = processFactory.processMap[pid]
if !ok {

if checkSelf(pid) {
Expand All @@ -94,13 +103,15 @@ func (processFactory *ProcessFactory) AddNewProcessesToProbe(bpfModule *bcc.Modu
if err != nil {
if !probeAllPid {
fmt.Printf("No libraries for pid: %v %v\n", pid, err)
processFactory.unattachedProcess[pid] = true
continue
}
}

libraries, err := FindLibrariesPathInMapFile(pid)
if err != nil {
fmt.Printf("No libraries for pid: %v %v\n", pid, err)
processFactory.unattachedProcess[pid] = true
continue
}

Expand Down Expand Up @@ -149,6 +160,8 @@ func (processFactory *ProcessFactory) AddNewProcessesToProbe(bpfModule *bcc.Modu
log.Printf("Node probing error: %v %v\n", pid, err)
}

processFactory.unattachedProcess[pid] = true

}
}
}
Expand Down
10 changes: 10 additions & 0 deletions ebpf/uprobeBuilder/ssl/gohelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ func gettingGoVersionFromString(s string) (v *version.Version, success bool, err
return v, true, err
}

func checkGoProcess(exePath string) bool {
cmd := exec.Command("sh", "-c", "strings "+exePath+" | grep go1")
output, err := cmd.CombinedOutput()
if err != nil {
fmt.Printf("Error executing command in checkGoProcess: %v\n", err)
return false
}
return len(output) > 1
}

type goStringInC struct {
Ptr uint64
Size uint64
Expand Down
8 changes: 8 additions & 0 deletions ebpf/uprobeBuilder/ssl/gotls.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ func TryGoTLSProbes(pid int32, m map[string]bool, bpfModule *bcc.Module) (bool,
if err != nil {
return false, err
}

isGo := checkGoProcess(symLinkHostPath)
if !isGo {
return false, fmt.Errorf("Not a go process")
} else {
log.Printf("successfully found a go process %s", symLinkHostPath)
}

elfFile, err := elf.NewFile(symLinkHostPath)
if err != nil {
return false, fmt.Errorf("read executable file error: %v", err)
Expand Down
3 changes: 1 addition & 2 deletions ebpf/uprobeBuilder/ssl/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package ssl

import (
"fmt"
"log"

"github.com/akto-api-security/mirroring-api-logging/ebpf/bpfwrapper"
"github.com/iovisor/gobpf/bcc"
"log"
)

type NodeTLSSymbolAddress struct {
Expand Down
2 changes: 1 addition & 1 deletion ebpf/uprobeBuilder/ssl/openssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package ssl

import (
"fmt"
"github.com/akto-api-security/mirroring-api-logging/ebpf/bpfwrapper"
"log"
"os/exec"
"regexp"
"strconv"
"strings"

"github.com/akto-api-security/mirroring-api-logging/ebpf/bpfwrapper"
"github.com/iovisor/gobpf/bcc"
)

Expand Down
10 changes: 10 additions & 0 deletions trafficUtil/utils/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,15 @@ func LogMemoryStats() int {
log.Println("Alloc in MB: ", mem)
log.Println("Sys in MB: ", m.Sys/1024/1024)

// gc stats

log.Println("Last gc finished: ", m.LastGC)
log.Println("Target heap size of next gc cycle: ", m.NextGC)
log.Println("Stop The world pauses ", m.PauseTotalNs)
log.Println("GcSys ", m.GCSys)
log.Println("GCCPUFraction ", m.GCCPUFraction)
log.Println("NumGC ", m.NumGC)
log.Println("NumForcedGC ", m.NumForcedGC)

return mem
}
Loading