Skip to content

Commit

Permalink
Optimize inflightInstanceTracker (#498)
Browse files Browse the repository at this point in the history
* Optimize inflightInstanceTracker

Signed-off-by: Marco Pracucci <[email protected]>

* Update CHANGELOG

Signed-off-by: Marco Pracucci <[email protected]>

* Use range to iterate

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored and simonswine committed Aug 7, 2024
1 parent 3ba30e2 commit 1c36612
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
* [FEATURE] Add `middleware.HTTPGRPCTracer` for more detailed server-side tracing spans and tags on `httpgrpc.HTTP/Handle` requests
* [FEATURE] Server: Add support for `GrpcInflightMethodLimiter` -- limiting gRPC requests before reading full request into the memory. This can be used to implement global or method-specific inflight limits for gRPC methods. #377 #392
* [FEATURE] Server: Add `-grpc.server.num-workers` flag that configures the `grpc.NumStreamWorkers()` option. This can be used to start a fixed base amount of workers to process gRPC requests and avoid stack allocation for each call. #400
* [FEATURE] Add `PartitionRing`. The partitions ring is hash ring to shard data between partitions. #474 #476 #478 #479 #481 #483 #484 #485 #488 #489 #493 #496
* [FEATURE] Add `PartitionRing`. The partitions ring is hash ring to shard data between partitions. #474 #476 #478 #479 #481 #483 #484 #485 #488 #489 #493 #496 #498
* [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477
* [FEATURE] Add `concurrency.ForEachJobMergeResults()` utility function. #486
* [FEATURE] Add `ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()`. #495
Expand Down
25 changes: 20 additions & 5 deletions ring/replication_set_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,17 +469,17 @@ func (t *zoneAwareContextTracker) cancelAllContexts(cause error) {

type inflightInstanceTracker struct {
mx sync.Mutex
inflight []map[*InstanceDesc]struct{}
inflight [][]*InstanceDesc

// expectMoreInstances is true if more instances are expected to be added to the tracker.
expectMoreInstances bool
}

func newInflightInstanceTracker(sets []ReplicationSet) *inflightInstanceTracker {
// Init the inflight tracker.
inflight := make([]map[*InstanceDesc]struct{}, len(sets))
inflight := make([][]*InstanceDesc, len(sets))
for idx, set := range sets {
inflight[idx] = make(map[*InstanceDesc]struct{}, len(set.Instances))
inflight[idx] = make([]*InstanceDesc, 0, len(set.Instances))
}

return &inflightInstanceTracker{
Expand All @@ -495,7 +495,14 @@ func (t *inflightInstanceTracker) addInstance(replicationSetIdx int, instance *I
t.mx.Lock()
defer t.mx.Unlock()

t.inflight[replicationSetIdx][instance] = struct{}{}
// Check if the instance has already been added.
for _, curr := range t.inflight[replicationSetIdx] {
if curr == instance {
return
}
}

t.inflight[replicationSetIdx] = append(t.inflight[replicationSetIdx], instance)
}

// removeInstance removes the instance for replicationSetIdx from the tracker.
Expand All @@ -505,7 +512,15 @@ func (t *inflightInstanceTracker) removeInstance(replicationSetIdx int, instance
t.mx.Lock()
defer t.mx.Unlock()

delete(t.inflight[replicationSetIdx], instance)
for i, curr := range t.inflight[replicationSetIdx] {
if curr == instance {
instances := t.inflight[replicationSetIdx]
t.inflight[replicationSetIdx] = append(instances[:i], instances[i+1:]...)

// We can safely break the loop because we don't expect multiple occurrences of the same instance.
return
}
}
}

// allInstancesAdded signals the tracker that all expected instances have been added.
Expand Down

0 comments on commit 1c36612

Please sign in to comment.