Skip to content

Commit

Permalink
Fix storage WriteBucket bug
Browse files Browse the repository at this point in the history
  • Loading branch information
aminst committed Jan 30, 2024
1 parent 37dbb0a commit 038c406
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
10 changes: 7 additions & 3 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ func (s *StorageHandler) BatchWriteBucket(storageID int, readBucketBlocksList ma
ctx := context.Background()
dataResults := make(map[int]*redis.BoolCmd)
metadataResults := make(map[int]*redis.BoolCmd)
writtenBlocks = make(map[string]string)

log.Debug().Msgf("buckets from readBucketBlocksList: %v", readBucketBlocksList)
log.Debug().Msgf("shardNodeBlocks: %v", shardNodeBlocks)

for bucketID, readBucketBlocks := range readBucketBlocksList {
values := make([]string, s.Z+s.S)
metadatas := make([]string, s.Z+s.S)
Expand All @@ -239,13 +244,12 @@ func (s *StorageHandler) BatchWriteBucket(storageID int, readBucketBlocksList ma
realIndex[k] = k
}
shuffleArray(realIndex)
writtenBlocks = make(map[string]string)
i := 0
for key, value := range readBucketBlocks {
if strings.HasPrefix(key, "dummy") {
continue
}
if len(writtenBlocks) < s.Z {
if i < s.Z {
writtenBlocks[key] = value
values[realIndex[i]], err = Encrypt(value, s.key)
if err != nil {
Expand All @@ -262,7 +266,7 @@ func (s *StorageHandler) BatchWriteBucket(storageID int, readBucketBlocksList ma
if strings.HasPrefix(key, "dummy") {
continue
}
if len(writtenBlocks) < s.Z {
if i < s.Z {
writtenBlocks[key] = value
values[realIndex[i]], err = Encrypt(value, s.key)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,26 @@ func TestBatchGetBlockOffset(t *testing.T) {
}
}

func TestBatchReadBucketReturnsBlocksInAllBuckets(t *testing.T) {
s := NewStorageHandler(4, 1, 9, 1, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}})
s.InitDatabase()
toWriteBlocks := map[int]map[string]string{1: {"usr1": "value1"}, 2: {"usr2": "value2"}, 3: {"usr3": "value3"}, 4: {"usr4": "value4"}, 5: {"usr5": "value5"}}
s.BatchWriteBucket(0, toWriteBlocks, map[string]string{})
blocks, err := s.BatchReadBucket([]int{1, 2, 3, 4, 5}, 0)
if err != nil {
t.Errorf("error reading bucket")
}
expectedReadBuckets := toWriteBlocks
log.Debug().Msgf("blocks: %v", expectedReadBuckets)
for bucketID, blockToVal := range expectedReadBuckets {
for block, val := range blockToVal {
if blocks[bucketID][block] != val {
t.Errorf("expected %s, but got %s", val, blocks[bucketID][block])
}
}
}
}

func TestRandom(t *testing.T) {
s := NewStorageHandler(4, 1, 9, 1, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}})
s.InitDatabase()
Expand Down

0 comments on commit 038c406

Please sign in to comment.