diff --git a/example/main.go b/example/main.go index 8b85b38..242f872 100644 --- a/example/main.go +++ b/example/main.go @@ -3,7 +3,9 @@ package main import ( "context" "fmt" + "runtime" "runtime/pprof" + "sync" "github.com/pyroscope-io/client/pyroscope" ) @@ -17,30 +19,57 @@ func work(n int) { // revive:enable:empty-block } -func fastFunction(c context.Context) { +var m sync.Mutex + +func fastFunction(c context.Context, wg *sync.WaitGroup) { + m.Lock() + defer m.Unlock() + pyroscope.TagWrapper(c, pyroscope.Labels("function", "fast"), func(c context.Context) { work(200000000) }) + wg.Done() } -func slowFunction(c context.Context) { +func slowFunction(c context.Context, wg *sync.WaitGroup) { + m.Lock() + defer m.Unlock() + // standard pprof.Do wrappers work as well pprof.Do(c, pprof.Labels("function", "slow"), func(c context.Context) { work(800000000) }) + wg.Done() } func main() { + runtime.SetMutexProfileFraction(5) + runtime.SetBlockProfileRate(5) pyroscope.Start(pyroscope.Config{ ApplicationName: "simple.golang.app-new", ServerAddress: "http://localhost:4040", // this will run inside docker-compose, hence `pyroscope` for hostname Logger: pyroscope.StandardLogger, + ProfileTypes: []pyroscope.ProfileType{ + pyroscope.ProfileCPU, + pyroscope.ProfileInuseObjects, + pyroscope.ProfileAllocObjects, + pyroscope.ProfileInuseSpace, + pyroscope.ProfileAllocSpace, + pyroscope.ProfileGoroutines, + pyroscope.ProfileMutexCount, + pyroscope.ProfileMutexDuration, + pyroscope.ProfileBlockCount, + pyroscope.ProfileBlockDuration, + }, }) pyroscope.TagWrapper(context.Background(), pyroscope.Labels("foo", "bar"), func(c context.Context) { for { - fastFunction(c) - slowFunction(c) + wg := sync.WaitGroup{} + wg.Add(2) + go fastFunction(c, &wg) + go slowFunction(c, &wg) + wg.Wait() } }) } diff --git a/pyroscope/session.go b/pyroscope/session.go index b6d89c8..9be9b7b 100644 --- a/pyroscope/session.go +++ b/pyroscope/session.go @@ -26,9 +26,14 @@ type Session struct { trieMutex sync.Mutex // these things do change: - cpuBuf *bytes.Buffer - memBuf *bytes.Buffer - memPrevBytes []byte + cpuBuf *bytes.Buffer + memBuf *bytes.Buffer + memPrevBytes []byte + goroutinesBuf *bytes.Buffer + mutexBuf *bytes.Buffer + mutexPrevBytes []byte + blockBuf *bytes.Buffer + blockPrevBytes []byte lastGCGeneration uint32 appName string @@ -63,6 +68,9 @@ func NewSession(c SessionConfig) (*Session, error) { logger: c.Logger, cpuBuf: &bytes.Buffer{}, memBuf: &bytes.Buffer{}, + goroutinesBuf: &bytes.Buffer{}, + mutexBuf: &bytes.Buffer{}, + blockBuf: &bytes.Buffer{}, } return ps, nil @@ -150,6 +158,33 @@ func (ps *Session) isMemEnabled() bool { return false } +func (ps *Session) isBlockEnabled() bool { + for _, t := range ps.profileTypes { + if t == ProfileBlockCount || t == ProfileBlockDuration { + return true + } + } + return false +} + +func (ps *Session) isMutexEnabled() bool { + for _, t := range ps.profileTypes { + if t == ProfileMutexCount || t == ProfileMutexDuration { + return true + } + } + return false +} + +func (ps *Session) isGoroutinesEnabled() bool { + for _, t := range ps.profileTypes { + if t == ProfileGoroutines { + return true + } + } + return false +} + func (ps *Session) reset() { now := time.Now() endTime := now.Truncate(ps.uploadRate) @@ -186,6 +221,96 @@ func (ps *Session) uploadData(startTime, endTime time.Time) { ps.cpuBuf.Reset() } + if ps.isGoroutinesEnabled() { + p := pprof.Lookup("goroutine") + if p != nil { + p.WriteTo(ps.goroutinesBuf, 0) + ps.upstream.Upload(&upstream.UploadJob{ + Name: ps.appName, + StartTime: startTime, + EndTime: endTime, + SpyName: "gospy", + Units: "goroutines", + AggregationType: "average", + Format: upstream.FormatPprof, + Profile: copyBuf(ps.goroutinesBuf.Bytes()), + SampleTypeConfig: map[string]*upstream.SampleType{ + "goroutine": { + DisplayName: "goroutines", + Units: "goroutines", + Aggregation: "average", + }, + }, + }) + ps.goroutinesBuf.Reset() + } + } + + if ps.isBlockEnabled() { + p := pprof.Lookup("block") + if p != nil { + p.WriteTo(ps.blockBuf, 0) + curBlockBuf := copyBuf(ps.blockBuf.Bytes()) + ps.blockBuf.Reset() + if ps.blockPrevBytes != nil { + ps.upstream.Upload(&upstream.UploadJob{ + Name: ps.appName, + StartTime: startTime, + EndTime: endTime, + SpyName: "gospy", + Format: upstream.FormatPprof, + Profile: curBlockBuf, + PrevProfile: ps.blockPrevBytes, + SampleTypeConfig: map[string]*upstream.SampleType{ + "contentions": { + DisplayName: "block_count", + Units: "lock_samples", + Cumulative: true, + }, + "delay": { + DisplayName: "block_duration", + Units: "lock_nanoseconds", + Cumulative: true, + }, + }, + }) + } + ps.blockPrevBytes = curBlockBuf + } + } + if ps.isMutexEnabled() { + p := pprof.Lookup("mutex") + if p != nil { + p.WriteTo(ps.mutexBuf, 0) + curMutexBuf := copyBuf(ps.mutexBuf.Bytes()) + ps.mutexBuf.Reset() + if ps.mutexPrevBytes != nil { + ps.upstream.Upload(&upstream.UploadJob{ + Name: ps.appName, + StartTime: startTime, + EndTime: endTime, + SpyName: "gospy", + Format: upstream.FormatPprof, + Profile: curMutexBuf, + PrevProfile: ps.mutexPrevBytes, + SampleTypeConfig: map[string]*upstream.SampleType{ + "contentions": { + DisplayName: "mutex_count", + Units: "lock_samples", + Cumulative: true, + }, + "delay": { + DisplayName: "mutex_duration", + Units: "lock_nanoseconds", + Cumulative: true, + }, + }, + }) + } + ps.mutexPrevBytes = curMutexBuf + } + } + if ps.isMemEnabled() { currentGCGeneration := numGC() // sometimes GC doesn't run within 10 seconds diff --git a/pyroscope/types.go b/pyroscope/types.go index bb203a9..599c21c 100644 --- a/pyroscope/types.go +++ b/pyroscope/types.go @@ -12,12 +12,17 @@ type Logger interface { } const ( - ProfileCPU ProfileType = "cpu" - ProfileInuseObjects ProfileType = "inuse_objects" - ProfileAllocObjects ProfileType = "alloc_objects" - ProfileInuseSpace ProfileType = "inuse_space" - ProfileAllocSpace ProfileType = "alloc_space" - DefaultSampleRate = 100 + ProfileCPU ProfileType = "cpu" + ProfileInuseObjects ProfileType = "inuse_objects" + ProfileAllocObjects ProfileType = "alloc_objects" + ProfileInuseSpace ProfileType = "inuse_space" + ProfileAllocSpace ProfileType = "alloc_space" + ProfileGoroutines ProfileType = "goroutines" + ProfileMutexCount ProfileType = "mutex_count" + ProfileMutexDuration ProfileType = "mutex_duration" + ProfileBlockCount ProfileType = "block_count" + ProfileBlockDuration ProfileType = "block_duration" + DefaultSampleRate = 100 ) var DefaultProfileTypes = []ProfileType{ diff --git a/upstream/remote/remote.go b/upstream/remote/remote.go index c316179..2294328 100644 --- a/upstream/remote/remote.go +++ b/upstream/remote/remote.go @@ -2,6 +2,7 @@ package remote import ( "bytes" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -129,6 +130,17 @@ func (r *Remote) uploadProfile(j *upstream.UploadJob) error { return err } } + if j.SampleTypeConfig != nil { + fw, err = writer.CreateFormFile("sample_type_config", "sample_type_config.json") + if err != nil { + return err + } + b, err := json.Marshal(j.SampleTypeConfig) + if err != nil { + return err + } + fw.Write(b) + } writer.Close() q := u.Query() diff --git a/upstream/upstream.go b/upstream/upstream.go index 9cc9fb7..ff8ed65 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -12,15 +12,24 @@ type Upstream interface { Upload(*UploadJob) } +type SampleType struct { + Units string `json:"units,omitempty"` + Aggregation string `json:"aggregation,omitempty"` + DisplayName string `json:"display-name,omitempty"` + Sampled bool `json:"sampled,omitempty"` + Cumulative bool `json:"cumulative,omitempty"` +} + type UploadJob struct { - Name string - StartTime time.Time - EndTime time.Time - SpyName string - SampleRate uint32 - Units string - AggregationType string - Format Format - Profile []byte - PrevProfile []byte + Name string + StartTime time.Time + EndTime time.Time + SpyName string + SampleRate uint32 + Units string + AggregationType string + Format Format + Profile []byte + PrevProfile []byte + SampleTypeConfig map[string]*SampleType }