Skip to content

Commit

Permalink
Add rate limitter to the client
Browse files Browse the repository at this point in the history
  • Loading branch information
aminst committed Nov 23, 2023
1 parent 83d4207 commit 0e9c5fa
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 18 deletions.
19 changes: 14 additions & 5 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ type writeResponse struct {
err error
}

func asyncRead(tracer trace.Tracer, block string, routerRPCClient client.RouterRPCClient, readResponseChannel chan readResponse) {
// TODO: Add client struct that contains the routerRPCClient and the rateLimit
func asyncRead(ratelimit *client.RateLimit, tracer trace.Tracer, block string, routerRPCClient client.RouterRPCClient, readResponseChannel chan readResponse) {
ratelimit.Wait()
ctx, span := tracer.Start(context.Background(), "client read request")
value, err := routerRPCClient.Read(ctx, block)
ratelimit.AddToken()
span.End()
if err != nil {
readResponseChannel <- readResponse{block: block, value: "", err: fmt.Errorf("failed to call Read block %s on router; %v", block, err)}
Expand All @@ -41,9 +44,11 @@ func asyncRead(tracer trace.Tracer, block string, routerRPCClient client.RouterR
}
}

func asyncWrite(tracer trace.Tracer, block string, newValue string, routerRPCClient client.RouterRPCClient, writeResponseChannel chan writeResponse) {
func asyncWrite(ratelimit *client.RateLimit, tracer trace.Tracer, block string, newValue string, routerRPCClient client.RouterRPCClient, writeResponseChannel chan writeResponse) {
ratelimit.Wait()
ctx, span := tracer.Start(context.Background(), "client write request")
value, err := routerRPCClient.Write(ctx, block, newValue)
ratelimit.AddToken()
span.End()
if err != nil {
writeResponseChannel <- writeResponse{block: block, success: false, err: fmt.Errorf("failed to call Write block %s on router; %v", block, err)}
Expand All @@ -57,7 +62,7 @@ func main() {
logPath := flag.String("logpath", "", "path to write logs")
configsPath := flag.String("conf", "../../configs/default", "configs directory path")
flag.Parse()
utils.InitLogging(true, *logPath)
utils.InitLogging(false, *logPath)

routerEndpoints, err := config.ReadRouterEndpoints(path.Join(*configsPath, "router_endpoints.yaml"))
if err != nil {
Expand Down Expand Up @@ -97,26 +102,30 @@ func main() {
writeResponseChannel := make(chan writeResponse)
readOperations := 0
writeOperations := 0
rateLimit := client.NewRateLimit(parameters.MaxRequests)
rateLimit.Start()
startTime := time.Now()
for _, request := range requests {
if request.OperationType == client.Read {
readOperations++
go asyncRead(tracer, request.Block, routerRPCClient, readResponseChannel)
go asyncRead(rateLimit, tracer, request.Block, routerRPCClient, readResponseChannel)
} else if request.OperationType == client.Write {
writeOperations++
go asyncWrite(tracer, request.Block, request.NewValue, routerRPCClient, writeResponseChannel)
go asyncWrite(rateLimit, tracer, request.Block, request.NewValue, routerRPCClient, writeResponseChannel)
}
}
for i := 0; i < readOperations+writeOperations; i++ {
select {
case readResponse := <-readResponseChannel:
if readResponse.err != nil {
fmt.Println(readResponse.err.Error())
log.Error().Msgf(readResponse.err.Error())
} else {
log.Debug().Msgf("Sucess in Read of block %s. Got value: %v\n", readResponse.block, readResponse.value)
}
case writeResponse := <-writeResponseChannel:
if writeResponse.err != nil {
fmt.Println(writeResponse.err.Error())
log.Error().Msgf(writeResponse.err.Error())
} else {
log.Debug().Msgf("Finished writing block %s. Success: %v\n", writeResponse.block, writeResponse.success)
Expand Down
11 changes: 6 additions & 5 deletions configs/default/parameters.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
max-blocks-to-send: 200 # The maximum number of blocks to send from each shard node to the oram node during evictions
eviction-rate: 2 # How many ReadPath operations before eviction
evict-path-count: 5 # How many paths to evict at a time
batch-size: 1 # Size of each batch of requests
epoch-time: 10 # How many milliseconds between each epoch
eviction-rate: 1 # How many ReadPath operations before eviction
evict-path-count: 200 # How many paths to evict at a time
batch-timeout: 25 # How many milliseconds to wait before sending a batch of blocks to the oram node
epoch-time: 25 # How many milliseconds between each epoch
trace: false # Whether to use opentelemetry and jaeger
Z: 1 # number of real blocks per bucket
S: 9 # number of dummy blocks per bucket
shift: 1 # 2^shift is the tree branching factor
tree-height: 8 # height of the tree
tree-height: 11 # height of the tree
max-requests: 3000 # maximum number of requests in flight at the client
13 changes: 7 additions & 6 deletions configs/parameters.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
max-blocks-to-send: 5 # The maximum number of blocks to send from each shard node to the oram node during evictions
eviction-rate: 40 # How many ReadPath operations before eviction
evict-path-count: 5 # How many paths to evict at a time
batch-size: 2 # Size of each batch of requests
epoch-time: 10 # How many milliseconds between each epoch
max-blocks-to-send: 200 # The maximum number of blocks to send from each shard node to the oram node during evictions
eviction-rate: 5 # How many ReadPath operations before eviction
evict-path-count: 200 # How many paths to evict at a time
batch-timeout: 100 # How many milliseconds to wait before sending a batch of blocks to the oram node
epoch-time: 100 # How many milliseconds between each epoch
trace: false # Whether to use opentelemetry and jaeger
Z: 1 # number of real blocks per bucket
S: 9 # number of dummy blocks per bucket
shift: 1 # 2^shift is the tree branching factor
tree-height: 3 # height of the tree
tree-height: 11 # height of the tree
max-requests: 1500 # maximum number of requests in flight at the client
32 changes: 32 additions & 0 deletions pkg/client/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package client

// RateLimit is a simple rate limiter that allows a maximum of tokensLimit requests at a time.
type RateLimit struct {
// The number of requests that can be made in the current period.
tokensLimit int
availableTokens chan struct{}
}

func NewRateLimit(tokensLimit int) *RateLimit {
return &RateLimit{
tokensLimit: tokensLimit,
availableTokens: make(chan struct{}, tokensLimit),
}
}

// Clients should call Start() before making requests so that the rate limiter has tokens to give out.
func (r *RateLimit) Start() {
for i := 0; i < r.tokensLimit; i++ {
go func() { r.availableTokens <- struct{}{} }()
}
}

// Clients should call Wait() before making requests to ensure that the rate limiter has tokens to give out.
func (r *RateLimit) Wait() {
<-r.availableTokens
}

// Clients should call AddToken() after making requests to return a token to the rate limiter.
func (r *RateLimit) AddToken() {
go func() { r.availableTokens <- struct{}{} }()
}
52 changes: 52 additions & 0 deletions pkg/client/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package client

import (
"testing"
"time"
)

func TestRateLimitAddsTokensAfterCallingStart(t *testing.T) {
rateLimit := NewRateLimit(10)
rateLimit.Start()
timeout := time.After(1 * time.Second)
responseChan := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
rateLimit.Wait()
responseChan <- true
}()
}
for i := 0; i < 10; i++ {
select {
case <-responseChan:
case <-timeout:
t.Fatal("Timed out waiting for response")
}
}
}

func TestAddTokenAllowsBlockedClientsToContinue(t *testing.T) {
rateLimit := NewRateLimit(1)
rateLimit.Start()
timeout := time.After(1 * time.Second)
responseChan := make(chan bool)
go func() {
rateLimit.Wait()
responseChan <- true
}()
select {
case <-responseChan:
case <-timeout:
t.Fatal("Timed out waiting for response")
}
rateLimit.AddToken()
go func() {
rateLimit.Wait()
responseChan <- true
}()
select {
case <-responseChan:
case <-timeout:
t.Fatal("Timed out waiting for response")
}
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Parameters struct {
S int `yaml:"S"`
Shift int `yaml:"shift"`
TreeHeight int `yaml:"tree-height"`
MaxRequests int `yaml:"max-requests"`
}

func ReadRouterEndpoints(path string) ([]RouterEndpoint, error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/e2e/configs/parameters.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
max-blocks-to-send: 5 # The maximum number of blocks to send from each shard node to the oram node during evictions
eviction-rate: 2 # How many ReadPath operations before eviction
evict-path-count: 4 # How many paths to evict at a time
batch-size: 1 # Size of each batch of requests
batch-timeout: 2 # How many milliseconds to wait before sending a batch of blocks to the oram node
epoch-time: 10 # How many milliseconds between each epoch
trace: false # Whether to use opentelemetry and jaeger
Z: 1 # number of real blocks per bucket
S: 9 # number of dummy blocks per bucket
shift: 1 # 2^shift is the tree branching factor
tree-height: 3 # height of the tree
tree-height: 3 # height of the tree
max-requests: 1000 # maximum number of requests in flight at the client

0 comments on commit 0e9c5fa

Please sign in to comment.