From 7a84cbb4520a180eb1019565957251500e7e3041 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 10 Mar 2024 11:39:46 +0100 Subject: [PATCH] Improve watch latency benchmark * Support prevKV * Support multiple watchers per stream * Allow continious puts without waiting for event Signed-off-by: Marek Siarkowicz --- tools/benchmark/cmd/watch_latency.go | 122 ++++++++++++++++++--------- 1 file changed, 82 insertions(+), 40 deletions(-) diff --git a/tools/benchmark/cmd/watch_latency.go b/tools/benchmark/cmd/watch_latency.go index b51cb0752fc..67db42755a7 100644 --- a/tools/benchmark/cmd/watch_latency.go +++ b/tools/benchmark/cmd/watch_latency.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "os" - "sync" "time" "github.com/cheggaaa/pb/v3" @@ -40,15 +39,22 @@ var watchLatencyCmd = &cobra.Command{ } var ( - watchLTotal int - watchLPutRate int - watchLKeySize int - watchLValueSize int + watchLPutTotal int + watchLPutRate int + watchLKeySize int + watchLValueSize int + watchLStreams int + watchLWatchersPerStream int + watchLPrevKV bool ) func init() { RootCmd.AddCommand(watchLatencyCmd) - watchLatencyCmd.Flags().IntVar(&watchLTotal, "total", 10000, "Total number of put requests") + watchLatencyCmd.Flags().IntVar(&watchLStreams, "streams", 10, "Total watch streams") + watchLatencyCmd.Flags().IntVar(&watchLWatchersPerStream, "watchers-per-stream", 10, "Total watchers per stream") + watchLatencyCmd.Flags().BoolVar(&watchLPrevKV, "prevkv", false, "PrevKV enabled on watch requests") + + watchLatencyCmd.Flags().IntVar(&watchLPutTotal, "put-total", 1000, "Total number of put requests") watchLatencyCmd.Flags().IntVar(&watchLPutRate, "put-rate", 100, "Number of keys to put per second") watchLatencyCmd.Flags().IntVar(&watchLKeySize, "key-size", 32, "Key size of watch response") watchLatencyCmd.Flags().IntVar(&watchLValueSize, "val-size", 32, "Value size of watch response") @@ -57,54 +63,90 @@ func init() { func watchLatencyFunc(_ *cobra.Command, _ []string) { key := string(mustRandBytes(watchLKeySize)) value := string(mustRandBytes(watchLValueSize)) - - clients := mustCreateClients(totalClients, totalConns) + wchs := setupWatchChannels(key) putClient := mustCreateConn() - wchs := make([]clientv3.WatchChan, len(clients)) - for i := range wchs { - wchs[i] = clients[i].Watch(context.TODO(), key) - } - - bar = pb.New(watchLTotal) + bar = pb.New(watchLPutTotal * len(wchs)) bar.Start() limiter := rate.NewLimiter(rate.Limit(watchLPutRate), watchLPutRate) - r := newReport() - rc := r.Run() - for i := 0; i < watchLTotal; i++ { + putTimes := make([]time.Time, watchLPutTotal) + eventTimes := make([][]time.Time, len(wchs)) + + for i, wch := range wchs { + wch := wch + i := i + eventTimes[i] = make([]time.Time, watchLPutTotal) + wg.Add(1) + go func() { + defer wg.Done() + eventCount := 0 + for eventCount < watchLPutTotal { + resp := <-wch + for range resp.Events { + eventTimes[i][eventCount] = time.Now() + eventCount++ + bar.Increment() + } + } + }() + } + + putReport := newReport() + putReportResults := putReport.Run() + watchReport := newReport() + watchReportResults := watchReport.Run() + for i := 0; i < watchLPutTotal; i++ { // limit key put as per reqRate if err := limiter.Wait(context.TODO()); err != nil { break } - - var st time.Time - var wg sync.WaitGroup - wg.Add(len(clients)) - barrierc := make(chan struct{}) - for _, wch := range wchs { - ch := wch - go func() { - <-barrierc - <-ch - r.Results() <- report.Result{Start: st, End: time.Now()} - wg.Done() - }() - } - + start := time.Now() if _, err := putClient.Put(context.TODO(), key, value); err != nil { fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err) os.Exit(1) } - - st = time.Now() - close(barrierc) - wg.Wait() - bar.Increment() + end := time.Now() + putReport.Results() <- report.Result{Start: start, End: end} + putTimes[i] = end } - - close(r.Results()) + wg.Wait() + close(putReport.Results()) bar.Finish() - fmt.Printf("%s", <-rc) + fmt.Printf("\nPut summary:\n%s", <-putReportResults) + + for i := 0; i < len(wchs); i++ { + for j := 0; j < watchLPutTotal; j++ { + start := putTimes[j] + end := eventTimes[i][j] + if end.Before(start) { + start = end + } + watchReport.Results() <- report.Result{Start: start, End: end} + } + } + + close(watchReport.Results()) + fmt.Printf("\nWatch events summary:\n%s", <-watchReportResults) +} + +func setupWatchChannels(key string) []clientv3.WatchChan { + clients := mustCreateClients(totalClients, totalConns) + + streams := make([]clientv3.Watcher, watchLStreams) + for i := range streams { + streams[i] = clientv3.NewWatcher(clients[i%len(clients)]) + } + opts := []clientv3.OpOption{} + if watchLPrevKV { + opts = append(opts, clientv3.WithPrevKV()) + } + wchs := make([]clientv3.WatchChan, len(streams)*watchLWatchersPerStream) + for i := 0; i < len(streams); i++ { + for j := 0; j < watchLWatchersPerStream; j++ { + wchs[i*len(streams)+j] = streams[i].Watch(context.TODO(), key, opts...) + } + } + return wchs }