Skip to content

Commit

Permalink
feat: add goroutine watcher (#16)
Browse files Browse the repository at this point in the history
* [FEED-8337] feat: add goroutine watcher

* lint

* fix: ci lint

* chore: wrong comment

* feat: handle error

* chore: new line

* feat: add benchmark test for async

* apply reviews

* default value

* macos 11 deprecated on github action
  • Loading branch information
jake-shin0 authored Jul 24, 2024
1 parent be7f157 commit b91e419
Show file tree
Hide file tree
Showing 26 changed files with 807 additions and 292 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
test-on-macos:
strategy:
matrix:
os-version: [ 11, 12 ]
os-version: [ 12, 13 ]
go-version: [ 1.16, 1.17, 1.18, 1.19 ]
runs-on: macos-${{ matrix.os-version }}
steps:
Expand Down
106 changes: 95 additions & 11 deletions autopprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/daangn/autopprof/queryer"
"log"
"time"

Expand Down Expand Up @@ -34,13 +35,22 @@ type autoPprof struct {
// Default: 0.75. (mean 75%)
memThreshold float64

// goroutineThreshold is the goroutine count threshold to trigger profile.
// If the goroutine count is over the threshold, the autopprof will
// report the goroutine profile.
// Default: 50000.
goroutineThreshold int

// minConsecutiveOverThreshold is the minimum consecutive
// number of over a threshold for reporting profile again.
// Default: 12.
minConsecutiveOverThreshold int

// queryer is used to query the quota and the cgroup stat.
queryer queryer
// cgroupQueryer is used to query the quota and the cgroup stat.
cgroupQueryer queryer.CgroupsQueryer

// runtimeQueryer is used to query the runtime stat.
runtimeQueryer queryer.RuntimeQueryer

// profiler is used to profile the cpu and the heap memory.
profiler profiler
Expand All @@ -53,8 +63,9 @@ type autoPprof struct {
reportBoth bool

// Flags to disable the profiling.
disableCPUProf bool
disableMemProf bool
disableCPUProf bool
disableMemProf bool
disableGoroutineProf bool

// stopC is the signal channel to stop the watch processes.
stopC chan struct{}
Expand All @@ -65,7 +76,12 @@ var globalAp *autoPprof

// Start configures and runs the autopprof process.
func Start(opt Option) error {
qryer, err := newQueryer()
cgroupQryer, err := queryer.NewCgroupQueryer()
if err != nil {
return err
}

runtimeQryer, err := queryer.NewRuntimeQueryer()
if err != nil {
return err
}
Expand All @@ -78,8 +94,10 @@ func Start(opt Option) error {
watchInterval: defaultWatchInterval,
cpuThreshold: defaultCPUThreshold,
memThreshold: defaultMemThreshold,
goroutineThreshold: defaultGoroutineThreshold,
minConsecutiveOverThreshold: defaultMinConsecutiveOverThreshold,
queryer: qryer,
cgroupQueryer: cgroupQryer,
runtimeQueryer: runtimeQryer,
profiler: profr,
reporter: opt.Reporter,
reportBoth: opt.ReportBoth,
Expand All @@ -93,6 +111,9 @@ func Start(opt Option) error {
if opt.MemThreshold != 0 {
ap.memThreshold = opt.MemThreshold
}
if opt.GoroutineThreshold != 0 {
ap.goroutineThreshold = opt.GoroutineThreshold
}
if !ap.disableCPUProf {
if err := ap.loadCPUQuota(); err != nil {
return err
Expand All @@ -112,7 +133,7 @@ func Stop() {
}

func (ap *autoPprof) loadCPUQuota() error {
err := ap.queryer.setCPUQuota()
err := ap.cgroupQueryer.SetCPUQuota()
if err == nil {
return nil
}
Expand All @@ -134,6 +155,7 @@ func (ap *autoPprof) loadCPUQuota() error {
func (ap *autoPprof) watch() {
go ap.watchCPUUsage()
go ap.watchMemUsage()
go ap.watchGoroutineCount()
<-ap.stopC
}

Expand All @@ -149,7 +171,7 @@ func (ap *autoPprof) watchCPUUsage() {
for {
select {
case <-ticker.C:
usage, err := ap.queryer.cpuUsage()
usage, err := ap.cgroupQueryer.CPUUsage()
if err != nil {
log.Println(err)
return
Expand All @@ -170,7 +192,7 @@ func (ap *autoPprof) watchCPUUsage() {
))
}
if ap.reportBoth && !ap.disableMemProf {
memUsage, err := ap.queryer.memUsage()
memUsage, err := ap.cgroupQueryer.MemUsage()
if err != nil {
log.Println(err)
return
Expand Down Expand Up @@ -226,7 +248,7 @@ func (ap *autoPprof) watchMemUsage() {
for {
select {
case <-ticker.C:
usage, err := ap.queryer.memUsage()
usage, err := ap.cgroupQueryer.MemUsage()
if err != nil {
log.Println(err)
return
Expand All @@ -247,7 +269,7 @@ func (ap *autoPprof) watchMemUsage() {
))
}
if ap.reportBoth && !ap.disableCPUProf {
cpuUsage, err := ap.queryer.cpuUsage()
cpuUsage, err := ap.cgroupQueryer.CPUUsage()
if err != nil {
log.Println(err)
return
Expand Down Expand Up @@ -291,6 +313,68 @@ func (ap *autoPprof) reportHeapProfile(memUsage float64) error {
return nil
}

func (ap *autoPprof) watchGoroutineCount() {
if ap.disableGoroutineProf {
return
}

ticker := time.NewTicker(ap.watchInterval)
defer ticker.Stop()

var consecutiveOverThresholdCnt int
for {
select {
case <-ticker.C:
count := ap.runtimeQueryer.GoroutineCount()

if count < ap.goroutineThreshold {
// Reset the count if the goroutine count goes under the threshold.
consecutiveOverThresholdCnt = 0
continue
}

// If goroutine count remains high for a short period of time, no
// duplicate reports are sent.
// This is to prevent the autopprof from sending too many reports.
if consecutiveOverThresholdCnt == 0 {
if err := ap.reportGoroutineProfile(count); err != nil {
log.Println(fmt.Errorf(
"autopprof: failed to report the goroutine profile: %w", err,
))
}
}

consecutiveOverThresholdCnt++
if consecutiveOverThresholdCnt >= ap.minConsecutiveOverThreshold {
// Reset the count and ready to report the goroutine profile again.
consecutiveOverThresholdCnt = 0
}
case <-ap.stopC:
return
}
}
}

func (ap *autoPprof) reportGoroutineProfile(goroutineCount int) error {
b, err := ap.profiler.profileGoroutine()
if err != nil {
return fmt.Errorf("autopprof: failed to profile the goroutine: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), reportTimeout)
defer cancel()

gi := report.GoroutineInfo{
ThresholdCount: ap.goroutineThreshold,
Count: goroutineCount,
}
bReader := bytes.NewReader(b)
if err := ap.reporter.ReportGoroutineProfile(ctx, bReader, gi); err != nil {
return err
}
return nil
}

func (ap *autoPprof) stop() {
close(ap.stopC)
}
Loading

0 comments on commit b91e419

Please sign in to comment.