Skip to content

Commit d51fd44

Browse files
Enable persisting ingestion/query benchmark results in a common format across targets (timescale#147)
resultset: removing intermedia non-work chars from JSON label Co-authored-by: Ante Kresic <[email protected]>
1 parent 45b6321 commit d51fd44

9 files changed

+212
-41
lines changed

load/loader.go

+53-15
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package load
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"github.com/timescale/tsbs/pkg/targets"
7+
"io/ioutil"
68
"log"
79
"math/rand"
810
"sync"
@@ -29,21 +31,22 @@ var (
2931

3032
// BenchmarkRunnerConfig contains all the configuration information required for running BenchmarkRunner.
3133
type BenchmarkRunnerConfig struct {
32-
DBName string `yaml:"db-name" mapstructure:"db-name"`
33-
BatchSize uint `yaml:"batch-size" mapstructure:"batch-size"`
34-
Workers uint `yaml:"workers" mapstructure:"workers"`
35-
Limit uint64 `yaml:"limit" mapstructure:"limit"`
36-
DoLoad bool `yaml:"do-load" mapstructure:"do-load"`
37-
DoCreateDB bool `yaml:"do-create-db" mapstructure:"do-create-db"`
38-
DoAbortOnExist bool `yaml:"do-abort-on-exist" mapstructure:"do-abort-on-exist"`
39-
ReportingPeriod time.Duration `yaml:"reporting-period" mapstructure:"reporting-period"`
40-
HashWorkers bool `yaml:"hash-workers" mapstructure:"hash-workers"`
41-
NoFlowControl bool `yaml:"no-flow-control" mapstructure:"no-flow-control"`
42-
ChannelCapacity uint `yaml:"channel-capacity" mapstructure:"channel-capacity"`
43-
InsertIntervals string `yaml:"insert-intervals" mapstructure:"insert-intervals"`
34+
DBName string `yaml:"db-name" mapstructure:"db-name" json:"db-name"`
35+
BatchSize uint `yaml:"batch-size" mapstructure:"batch-size" json:"batch-size"`
36+
Workers uint `yaml:"workers" mapstructure:"workers" json:"workers"`
37+
Limit uint64 `yaml:"limit" mapstructure:"limit" json:"limit"`
38+
DoLoad bool `yaml:"do-load" mapstructure:"do-load" json:"do-load"`
39+
DoCreateDB bool `yaml:"do-create-db" mapstructure:"do-create-db" json:"do-create-db"`
40+
DoAbortOnExist bool `yaml:"do-abort-on-exist" mapstructure:"do-abort-on-exist" json:"do-abort-on-exist"`
41+
ReportingPeriod time.Duration `yaml:"reporting-period" mapstructure:"reporting-period" json:"reporting-period"`
42+
HashWorkers bool `yaml:"hash-workers" mapstructure:"hash-workers" json:"hash-workers"`
43+
NoFlowControl bool `yaml:"no-flow-control" mapstructure:"no-flow-control" json:"no-flow-control"`
44+
ChannelCapacity uint `yaml:"channel-capacity" mapstructure:"channel-capacity" json:"channel-capacity"`
45+
InsertIntervals string `yaml:"insert-intervals" mapstructure:"insert-intervals" json:"insert-intervals"`
46+
ResultsFile string `yaml:"results-file" mapstructure:"results-file" json:"results-file"`
4447
// deprecated, should not be used in other places other than tsbs_load_xx commands
45-
FileName string `yaml:"file" mapstructure:"file"`
46-
Seed int64 `yaml:"seed" mapstructure:"seed"`
48+
FileName string `yaml:"file" mapstructure:"file" json:"file"`
49+
Seed int64 `yaml:"seed" mapstructure:"seed" json:"seed"`
4750
}
4851

4952
// AddToFlagSet adds command line flags needed by the BenchmarkRunnerConfig to the flag set.
@@ -60,6 +63,7 @@ func (c BenchmarkRunnerConfig) AddToFlagSet(fs *pflag.FlagSet) {
6063
fs.Int64("seed", 0, "PRNG seed (default: 0, which uses the current timestamp)")
6164
fs.String("insert-intervals", "", "Time to wait between each insert, default '' => all workers insert ASAP. '1,2' = worker 1 waits 1s between inserts, worker 2 and others wait 2s")
6265
fs.Bool("hash-workers", false, "Whether to consistently hash insert data to the same workers (i.e., the data for a particular host always goes to the same worker)")
66+
fs.String("results-file", "", "Write the test results summary json to this file")
6367
}
6468

6569
type BenchmarkRunner interface {
@@ -138,7 +142,41 @@ func (l *CommonBenchmarkRunner) postRun(wg *sync.WaitGroup, start *time.Time) {
138142
// Wait for all workers to finish
139143
wg.Wait()
140144
end := time.Now()
141-
l.summary(end.Sub(*start))
145+
took := end.Sub(*start)
146+
l.summary(took)
147+
if l.BenchmarkRunnerConfig.ResultsFile != "" {
148+
metricRate := float64(l.metricCnt) / took.Seconds()
149+
rowRate := float64(l.rowCnt) / took.Seconds()
150+
l.saveTestResult(took, *start, end, metricRate, rowRate)
151+
}
152+
}
153+
154+
func (l *CommonBenchmarkRunner) saveTestResult(took time.Duration, start time.Time, end time.Time, metricRate, rowRate float64) {
155+
totals := make(map[string]interface{})
156+
totals["metricRate"] = metricRate
157+
if l.rowCnt > 0 {
158+
totals["rowRate"] = rowRate
159+
}
160+
161+
testResult := LoaderTestResult{
162+
ResultFormatVersion: LoaderTestResultVersion,
163+
RunnerConfig: l.BenchmarkRunnerConfig,
164+
StartTime: start.Unix(),
165+
EndTime: end.Unix(),
166+
DurationMillis: took.Milliseconds(),
167+
Totals: totals,
168+
}
169+
170+
_, _ = fmt.Printf("Saving results json file to %s\n", l.BenchmarkRunnerConfig.ResultsFile)
171+
file, err := json.MarshalIndent(testResult, "", " ")
172+
if err != nil {
173+
log.Fatal(err)
174+
}
175+
176+
err = ioutil.WriteFile(l.BenchmarkRunnerConfig.ResultsFile, file, 0644)
177+
if err != nil {
178+
log.Fatal(err)
179+
}
142180
}
143181

144182
// RunBenchmark takes in a Benchmark b and uses it to run the load benchmark

load/loader_test_result.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package load
2+
3+
const LoaderTestResultVersion = "0.1"
4+
5+
// LoaderTestResult aggregates the results of an insert or load benchmark in a common format across targets
6+
type LoaderTestResult struct {
7+
// Format Configs
8+
ResultFormatVersion string `json:"ResultFormatVersion"`
9+
10+
// RunnerConfig Configs
11+
RunnerConfig BenchmarkRunnerConfig `json:"RunnerConfig"`
12+
13+
// Run info
14+
StartTime int64 `json:"StartTime`
15+
EndTime int64 `json:"EndTime"`
16+
DurationMillis int64 `json:"DurationMillis"`
17+
18+
// Totals
19+
Totals map[string]interface{} `json:"Totals"`
20+
}

pkg/query/benchmark_result.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package query
2+
3+
const BenchmarkTestResultVersion = "0.1"
4+
5+
// LoaderTestResult aggregates the results of an query benchmark in a common format across targets
6+
type LoaderTestResult struct {
7+
// Format Configs
8+
ResultFormatVersion string `json:"ResultFormatVersion"`
9+
10+
// RunnerConfig Configs
11+
RunnerConfig BenchmarkRunnerConfig `json:"RunnerConfig"`
12+
13+
// Run info
14+
StartTime int64 `json:"StartTime`
15+
EndTime int64 `json:"EndTime"`
16+
DurationMillis int64 `json:"DurationMillis"`
17+
18+
// Totals
19+
Totals map[string]interface{} `json:"Totals"`
20+
}

pkg/query/benchmarker.go

+31
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package query
22

33
import (
44
"bufio"
5+
"encoding/json"
56
"fmt"
7+
"io/ioutil"
68
"log"
79
"os"
810
"runtime/pprof"
@@ -35,6 +37,7 @@ type BenchmarkRunnerConfig struct {
3537
BurnIn uint64 `mapstructure:"burn-in"`
3638
PrintInterval uint64 `mapstructure:"print-interval"`
3739
PrewarmQueries bool `mapstructure:"prewarm-queries"`
40+
ResultsFile string `mapstructure:"results-file"`
3841
}
3942

4043
// AddToFlagSet adds command line flags needed by the BenchmarkRunnerConfig to the flag set.
@@ -51,6 +54,7 @@ func (c BenchmarkRunnerConfig) AddToFlagSet(fs *pflag.FlagSet) {
5154
fs.Bool("print-responses", false, "Pretty print response bodies for correctness checking (default false).")
5255
fs.Int("debug", 0, "Whether to print debug messages.")
5356
fs.String("file", "", "File name to read queries from")
57+
fs.String("results-file", "", "Write the test results summary json to this file")
5458
}
5559

5660
// BenchmarkRunner contains the common components for running a query benchmarking
@@ -183,6 +187,33 @@ func (b *BenchmarkRunner) Run(queryPool *sync.Pool, processorCreateFn ProcessorC
183187
pprof.WriteHeapProfile(f)
184188
f.Close()
185189
}
190+
191+
// (Optional) save the results file:
192+
if len(b.BenchmarkRunnerConfig.ResultsFile) > 0 {
193+
b.saveTestResult(wallTook, wallStart, wallEnd)
194+
}
195+
}
196+
197+
func (b *BenchmarkRunner) saveTestResult(took time.Duration, start time.Time, end time.Time) {
198+
testResult := LoaderTestResult{
199+
ResultFormatVersion: BenchmarkTestResultVersion,
200+
RunnerConfig: b.BenchmarkRunnerConfig,
201+
StartTime: start.UTC().Unix() * 1000,
202+
EndTime: end.UTC().Unix() * 1000,
203+
DurationMillis: took.Milliseconds(),
204+
Totals: b.sp.GetTotalsMap(),
205+
}
206+
207+
_, _ = fmt.Printf("Saving results json file to %s\n", b.BenchmarkRunnerConfig.ResultsFile)
208+
file, err := json.MarshalIndent(testResult, "", " ")
209+
if err != nil {
210+
log.Fatal(err)
211+
}
212+
213+
err = ioutil.WriteFile(b.BenchmarkRunnerConfig.ResultsFile, file, 0644)
214+
if err != nil {
215+
log.Fatal(err)
216+
}
186217
}
187218

188219
func (b *BenchmarkRunner) processorHandler(wg *sync.WaitGroup, rateLimiter *rate.Limiter, queryPool *sync.Pool, processor Processor, workerNum int) {

pkg/query/benchmarker_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,10 @@ func (m *mockStatProcessor) CloseAndWait() {
323323
m.closed = true
324324
m.wg.Done()
325325
}
326+
func (m *mockStatProcessor) GetTotalsMap() map[string]interface{} {
327+
totals := make(map[string]interface{})
328+
return totals
329+
}
326330

327331
type mockProcessor struct {
328332
processRes []*Stat

0 commit comments

Comments
 (0)