From 0e9c5faa406531594529aeb1d26c78d05144a275 Mon Sep 17 00:00:00 2001 From: aminst Date: Thu, 23 Nov 2023 17:39:07 -0500 Subject: [PATCH] Add rate limitter to the client --- cmd/client/main.go | 19 ++++++++---- configs/default/parameters.yaml | 11 +++---- configs/parameters.yaml | 13 +++++---- pkg/client/ratelimit.go | 32 ++++++++++++++++++++ pkg/client/ratelimit_test.go | 52 +++++++++++++++++++++++++++++++++ pkg/config/config.go | 1 + pkg/e2e/configs/parameters.yaml | 5 ++-- 7 files changed, 115 insertions(+), 18 deletions(-) create mode 100644 pkg/client/ratelimit.go create mode 100644 pkg/client/ratelimit_test.go diff --git a/cmd/client/main.go b/cmd/client/main.go index 47b9703..b381a59 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -28,9 +28,12 @@ type writeResponse struct { err error } -func asyncRead(tracer trace.Tracer, block string, routerRPCClient client.RouterRPCClient, readResponseChannel chan readResponse) { +// TODO: Add client struct that contains the routerRPCClient and the rateLimit +func asyncRead(ratelimit *client.RateLimit, tracer trace.Tracer, block string, routerRPCClient client.RouterRPCClient, readResponseChannel chan readResponse) { + ratelimit.Wait() ctx, span := tracer.Start(context.Background(), "client read request") value, err := routerRPCClient.Read(ctx, block) + ratelimit.AddToken() span.End() if err != nil { readResponseChannel <- readResponse{block: block, value: "", err: fmt.Errorf("failed to call Read block %s on router; %v", block, err)} @@ -41,9 +44,11 @@ func asyncRead(tracer trace.Tracer, block string, routerRPCClient client.RouterR } } -func asyncWrite(tracer trace.Tracer, block string, newValue string, routerRPCClient client.RouterRPCClient, writeResponseChannel chan writeResponse) { +func asyncWrite(ratelimit *client.RateLimit, tracer trace.Tracer, block string, newValue string, routerRPCClient client.RouterRPCClient, writeResponseChannel chan writeResponse) { + ratelimit.Wait() ctx, span := tracer.Start(context.Background(), "client write request") value, err := routerRPCClient.Write(ctx, block, newValue) + ratelimit.AddToken() span.End() if err != nil { writeResponseChannel <- writeResponse{block: block, success: false, err: fmt.Errorf("failed to call Write block %s on router; %v", block, err)} @@ -57,7 +62,7 @@ func main() { logPath := flag.String("logpath", "", "path to write logs") configsPath := flag.String("conf", "../../configs/default", "configs directory path") flag.Parse() - utils.InitLogging(true, *logPath) + utils.InitLogging(false, *logPath) routerEndpoints, err := config.ReadRouterEndpoints(path.Join(*configsPath, "router_endpoints.yaml")) if err != nil { @@ -97,26 +102,30 @@ func main() { writeResponseChannel := make(chan writeResponse) readOperations := 0 writeOperations := 0 + rateLimit := client.NewRateLimit(parameters.MaxRequests) + rateLimit.Start() startTime := time.Now() for _, request := range requests { if request.OperationType == client.Read { readOperations++ - go asyncRead(tracer, request.Block, routerRPCClient, readResponseChannel) + go asyncRead(rateLimit, tracer, request.Block, routerRPCClient, readResponseChannel) } else if request.OperationType == client.Write { writeOperations++ - go asyncWrite(tracer, request.Block, request.NewValue, routerRPCClient, writeResponseChannel) + go asyncWrite(rateLimit, tracer, request.Block, request.NewValue, routerRPCClient, writeResponseChannel) } } for i := 0; i < readOperations+writeOperations; i++ { select { case readResponse := <-readResponseChannel: if readResponse.err != nil { + fmt.Println(readResponse.err.Error()) log.Error().Msgf(readResponse.err.Error()) } else { log.Debug().Msgf("Sucess in Read of block %s. Got value: %v\n", readResponse.block, readResponse.value) } case writeResponse := <-writeResponseChannel: if writeResponse.err != nil { + fmt.Println(writeResponse.err.Error()) log.Error().Msgf(writeResponse.err.Error()) } else { log.Debug().Msgf("Finished writing block %s. Success: %v\n", writeResponse.block, writeResponse.success) diff --git a/configs/default/parameters.yaml b/configs/default/parameters.yaml index f3dc227..0664e5d 100644 --- a/configs/default/parameters.yaml +++ b/configs/default/parameters.yaml @@ -1,10 +1,11 @@ max-blocks-to-send: 200 # The maximum number of blocks to send from each shard node to the oram node during evictions -eviction-rate: 2 # How many ReadPath operations before eviction -evict-path-count: 5 # How many paths to evict at a time -batch-size: 1 # Size of each batch of requests -epoch-time: 10 # How many milliseconds between each epoch +eviction-rate: 1 # How many ReadPath operations before eviction +evict-path-count: 200 # How many paths to evict at a time +batch-timeout: 25 # How many milliseconds to wait before sending a batch of blocks to the oram node +epoch-time: 25 # How many milliseconds between each epoch trace: false # Whether to use opentelemetry and jaeger Z: 1 # number of real blocks per bucket S: 9 # number of dummy blocks per bucket shift: 1 # 2^shift is the tree branching factor -tree-height: 8 # height of the tree \ No newline at end of file +tree-height: 11 # height of the tree +max-requests: 3000 # maximum number of requests in flight at the client \ No newline at end of file diff --git a/configs/parameters.yaml b/configs/parameters.yaml index f1010b9..31f2223 100644 --- a/configs/parameters.yaml +++ b/configs/parameters.yaml @@ -1,10 +1,11 @@ -max-blocks-to-send: 5 # The maximum number of blocks to send from each shard node to the oram node during evictions -eviction-rate: 40 # How many ReadPath operations before eviction -evict-path-count: 5 # How many paths to evict at a time -batch-size: 2 # Size of each batch of requests -epoch-time: 10 # How many milliseconds between each epoch +max-blocks-to-send: 200 # The maximum number of blocks to send from each shard node to the oram node during evictions +eviction-rate: 5 # How many ReadPath operations before eviction +evict-path-count: 200 # How many paths to evict at a time +batch-timeout: 100 # How many milliseconds to wait before sending a batch of blocks to the oram node +epoch-time: 100 # How many milliseconds between each epoch trace: false # Whether to use opentelemetry and jaeger Z: 1 # number of real blocks per bucket S: 9 # number of dummy blocks per bucket shift: 1 # 2^shift is the tree branching factor -tree-height: 3 # height of the tree \ No newline at end of file +tree-height: 11 # height of the tree +max-requests: 1500 # maximum number of requests in flight at the client \ No newline at end of file diff --git a/pkg/client/ratelimit.go b/pkg/client/ratelimit.go new file mode 100644 index 0000000..417f0db --- /dev/null +++ b/pkg/client/ratelimit.go @@ -0,0 +1,32 @@ +package client + +// RateLimit is a simple rate limiter that allows a maximum of tokensLimit requests at a time. +type RateLimit struct { + // The number of requests that can be made in the current period. + tokensLimit int + availableTokens chan struct{} +} + +func NewRateLimit(tokensLimit int) *RateLimit { + return &RateLimit{ + tokensLimit: tokensLimit, + availableTokens: make(chan struct{}, tokensLimit), + } +} + +// Clients should call Start() before making requests so that the rate limiter has tokens to give out. +func (r *RateLimit) Start() { + for i := 0; i < r.tokensLimit; i++ { + go func() { r.availableTokens <- struct{}{} }() + } +} + +// Clients should call Wait() before making requests to ensure that the rate limiter has tokens to give out. +func (r *RateLimit) Wait() { + <-r.availableTokens +} + +// Clients should call AddToken() after making requests to return a token to the rate limiter. +func (r *RateLimit) AddToken() { + go func() { r.availableTokens <- struct{}{} }() +} diff --git a/pkg/client/ratelimit_test.go b/pkg/client/ratelimit_test.go new file mode 100644 index 0000000..8a228d6 --- /dev/null +++ b/pkg/client/ratelimit_test.go @@ -0,0 +1,52 @@ +package client + +import ( + "testing" + "time" +) + +func TestRateLimitAddsTokensAfterCallingStart(t *testing.T) { + rateLimit := NewRateLimit(10) + rateLimit.Start() + timeout := time.After(1 * time.Second) + responseChan := make(chan bool) + for i := 0; i < 10; i++ { + go func() { + rateLimit.Wait() + responseChan <- true + }() + } + for i := 0; i < 10; i++ { + select { + case <-responseChan: + case <-timeout: + t.Fatal("Timed out waiting for response") + } + } +} + +func TestAddTokenAllowsBlockedClientsToContinue(t *testing.T) { + rateLimit := NewRateLimit(1) + rateLimit.Start() + timeout := time.After(1 * time.Second) + responseChan := make(chan bool) + go func() { + rateLimit.Wait() + responseChan <- true + }() + select { + case <-responseChan: + case <-timeout: + t.Fatal("Timed out waiting for response") + } + rateLimit.AddToken() + go func() { + rateLimit.Wait() + responseChan <- true + }() + select { + case <-responseChan: + case <-timeout: + t.Fatal("Timed out waiting for response") + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index ea4c4e0..f7a5df0 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -60,6 +60,7 @@ type Parameters struct { S int `yaml:"S"` Shift int `yaml:"shift"` TreeHeight int `yaml:"tree-height"` + MaxRequests int `yaml:"max-requests"` } func ReadRouterEndpoints(path string) ([]RouterEndpoint, error) { diff --git a/pkg/e2e/configs/parameters.yaml b/pkg/e2e/configs/parameters.yaml index bc9b284..97cb7b8 100644 --- a/pkg/e2e/configs/parameters.yaml +++ b/pkg/e2e/configs/parameters.yaml @@ -1,10 +1,11 @@ max-blocks-to-send: 5 # The maximum number of blocks to send from each shard node to the oram node during evictions eviction-rate: 2 # How many ReadPath operations before eviction evict-path-count: 4 # How many paths to evict at a time -batch-size: 1 # Size of each batch of requests +batch-timeout: 2 # How many milliseconds to wait before sending a batch of blocks to the oram node epoch-time: 10 # How many milliseconds between each epoch trace: false # Whether to use opentelemetry and jaeger Z: 1 # number of real blocks per bucket S: 9 # number of dummy blocks per bucket shift: 1 # 2^shift is the tree branching factor -tree-height: 3 # height of the tree \ No newline at end of file +tree-height: 3 # height of the tree +max-requests: 1000 # maximum number of requests in flight at the client \ No newline at end of file