@@ -3,6 +3,7 @@ package pyroscope
3
3
import (
4
4
"bytes"
5
5
"github.com/pyroscope-io/client/internal/alignedticker"
6
+ "github.com/pyroscope-io/client/internal/cumulativepprof"
6
7
"runtime"
7
8
"runtime/pprof"
8
9
"sync"
@@ -19,6 +20,7 @@ type Session struct {
19
20
profileTypes []ProfileType
20
21
uploadRate time.Duration
21
22
disableGCRuns bool
23
+ disableCumulativeMerge bool
22
24
DisableAutomaticResets bool
23
25
24
26
logger Logger
@@ -40,6 +42,8 @@ type Session struct {
40
42
lastGCGeneration uint32
41
43
appName string
42
44
startTime time.Time
45
+
46
+ mergers * cumulativepprof.Mergers
43
47
}
44
48
45
49
type SessionConfig struct {
@@ -50,6 +54,7 @@ type SessionConfig struct {
50
54
ProfilingTypes []ProfileType
51
55
DisableGCRuns bool
52
56
DisableAutomaticResets bool
57
+ DisableCumulativeMerge bool
53
58
SampleRate uint32
54
59
UploadRate time.Duration
55
60
}
@@ -71,6 +76,7 @@ func NewSession(c SessionConfig) (*Session, error) {
71
76
profileTypes : c .ProfilingTypes ,
72
77
disableGCRuns : c .DisableGCRuns ,
73
78
DisableAutomaticResets : c .DisableAutomaticResets ,
79
+ disableCumulativeMerge : c .DisableCumulativeMerge ,
74
80
sampleRate : c .SampleRate ,
75
81
uploadRate : c .UploadRate ,
76
82
stopCh : make (chan struct {}),
@@ -81,8 +87,9 @@ func NewSession(c SessionConfig) (*Session, error) {
81
87
goroutinesBuf : & bytes.Buffer {},
82
88
mutexBuf : & bytes.Buffer {},
83
89
blockBuf : & bytes.Buffer {},
84
- }
85
90
91
+ mergers : cumulativepprof .NewMergers (),
92
+ }
86
93
return ps , nil
87
94
}
88
95
@@ -265,7 +272,7 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
265
272
curBlockBuf := copyBuf (ps .blockBuf .Bytes ())
266
273
ps .blockBuf .Reset ()
267
274
if ps .blockPrevBytes != nil {
268
- ps . upstream . Upload ( & upstream.UploadJob {
275
+ job := & upstream.UploadJob {
269
276
Name : ps .appName ,
270
277
StartTime : startTime ,
271
278
EndTime : endTime ,
@@ -285,7 +292,9 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
285
292
Cumulative : true ,
286
293
},
287
294
},
288
- })
295
+ }
296
+ ps .mergeCumulativeProfile (ps .mergers .Block , job )
297
+ ps .upstream .Upload (job )
289
298
}
290
299
ps .blockPrevBytes = curBlockBuf
291
300
}
@@ -297,7 +306,7 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
297
306
curMutexBuf := copyBuf (ps .mutexBuf .Bytes ())
298
307
ps .mutexBuf .Reset ()
299
308
if ps .mutexPrevBytes != nil {
300
- ps . upstream . Upload ( & upstream.UploadJob {
309
+ job := & upstream.UploadJob {
301
310
Name : ps .appName ,
302
311
StartTime : startTime ,
303
312
EndTime : endTime ,
@@ -317,7 +326,9 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
317
326
Cumulative : true ,
318
327
},
319
328
},
320
- })
329
+ }
330
+ ps .mergeCumulativeProfile (ps .mergers .Mutex , job )
331
+ ps .upstream .Upload (job )
321
332
}
322
333
ps .mutexPrevBytes = curMutexBuf
323
334
}
@@ -336,8 +347,8 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
336
347
pprof .WriteHeapProfile (ps .memBuf )
337
348
curMemBytes := copyBuf (ps .memBuf .Bytes ())
338
349
ps .memBuf .Reset ()
339
- if ps .memPrevBytes != nil {
340
- ps . upstream . Upload ( & upstream.UploadJob {
350
+ if ps .memPrevBytes != nil { //todo does this if statement loose first 10s profile?
351
+ job := & upstream.UploadJob {
341
352
Name : ps .appName ,
342
353
StartTime : startTime ,
343
354
EndTime : endTime ,
@@ -346,14 +357,37 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
346
357
Format : upstream .FormatPprof ,
347
358
Profile : curMemBytes ,
348
359
PrevProfile : ps .memPrevBytes ,
349
- })
360
+ }
361
+ ps .mergeCumulativeProfile (ps .mergers .Heap , job )
362
+ ps .upstream .Upload (job )
350
363
}
351
364
ps .memPrevBytes = curMemBytes
352
365
ps .lastGCGeneration = currentGCGeneration
353
366
}
354
367
}
355
368
}
356
369
370
+ func (ps * Session ) mergeCumulativeProfile (m * cumulativepprof.Merger , job * upstream.UploadJob ) {
371
+ // todo should we filter by enabled ps.profileTypes to reduce profile size ? maybe add a separate option ?
372
+ if ps .disableCumulativeMerge {
373
+ return
374
+ }
375
+ p , err := m .Merge (job .PrevProfile , job .Profile )
376
+ if err != nil {
377
+ ps .logger .Errorf ("failed to merge %s profiles %v" , m .Name , err )
378
+ return
379
+ }
380
+ var prof bytes.Buffer
381
+ err = p .Write (& prof )
382
+ if err != nil {
383
+ ps .logger .Errorf ("failed to serialize merged %s profiles %v" , m .Name , err )
384
+ return
385
+ }
386
+ job .PrevProfile = nil
387
+ job .Profile = prof .Bytes ()
388
+ job .SampleTypeConfig = m .SampleTypeConfig
389
+ }
390
+
357
391
func (ps * Session ) Stop () {
358
392
ps .trieMutex .Lock ()
359
393
defer ps .trieMutex .Unlock ()
0 commit comments