From 24d7e2eb5cc8e747ba9d974c37fd3518e3c92b89 Mon Sep 17 00:00:00 2001 From: aminst Date: Wed, 6 Dec 2023 12:35:21 -0500 Subject: [PATCH] Make readPathCounter atomic --- pkg/oramnode/server.go | 26 ++++++++------------------ pkg/oramnode/server_test.go | 6 ++---- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/pkg/oramnode/server.go b/pkg/oramnode/server.go index b23f7e7..f9083a0 100644 --- a/pkg/oramnode/server.go +++ b/pkg/oramnode/server.go @@ -5,7 +5,7 @@ import ( "fmt" "net" "strconv" - "sync" + "sync/atomic" "time" pb "github.com/dsg-uwaterloo/oblishard/api/oramnode" @@ -41,8 +41,7 @@ type oramNodeServer struct { raftNode *raft.Raft oramNodeFSM *oramNodeFSM shardNodeRPCClients map[int]ReplicaRPCClientMap - readPathCounter int - readPathCounterMu sync.Mutex + readPathCounter atomic.Int32 storageHandler storage parameters config.Parameters } @@ -54,7 +53,7 @@ func newOramNodeServer(oramNodeServerID int, replicaID int, raftNode *raft.Raft, raftNode: raftNode, oramNodeFSM: oramNodeFSM, shardNodeRPCClients: shardNodeRPCClients, - readPathCounter: 0, + readPathCounter: atomic.Int32{}, storageHandler: storageHandler, parameters: parameters, } @@ -256,9 +255,7 @@ func (o *oramNodeServer) evict(storageID int) error { return fmt.Errorf("could not apply log to the FSM; %s", err) } - o.readPathCounterMu.Lock() - o.readPathCounter = 0 - o.readPathCounterMu.Unlock() + o.readPathCounter.Store(0) return nil } @@ -370,6 +367,7 @@ func (o *oramNodeServer) ReadPath(ctx context.Context, request *pb.ReadPathReque for i := 0; i < realReadCount; i++ { response := <-readBlockResponseChan if response.err != nil { + log.Error().Msgf("Could not read block %s; %s", response.block, response.err) return nil, err } returnValues[response.block] = response.value @@ -384,9 +382,7 @@ func (o *oramNodeServer) ReadPath(ctx context.Context, request *pb.ReadPathReque } earlyReshuffleSpan.End() - o.readPathCounterMu.Lock() - o.readPathCounter++ - o.readPathCounterMu.Unlock() + o.readPathCounter.Add(1) endReadPathCommand, err := newReplicateEndReadPathCommand() if err != nil { @@ -471,14 +467,8 @@ func StartServer(oramNodeServerID int, ip string, rpcPort int, replicaID int, ra oramNodeServer := newOramNodeServer(oramNodeServerID, replicaID, r, oramNodeFSM, shardNodeRPCClients, storageHandler, parameters) go func() { for { - time.Sleep(200 * time.Millisecond) - needEviction := false - oramNodeServer.readPathCounterMu.Lock() - if oramNodeServer.readPathCounter >= oramNodeServer.parameters.EvictionRate { - needEviction = true - } - oramNodeServer.readPathCounterMu.Unlock() - if needEviction { + time.Sleep(100 * time.Millisecond) + if oramNodeServer.readPathCounter.Load() >= int32(oramNodeServer.parameters.EvictionRate) { storageID := oramNodeServer.storageHandler.GetRandomStorageID() oramNodeServer.evict(storageID) } diff --git a/pkg/oramnode/server_test.go b/pkg/oramnode/server_test.go index a652192..20180d8 100644 --- a/pkg/oramnode/server_test.go +++ b/pkg/oramnode/server_test.go @@ -341,9 +341,7 @@ func TestEvictKeepsBeginEvictionInFailureScenario(t *testing.T) { func TestEvictResetsReadPathCounter(t *testing.T) { o := startLeaderRaftNodeServer(t) o.evict(0) - o.readPathCounterMu.Lock() - defer o.readPathCounterMu.Unlock() - if o.readPathCounter != 0 { + if o.readPathCounter.Load() != 0 { t.Errorf("Evict should reset readPathCounter after successful execution") } } @@ -377,7 +375,7 @@ func TestReadPathIncrementsReadPathCounter(t *testing.T) { o := startLeaderRaftNodeServer(t).withMockStorageHandler(newMockStorageHandler(4, 4)) ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) o.ReadPath(ctx, &oramnode.ReadPathRequest{StorageId: 2, Requests: []*oramnode.BlockRequest{{Block: "a", Path: 1}}}) - if o.readPathCounter != 1 { + if o.readPathCounter.Load() != 1 { t.Errorf("ReadPath should increment readPathCounter") } }