
Commit

Remove extra TODO comments
aminst committed Oct 24, 2023
1 parent c0801e7 commit 929a39b
Showing 7 changed files with 3 additions and 18 deletions.
2 changes: 0 additions & 2 deletions cmd/oramnode/main.go
@@ -45,8 +45,6 @@ func main() {
 		log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
 	}
 
-	// TODO: add a replica id to this
-	// TODO: read the exporter url from a config file or sth like that
 	tracingProvider, err := tracing.NewProvider(context.Background(), "oramnode", "localhost:4317")
 	if err != nil {
 		log.Fatal().Msgf("Failed to create tracing provider; %v", err)
2 changes: 0 additions & 2 deletions cmd/router/main.go
@@ -35,8 +35,6 @@ func main() {
 		log.Fatal().Msgf("Failed to create client connections with shard node servers; %v", err)
 	}
 
-	// TODO: add a replica id to this
-	// TODO: read the exporter url from a config file or sth like that
 	tracingProvider, err := tracing.NewProvider(context.Background(), "router", "localhost:4317")
 	if err != nil {
 		log.Fatal().Msgf("Failed to create tracing provider; %v", err)
2 changes: 0 additions & 2 deletions cmd/shardnode/main.go
@@ -47,8 +47,6 @@ func main() {
 		log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
 	}
 
-	// TODO: add a replica id to this
-	// TODO: read the exporter url from a config file or sth like that
 	tracingProvider, err := tracing.NewProvider(context.Background(), "shardnode", "localhost:4317")
 	if err != nil {
 		log.Fatal().Msgf("Failed to create tracing provider; %v", err)
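The TODOs removed above flagged that the OTLP exporter endpoint ("localhost:4317") is still hardcoded in all three binaries. Below is a minimal sketch of reading the endpoint from the YAML configuration the services already load; the deployConfig type and the trace_exporter_url field are assumptions for illustration, not the repository's actual schema.

```go
package config

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v3"
)

// deployConfig is a hypothetical shape for the YAML file the binaries already read;
// the trace_exporter_url field name is an assumption, not the repository's schema.
type deployConfig struct {
	TraceExporterURL string `yaml:"trace_exporter_url"`
}

// ExporterURL returns the OTLP endpoint from the config file, falling back to
// the value that is currently hardcoded in all three main.go files.
func ExporterURL(path string) (string, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return "", fmt.Errorf("failed to read config file; %w", err)
	}
	var cfg deployConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		return "", fmt.Errorf("failed to parse config file; %w", err)
	}
	if cfg.TraceExporterURL == "" {
		return "localhost:4317", nil
	}
	return cfg.TraceExporterURL, nil
}
```

The call sites would then pass the returned value instead of the literal, e.g. tracing.NewProvider(context.Background(), "oramnode", url).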
1 change: 0 additions & 1 deletion pkg/e2e/e2e_test.go
@@ -80,7 +80,6 @@ func startTestSystem() {
 	time.Sleep(4 * time.Second) // This is a bad way of ensuring the leader is elected
 	go startOramNode(1, 8752, 1416, "om-data-replicaid-1", "127.0.0.1:8751")
 	go startOramNode(2, 8753, 1417, "om-data-replicaid-2", "127.0.0.1:8751")
-	// redis?
 	// TODO: kill the go routines, maybe by using cancel contexts
 }

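The TODO kept in this test suggests stopping the spawned servers with cancel contexts. A minimal, self-contained sketch of that pattern follows; startServer is a stand-in for the repository's start helpers, which would need to be refactored to accept a context for this to apply.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startServer stands in for startOramNode/startShardNode/startRouter; the real
// helpers would need to accept a context for this shutdown pattern to work.
func startServer(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("%s shutting down: %v\n", name, ctx.Err())
			return
		case <-time.After(time.Second):
			// serve requests...
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go startServer(ctx, "oramnode-replica-1")
	go startServer(ctx, "oramnode-replica-2")

	time.Sleep(3 * time.Second) // the test body would run here
	cancel()                    // signals every goroutine to exit
	time.Sleep(100 * time.Millisecond)
}
```

In a test, the cancel function can be registered with t.Cleanup so the goroutines are torn down even when the test fails early.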
1 change: 0 additions & 1 deletion pkg/oramnode/server.go
@@ -88,7 +88,6 @@ func (o *oramNodeServer) earlyReshuffle(buckets []int, storageID int) error {
 	log.Debug().Msgf("Performing early reshuffle with buckets %v and storageID %d", buckets, storageID)
 	// TODO: can we make this a background thread?
 	for _, bucket := range buckets {
-		// TODO: check the Redis latency
 		accessCount, err := o.storageHandler.GetAccessCount(bucket, storageID)
 		if err != nil {
 			return fmt.Errorf("unable to get access count from the server; %s", err)
9 changes: 3 additions & 6 deletions pkg/shardnode/server.go
@@ -86,18 +86,15 @@ func (s *shardNodeServer) sendBatchesForever() {
 	}
 }
 
-// TODO: fix the bug where it sends more than batch size sometimes
-
 // For queues that have equal or more than batchSize requests, it sends a batch of size=BatchSize and waits for the responses.
 // The logic here assumes that there are no duplicate blocks in the requests (which is fine since we only send a real request for the first one).
 // It will not work otherwise because it will delete the response channel for a block after getting the first response.
 func (s *shardNodeServer) sendCurrentBatches() {
-	s.batchManager.mu.Lock() //TODO: don't lock the whole thing, it will prevent concurrent batch sends
+	s.batchManager.mu.Lock() // TODO: don't lock the whole thing, it will prevent concurrent batch sends
 	defer s.batchManager.mu.Unlock()
 	for storageID, requests := range s.batchManager.storageQueues {
 		if len(requests) >= s.batchManager.batchSize {
 			oramNodeReplicaMap := s.oramNodeClients.getRandomOramNodeReplicaMap()
-			// TODO: add a note about why we are using the first request's context
 			log.Debug().Msgf("Sending batch of size %d to storageID %d", len(requests), storageID)
 			reply, err := oramNodeReplicaMap.readPathFromAllOramNodeReplicas(requests[0].ctx, requests, storageID)
 			s.batchManager.deleteRequestsFromQueue(storageID, s.batchManager.batchSize)
@@ -133,7 +130,7 @@ func (s *shardNodeServer) query(ctx context.Context, op OperationType, block str
 	if err != nil {
 		return "", fmt.Errorf("could not create request replication command; %s", err)
 	}
-	_, requestReplicationSpan := tracer.Start(ctx, "apply request replication") // TODO: should I update the context?
+	_, requestReplicationSpan := tracer.Start(ctx, "apply request replication")
 	err = s.raftNode.Apply(requestReplicationCommand, 2*time.Second).Error()
 	requestReplicationSpan.End()
 	if err != nil {
@@ -143,7 +140,7 @@ func (s *shardNodeServer) query(ctx context.Context, op OperationType, block str
 	blockToRequest, path, storageID := s.getWhatToSendBasedOnRequest(ctx, block, requestID)
 
 	var replyValue string
-	_, waitOnReplySpan := tracer.Start(ctx, "wait on reply") // TODO: should I update the context?
+	_, waitOnReplySpan := tracer.Start(ctx, "wait on reply")
 
 	log.Debug().Msgf("Adding request to storage queue and waiting for block %s", block)
 	oramReplyChan := s.batchManager.addRequestToStorageQueueAndWait(blockRequest{ctx: ctx, block: blockToRequest, path: path}, storageID)
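The comment kept above sendCurrentBatches explains that each block gets exactly one response channel, which is why a batch must not contain duplicate blocks. A rough sketch of that queue-plus-reply-channel structure follows; the field types and method body are illustrative guesses, not the repository's implementation, even though the names mirror identifiers visible in the diff.

```go
package shardnode

import "sync"

// blockRequest and batchManager sketch: one queue of pending requests per
// storage shard and one reply channel per block.
type blockRequest struct {
	block string
	path  int
}

type batchManager struct {
	mu            sync.Mutex
	batchSize     int
	storageQueues map[int][]blockRequest // storageID -> pending requests
	responseChans map[string]chan string // block -> channel the waiting query reads from
}

// addRequestToStorageQueueAndWait enqueues a request and returns the channel
// on which the caller blocks until the batch containing this block is answered.
func (m *batchManager) addRequestToStorageQueueAndWait(req blockRequest, storageID int) chan string {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.storageQueues[storageID] = append(m.storageQueues[storageID], req)
	ch := make(chan string, 1)
	m.responseChans[req.block] = ch // only one channel per block: a duplicate block would overwrite it
	return ch
}
```

This makes the constraint in the comment concrete: because the map is keyed by block, sending two requests for the same block in one batch would leave one caller waiting on a channel that is never answered.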
4 changes: 0 additions & 4 deletions pkg/storage/storage.go
@@ -77,7 +77,6 @@ func (s *StorageHandler) GetRandomPathAndStorageID(ctx context.Context) (path in
 // If none of the "blocks" are in the bucket, it returns isReal=false
 func (s *StorageHandler) GetBlockOffset(bucketID int, storageID int, blocks []string) (offset int, isReal bool, blockFound string, err error) {
 	log.Debug().Msgf("Getting block offset for bucket %d and storage %d", bucketID, storageID)
-	// TODO: implement
 	blockMap := make(map[string]int)
 	for i := 0; i < Z; i++ {
 		pos, key, err := s.GetMetadata(bucketID, strconv.Itoa(i), storageID)
@@ -120,7 +119,6 @@ func (s *StorageHandler) GetAccessCount(bucketID int, storageID int) (count int,
 // blocks is a map of block id to block values.
 func (s *StorageHandler) ReadBucket(bucketID int, storageID int) (blocks map[string]string, err error) {
 	log.Debug().Msgf("Reading bucket %d from storage %d", bucketID, storageID)
-	// TODO: implement
 	client := s.getClient(storageID)
 	ctx := context.Background()
 	blocks = make(map[string]string)
@@ -153,7 +151,6 @@ func (s *StorageHandler) ReadBucket(bucketID int, storageID int) (blocks map[str
 // It returns the blocks that were written into the storage shard in the writtenBlocks variable.
 func (s *StorageHandler) WriteBucket(bucketID int, storageID int, readBucketBlocks map[string]string, shardNodeBlocks map[string]string, isAtomic bool) (writtenBlocks map[string]string, err error) {
 	log.Debug().Msgf("Writing bucket %d to storage %d", bucketID, storageID)
-	// TODO: implement
 	// TODO: It should make the counter zero
 	values := make([]string, Z+S)
 	metadatas := make([]string, Z+S)
@@ -218,7 +215,6 @@ func (s *StorageHandler) WriteBucket(bucketID int, storageID int, readBucketBloc
 // ReadBlock reads a single block using its offset.
 func (s *StorageHandler) ReadBlock(bucketID int, storageID int, offset int) (value string, err error) {
 	log.Debug().Msgf("Reading block %d from bucket %d in storage %d", offset, bucketID, storageID)
-	// TODO: it should invalidate and increase counter
 	client := s.getClient(storageID)
 	ctx := context.Background()
 	value, err = client.HGet(ctx, strconv.Itoa(bucketID), strconv.Itoa(offset)).Result()
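The ReadBlock hunk shows the Redis layout this storage handler relies on: each bucket is a Redis hash keyed by its bucket ID, and block offsets are the hash fields. A minimal, self-contained sketch of reading one block with go-redis under that assumed layout; the module path, local address, and sample IDs are assumptions, not values taken from the repository.

```go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/redis/go-redis/v9"
)

// readBlock mirrors the hash-per-bucket layout seen in the diff:
// the bucket ID is the Redis hash key and the offset is the field.
func readBlock(ctx context.Context, client *redis.Client, bucketID, offset int) (string, error) {
	return client.HGet(ctx, strconv.Itoa(bucketID), strconv.Itoa(offset)).Result()
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed local Redis
	ctx := context.Background()

	value, err := readBlock(ctx, client, 42, 3) // hypothetical bucket 42, offset 3
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println("block value:", value)
}
```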
