Skip to content

Commit

Permalink
Fix batch tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aminst committed Nov 14, 2023
1 parent 1822351 commit c69c8e3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 39 deletions.
1 change: 0 additions & 1 deletion pkg/shardnode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (s *shardNodeServer) sendBatchesForever() {
// The logic here assumes that there are no duplicate blocks in the requests (which is fine since we only send a real request for the first one).
// It will not work otherwise because it will delete the response channel for a block after getting the first response.
func (s *shardNodeServer) sendCurrentBatches() {
// TODO: don't lock the whole thing, it will prevent concurrent batch sends
storageQueues := make(map[int][]blockRequest)
responseChannels := make(map[string]chan string)
s.batchManager.mu.HighPriorityLock()
Expand Down
49 changes: 11 additions & 38 deletions pkg/shardnode/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,16 @@ func startLeaderRaftNodeServer(t *testing.T, batchSize int, withBatchReponses bo
return s
}

func TestSendCurrentBatchesSendsQueuesExceedingBatchSizeRequests(t *testing.T) {
s := newShardNodeServer(0, 0, &raft.Raft{}, &shardNodeFSM{}, getMockOramNodeClientsWithBatchResponses(), 4, 5, newBatchManager(3))
s.batchManager.responseChannel["a"] = make(chan string)
func TestSendCurrentBatchesSendsQueuesAfterBatchTimeout(t *testing.T) {
s := newShardNodeServer(0, 0, &raft.Raft{}, &shardNodeFSM{}, getMockOramNodeClientsWithBatchResponses(), 4, 5, newBatchManager(1*time.Millisecond))
chA := make(chan string)
s.batchManager.responseChannel["a"] = chA
s.batchManager.storageQueues[1] = []blockRequest{{block: "a", path: 1}}
s.batchManager.responseChannel["b"] = make(chan string)
chB := make(chan string)
s.batchManager.responseChannel["b"] = chB
s.batchManager.storageQueues[1] = append(s.batchManager.storageQueues[1], blockRequest{block: "b", path: 1})
s.batchManager.responseChannel["c"] = make(chan string)
chC := make(chan string)
s.batchManager.responseChannel["c"] = chC
s.batchManager.storageQueues[1] = append(s.batchManager.storageQueues[1], blockRequest{block: "c", path: 1})
go s.sendCurrentBatches()
timout := time.After(3 * time.Second)
Expand All @@ -139,44 +142,14 @@ func TestSendCurrentBatchesSendsQueuesExceedingBatchSizeRequests(t *testing.T) {
case <-timout:
t.Errorf("the batches were not sent")
return
case <-s.batchManager.responseChannel["a"]:
receivedResponsesCount++
case <-s.batchManager.responseChannel["b"]:
receivedResponsesCount++
case <-s.batchManager.responseChannel["c"]:
case <-chA:
receivedResponsesCount++
}
}
}

func TestSendCurrentBatchesOnlySendsBatchSizeRequestsAtATime(t *testing.T) {
s := newShardNodeServer(0, 0, &raft.Raft{}, &shardNodeFSM{}, getMockOramNodeClientsWithBatchResponses(), 4, 5, newBatchManager(2))
s.batchManager.responseChannel["a"] = make(chan string)
s.batchManager.storageQueues[1] = []blockRequest{{block: "a", path: 1}}
s.batchManager.responseChannel["b"] = make(chan string)
s.batchManager.storageQueues[1] = append(s.batchManager.storageQueues[1], blockRequest{block: "b", path: 1})
s.batchManager.responseChannel["c"] = make(chan string)
s.batchManager.storageQueues[1] = append(s.batchManager.storageQueues[1], blockRequest{block: "c", path: 1})
go s.sendCurrentBatches()
timout := time.After(3 * time.Second)
receivedResponsesCount := 0
for {
if receivedResponsesCount == 2 {
break
}
select {
case <-timout:
t.Errorf("the batches were not sent")
return
case <-s.batchManager.responseChannel["a"]:
case <-chB:
receivedResponsesCount++
case <-s.batchManager.responseChannel["b"]:
case <-chC:
receivedResponsesCount++
}
}
if len(s.batchManager.storageQueues[1]) != 1 || s.batchManager.storageQueues[1][0].block != "c" {
t.Errorf("the remaining request should be c")
}
}

func TestSendCurrentBatchesRemovesSentQueueAndResponseChannel(t *testing.T) {
Expand Down

0 comments on commit c69c8e3

Please sign in to comment.