From 21ad4eeea9d9cae8fc55be2ad85e4f0602aca05b Mon Sep 17 00:00:00 2001 From: aminst Date: Wed, 28 Feb 2024 15:49:28 -0500 Subject: [PATCH] Write average latency to the output file --- pkg/client/client.go | 31 +++++++++++++++++++++---------- pkg/client/output.go | 12 +++++++++++- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 9ade246..b94bed3 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,14 +18,16 @@ import ( ) type ReadResponse struct { - block string - value string - err error + block string + value string + latency time.Duration + err error } type WriteResponse struct { block string success bool + latency time.Duration err error } @@ -63,30 +65,34 @@ func (c *client) WaitForStorageToBeReady(redisEndpoints []config.RedisEndpoint, func (c *client) asyncRead(block string, routerRPCClient RouterRPCClient, readResponseChannel chan ReadResponse) { c.rateLimit.Acquire() ctx, span := c.tracer.Start(context.Background(), "client read request") + startTime := time.Now() value, err := routerRPCClient.Read(ctx, block) + latency := time.Since(startTime) log.Debug().Msgf("Got value %s for block %s", value, block) span.End() c.rateLimit.Release() if err != nil { readResponseChannel <- ReadResponse{block: block, value: "", err: fmt.Errorf("failed to call Read block %s on router; %v", block, err)} } else if value == "" { - readResponseChannel <- ReadResponse{block: block, value: "", err: nil} + readResponseChannel <- ReadResponse{block: block, value: "", latency: latency, err: nil} } else { - readResponseChannel <- ReadResponse{block: block, value: value, err: nil} + readResponseChannel <- ReadResponse{block: block, value: value, latency: latency, err: nil} } } func (c *client) asyncWrite(block string, newValue string, routerRPCClient RouterRPCClient, writeResponseChannel chan WriteResponse) { c.rateLimit.Acquire() ctx, span := c.tracer.Start(context.Background(), "client write request") + startTime := time.Now() value, err := routerRPCClient.Write(ctx, block, newValue) + latency := time.Since(startTime) log.Debug().Msgf("Got success %v for block %s", value, block) span.End() c.rateLimit.Release() if err != nil { writeResponseChannel <- WriteResponse{block: block, success: false, err: fmt.Errorf("failed to call Write block %s on router; %v", block, err)} } else { - writeResponseChannel <- WriteResponse{block: block, success: value, err: nil} + writeResponseChannel <- WriteResponse{block: block, success: value, latency: latency, err: nil} } } @@ -109,24 +115,27 @@ func (c *client) SendRequestsForever(ctx context.Context, readResponseChannel ch } } -type ResponseCount struct { +type ResponseStatus struct { readOperations int writeOperations int + latencies []time.Duration } // getResponsesForever cancels remaining operations and returns when the context is cancelled // returns the number of read and write operations over fixed intervals in the duration -func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel chan ReadResponse, writeResponseChannel chan WriteResponse) []ResponseCount { +func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel chan ReadResponse, writeResponseChannel chan WriteResponse) []ResponseStatus { readOperations, writeOperations := 0, 0 - var responseCounts []ResponseCount + var latencies []time.Duration + var responseCounts []ResponseStatus timout := time.After(1 * time.Second) for { select { case <-ctx.Done(): return responseCounts case <-timout: - responseCounts = append(responseCounts, ResponseCount{readOperations, writeOperations}) + responseCounts = append(responseCounts, ResponseStatus{readOperations, writeOperations, latencies}) readOperations, writeOperations = 0, 0 + latencies = nil timout = time.After(1 * time.Second) default: } @@ -138,6 +147,7 @@ func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel ch } else { log.Debug().Msgf("Sucess in Read of block %s. Got value: %v\n", readResponse.block, readResponse.value) readOperations++ + latencies = append(latencies, readResponse.latency) } case writeResponse := <-writeResponseChannel: if writeResponse.err != nil { @@ -146,6 +156,7 @@ func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel ch } else { log.Debug().Msgf("Finished writing block %s. Success: %v\n", writeResponse.block, writeResponse.success) writeOperations++ + latencies = append(latencies, writeResponse.latency) } default: } diff --git a/pkg/client/output.go b/pkg/client/output.go index a480502..9e86998 100644 --- a/pkg/client/output.go +++ b/pkg/client/output.go @@ -5,18 +5,28 @@ import ( "os" ) -func WriteOutputToFile(outputFilePath string, responseCount []ResponseCount) error { +func WriteOutputToFile(outputFilePath string, responseCount []ResponseStatus) error { file, err := os.Create(outputFilePath) if err != nil { return err } defer file.Close() sum := 0.0 + experimentAverageLatency := 0.0 + // averageLatency := 0.0 for _, count := range responseCount { throughput := float64(count.readOperations + count.writeOperations) + averageLatency := 0.0 + for _, latency := range count.latencies { + averageLatency += float64(latency.Milliseconds()) + } + averageLatency = averageLatency / float64(len(count.latencies)) sum += throughput + experimentAverageLatency += averageLatency file.WriteString(fmt.Sprintf("Throughput: %f\n", throughput)) + file.WriteString(fmt.Sprintf("Average Latency: %f\n", averageLatency)) } file.WriteString(fmt.Sprintf("Average Throughput: %f\n", sum/float64(len(responseCount)))) + file.WriteString(fmt.Sprintf("Experiment Average Latency: %f\n", experimentAverageLatency/float64(len(responseCount)))) return nil }