Skip to content

Commit

Permalink
Make readPathCounter atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
aminst committed Dec 6, 2023
1 parent 6917f19 commit 24d7e2e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 22 deletions.
26 changes: 8 additions & 18 deletions pkg/oramnode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"net"
"strconv"
"sync"
"sync/atomic"
"time"

pb "github.com/dsg-uwaterloo/oblishard/api/oramnode"
Expand Down Expand Up @@ -41,8 +41,7 @@ type oramNodeServer struct {
raftNode *raft.Raft
oramNodeFSM *oramNodeFSM
shardNodeRPCClients map[int]ReplicaRPCClientMap
readPathCounter int
readPathCounterMu sync.Mutex
readPathCounter atomic.Int32
storageHandler storage
parameters config.Parameters
}
Expand All @@ -54,7 +53,7 @@ func newOramNodeServer(oramNodeServerID int, replicaID int, raftNode *raft.Raft,
raftNode: raftNode,
oramNodeFSM: oramNodeFSM,
shardNodeRPCClients: shardNodeRPCClients,
readPathCounter: 0,
readPathCounter: atomic.Int32{},
storageHandler: storageHandler,
parameters: parameters,
}
Expand Down Expand Up @@ -256,9 +255,7 @@ func (o *oramNodeServer) evict(storageID int) error {
return fmt.Errorf("could not apply log to the FSM; %s", err)
}

o.readPathCounterMu.Lock()
o.readPathCounter = 0
o.readPathCounterMu.Unlock()
o.readPathCounter.Store(0)

return nil
}
Expand Down Expand Up @@ -370,6 +367,7 @@ func (o *oramNodeServer) ReadPath(ctx context.Context, request *pb.ReadPathReque
for i := 0; i < realReadCount; i++ {
response := <-readBlockResponseChan
if response.err != nil {
log.Error().Msgf("Could not read block %s; %s", response.block, response.err)
return nil, err
}
returnValues[response.block] = response.value
Expand All @@ -384,9 +382,7 @@ func (o *oramNodeServer) ReadPath(ctx context.Context, request *pb.ReadPathReque
}
earlyReshuffleSpan.End()

o.readPathCounterMu.Lock()
o.readPathCounter++
o.readPathCounterMu.Unlock()
o.readPathCounter.Add(1)

endReadPathCommand, err := newReplicateEndReadPathCommand()
if err != nil {
Expand Down Expand Up @@ -471,14 +467,8 @@ func StartServer(oramNodeServerID int, ip string, rpcPort int, replicaID int, ra
oramNodeServer := newOramNodeServer(oramNodeServerID, replicaID, r, oramNodeFSM, shardNodeRPCClients, storageHandler, parameters)
go func() {
for {
time.Sleep(200 * time.Millisecond)
needEviction := false
oramNodeServer.readPathCounterMu.Lock()
if oramNodeServer.readPathCounter >= oramNodeServer.parameters.EvictionRate {
needEviction = true
}
oramNodeServer.readPathCounterMu.Unlock()
if needEviction {
time.Sleep(100 * time.Millisecond)
if oramNodeServer.readPathCounter.Load() >= int32(oramNodeServer.parameters.EvictionRate) {
storageID := oramNodeServer.storageHandler.GetRandomStorageID()
oramNodeServer.evict(storageID)
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/oramnode/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,7 @@ func TestEvictKeepsBeginEvictionInFailureScenario(t *testing.T) {
func TestEvictResetsReadPathCounter(t *testing.T) {
o := startLeaderRaftNodeServer(t)
o.evict(0)
o.readPathCounterMu.Lock()
defer o.readPathCounterMu.Unlock()
if o.readPathCounter != 0 {
if o.readPathCounter.Load() != 0 {
t.Errorf("Evict should reset readPathCounter after successful execution")
}
}
Expand Down Expand Up @@ -377,7 +375,7 @@ func TestReadPathIncrementsReadPathCounter(t *testing.T) {
o := startLeaderRaftNodeServer(t).withMockStorageHandler(newMockStorageHandler(4, 4))
ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1"))
o.ReadPath(ctx, &oramnode.ReadPathRequest{StorageId: 2, Requests: []*oramnode.BlockRequest{{Block: "a", Path: 1}}})
if o.readPathCounter != 1 {
if o.readPathCounter.Load() != 1 {
t.Errorf("ReadPath should increment readPathCounter")
}
}

0 comments on commit 24d7e2e

Please sign in to comment.