feat: client side pprof merging for cumulative types #24

Merged
merged 15 commits on Dec 16, 2022
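
A minimal usage sketch of how the new option is exposed through pyroscope.Config. ApplicationName and ServerAddress are placeholder values assumed to be existing Config fields; they are not part of this change. By default cumulative profiles (heap, block, mutex) are now merged client-side before upload; setting DisableCumulativeMerge to true restores the previous behaviour of sending Profile/PrevProfile pairs.

package main

import "github.com/pyroscope-io/client/pyroscope"

func main() {
	// Cumulative profiles are merged client-side by default; set
	// DisableCumulativeMerge to true to upload the raw Profile/PrevProfile
	// pair and let the server perform the merge instead.
	_, err := pyroscope.Start(pyroscope.Config{
		ApplicationName:        "example.app",           // placeholder
		ServerAddress:          "http://localhost:4040", // placeholder
		DisableCumulativeMerge: false,
	})
	if err != nil {
		panic(err)
	}
}
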
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
@@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: ['1.15', '1.16', '1.17', '1.18']
go: ['1.16', '1.17', '1.18']
steps:
- name: Checkout
uses: actions/checkout@v2
2 changes: 2 additions & 0 deletions go.mod
@@ -1,3 +1,5 @@
module github.com/pyroscope-io/client

go 1.17

require github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26
7 changes: 7 additions & 0 deletions go.sum
@@ -0,0 +1,7 @@
github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4MedaY=
github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic=
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
129 changes: 129 additions & 0 deletions internal/cumulativepprof/cumulative.go
@@ -0,0 +1,129 @@
package cumulativepprof

import (
"fmt"
pprofile "github.com/google/pprof/profile"
"github.com/pyroscope-io/client/upstream"
)

type Merger struct {
SampleTypes []string
MergeRatios []float64
SampleTypeConfig map[string]*upstream.SampleType
Name string

prev *pprofile.Profile
}

type Mergers struct {
Heap *Merger
Block *Merger
Mutex *Merger
}

func NewMergers() *Mergers {
return &Mergers{
Block: &Merger{
SampleTypes: []string{"contentions", "delay"},
MergeRatios: []float64{-1, -1},
SampleTypeConfig: map[string]*upstream.SampleType{
"contentions": {
DisplayName: "block_count",
Units: "lock_samples",
},
"delay": {
DisplayName: "block_duration",
Units: "lock_nanoseconds",
},
},
Name: "block",
},
Mutex: &Merger{
SampleTypes: []string{"contentions", "delay"},
MergeRatios: []float64{-1, -1},
SampleTypeConfig: map[string]*upstream.SampleType{
"contentions": {
DisplayName: "mutex_count",
Units: "lock_samples",
},
"delay": {
DisplayName: "mutex_duration",
Units: "lock_nanoseconds",
},
},
Name: "mutex",
},
Heap: &Merger{
SampleTypes: []string{"alloc_objects", "alloc_space", "inuse_objects", "inuse_space"},
MergeRatios: []float64{-1, -1, 0, 0},
SampleTypeConfig: map[string]*upstream.SampleType{
"alloc_objects": {
Units: "objects",
},
"alloc_space": {
Units: "bytes",
},
"inuse_space": {
Units: "bytes",
Aggregation: "average",
},
"inuse_objects": {
Units: "objects",
Aggregation: "average",
},
},
Name: "heap",
},
}
}

func (m *Merger) Merge(prev, cur []byte) (*pprofile.Profile, error) {
p2, err := m.parseProfile(cur)
if err != nil {
return nil, err
}
p1 := m.prev
if p1 == nil {
p1, err = m.parseProfile(prev)
if err != nil {
return nil, err
}
}

err = p1.ScaleN(m.MergeRatios)
if err != nil {
return nil, err
}

p, err := pprofile.Merge([]*pprofile.Profile{p1, p2})
if err != nil {
return nil, err
}

for _, sample := range p.Sample {
if len(sample.Value) > 0 && sample.Value[0] < 0 {
for i := range sample.Value {
sample.Value[i] = 0
}
}
}

m.prev = p2
return p, nil
}

func (m *Merger) parseProfile(bs []byte) (*pprofile.Profile, error) {
p, err := pprofile.ParseData(bs)
if err != nil {
return nil, err
}
if got := len(p.SampleType); got != len(m.SampleTypes) {
return nil, fmt.Errorf("invalid profile: got %d sample types, want %d", got, len(m.SampleTypes))
}
for i, want := range m.SampleTypes {
if got := p.SampleType[i].Type; got != want {
return nil, fmt.Errorf("invalid profile: got %q sample type at index %d, want %q", got, i, want)
}
}
return p, nil
}
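
The MergeRatios above drive the delta computation: the previous profile is scaled by -1 for cumulative sample types, so merging it with the current profile yields cur - prev, while the gauge-like inuse_* types are scaled by 0, so only the current snapshot's values survive; any negative results are then zeroed. A short sketch of how the merger is driven from inside this module (the package is internal, so this is illustrative only), assuming prevBytes and curBytes hold two consecutive serialized heap profiles:

import (
	"bytes"

	"github.com/pyroscope-io/client/internal/cumulativepprof"
)

func mergeHeap(prevBytes, curBytes []byte) ([]byte, error) {
	m := cumulativepprof.NewMergers()

	// alloc_objects/alloc_space in the merged profile are deltas between the
	// two snapshots; inuse_objects/inuse_space come from the current one.
	merged, err := m.Heap.Merge(prevBytes, curBytes)
	if err != nil {
		return nil, err // e.g. a profile with unexpected sample types
	}

	var buf bytes.Buffer
	if err := merged.Write(&buf); err != nil {
		return nil, err
	}
	// buf.Bytes() is what would be uploaded instead of Profile+PrevProfile.
	return buf.Bytes(), nil
}
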
2 changes: 2 additions & 0 deletions pyroscope/api.go
@@ -20,6 +20,7 @@ type Config struct {
ProfileTypes []ProfileType
DisableGCRuns bool // this will disable automatic runtime.GC runs between getting the heap profiles
DisableAutomaticResets bool // disable automatic profiler reset every 10 seconds. Reset manually by calling Flush method
DisableCumulativeMerge bool // disable client-side merging of cumulative profiles (alloc_objects, alloc_space, block_count, block_duration, mutex_count, mutex_duration)
}

type Profiler struct {
@@ -65,6 +66,7 @@ func Start(cfg Config) (*Profiler, error) {
ProfilingTypes: cfg.ProfileTypes,
DisableGCRuns: cfg.DisableGCRuns,
DisableAutomaticResets: cfg.DisableAutomaticResets,
DisableCumulativeMerge: cfg.DisableCumulativeMerge,
SampleRate: cfg.SampleRate,
UploadRate: 10 * time.Second,
}
50 changes: 42 additions & 8 deletions pyroscope/session.go
@@ -3,6 +3,7 @@ package pyroscope
import (
"bytes"
"github.com/pyroscope-io/client/internal/alignedticker"
"github.com/pyroscope-io/client/internal/cumulativepprof"
"runtime"
"runtime/pprof"
"sync"
@@ -19,6 +20,7 @@ type Session struct {
profileTypes []ProfileType
uploadRate time.Duration
disableGCRuns bool
disableCumulativeMerge bool
DisableAutomaticResets bool

logger Logger
@@ -40,6 +42,8 @@ type Session struct {
lastGCGeneration uint32
appName string
startTime time.Time

mergers *cumulativepprof.Mergers
}

type SessionConfig struct {
@@ -50,6 +54,7 @@ type SessionConfig struct {
ProfilingTypes []ProfileType
DisableGCRuns bool
DisableAutomaticResets bool
DisableCumulativeMerge bool
SampleRate uint32
UploadRate time.Duration
}
@@ -71,6 +76,7 @@ func NewSession(c SessionConfig) (*Session, error) {
profileTypes: c.ProfilingTypes,
disableGCRuns: c.DisableGCRuns,
DisableAutomaticResets: c.DisableAutomaticResets,
disableCumulativeMerge: c.DisableCumulativeMerge,
sampleRate: c.SampleRate,
uploadRate: c.UploadRate,
stopCh: make(chan struct{}),
@@ -81,8 +87,9 @@ func NewSession(c SessionConfig) (*Session, error) {
goroutinesBuf: &bytes.Buffer{},
mutexBuf: &bytes.Buffer{},
blockBuf: &bytes.Buffer{},
}

mergers: cumulativepprof.NewMergers(),
}
return ps, nil
}

@@ -265,7 +272,7 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
curBlockBuf := copyBuf(ps.blockBuf.Bytes())
ps.blockBuf.Reset()
if ps.blockPrevBytes != nil {
ps.upstream.Upload(&upstream.UploadJob{
job := &upstream.UploadJob{
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
@@ -285,7 +292,9 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
Cumulative: true,
},
},
})
}
ps.mergeCumulativeProfile(ps.mergers.Block, job)
ps.upstream.Upload(job)
}
ps.blockPrevBytes = curBlockBuf
}
@@ -297,7 +306,7 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
curMutexBuf := copyBuf(ps.mutexBuf.Bytes())
ps.mutexBuf.Reset()
if ps.mutexPrevBytes != nil {
ps.upstream.Upload(&upstream.UploadJob{
job := &upstream.UploadJob{
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
@@ -317,7 +326,9 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
Cumulative: true,
},
},
})
}
ps.mergeCumulativeProfile(ps.mergers.Mutex, job)
ps.upstream.Upload(job)
}
ps.mutexPrevBytes = curMutexBuf
}
@@ -336,8 +347,8 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
pprof.WriteHeapProfile(ps.memBuf)
curMemBytes := copyBuf(ps.memBuf.Bytes())
ps.memBuf.Reset()
if ps.memPrevBytes != nil {
ps.upstream.Upload(&upstream.UploadJob{
if ps.memPrevBytes != nil { // TODO: does this if statement lose the first 10s profile?
job := &upstream.UploadJob{
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
@@ -346,14 +357,37 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
Format: upstream.FormatPprof,
Profile: curMemBytes,
PrevProfile: ps.memPrevBytes,
})
}
ps.mergeCumulativeProfile(ps.mergers.Heap, job)
ps.upstream.Upload(job)
}
ps.memPrevBytes = curMemBytes
ps.lastGCGeneration = currentGCGeneration
}
}
}

func (ps *Session) mergeCumulativeProfile(m *cumulativepprof.Merger, job *upstream.UploadJob) {
// TODO: should we filter by enabled ps.profileTypes to reduce profile size? Maybe add a separate option?
if ps.disableCumulativeMerge {
return
}
p, err := m.Merge(job.PrevProfile, job.Profile)
if err != nil {
ps.logger.Errorf("failed to merge %s profiles %v", m.Name, err)
return
}
var prof bytes.Buffer
err = p.Write(&prof)
if err != nil {
ps.logger.Errorf("failed to serialize merged %s profiles %v", m.Name, err)
return
}
job.PrevProfile = nil
job.Profile = prof.Bytes()
job.SampleTypeConfig = m.SampleTypeConfig
}

func (ps *Session) Stop() {
ps.trieMutex.Lock()
defer ps.trieMutex.Unlock()