From 1c366120a7ff617489032244476332f858bee8ed Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 23 Feb 2024 17:49:43 +0100 Subject: [PATCH] Optimize inflightInstanceTracker (#498) * Optimize inflightInstanceTracker Signed-off-by: Marco Pracucci * Update CHANGELOG Signed-off-by: Marco Pracucci * Use range to iterate Signed-off-by: Marco Pracucci --------- Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- ring/replication_set_tracker.go | 25 ++++++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 264eea3fe..bc0c21453 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,7 +76,7 @@ * [FEATURE] Add `middleware.HTTPGRPCTracer` for more detailed server-side tracing spans and tags on `httpgrpc.HTTP/Handle` requests * [FEATURE] Server: Add support for `GrpcInflightMethodLimiter` -- limiting gRPC requests before reading full request into the memory. This can be used to implement global or method-specific inflight limits for gRPC methods. #377 #392 * [FEATURE] Server: Add `-grpc.server.num-workers` flag that configures the `grpc.NumStreamWorkers()` option. This can be used to start a fixed base amount of workers to process gRPC requests and avoid stack allocation for each call. #400 -* [FEATURE] Add `PartitionRing`. The partitions ring is hash ring to shard data between partitions. #474 #476 #478 #479 #481 #483 #484 #485 #488 #489 #493 #496 +* [FEATURE] Add `PartitionRing`. The partitions ring is hash ring to shard data between partitions. #474 #476 #478 #479 #481 #483 #484 #485 #488 #489 #493 #496 #498 * [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477 * [FEATURE] Add `concurrency.ForEachJobMergeResults()` utility function. #486 * [FEATURE] Add `ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()`. #495 diff --git a/ring/replication_set_tracker.go b/ring/replication_set_tracker.go index 06aa7776e..73da1bc37 100644 --- a/ring/replication_set_tracker.go +++ b/ring/replication_set_tracker.go @@ -469,7 +469,7 @@ func (t *zoneAwareContextTracker) cancelAllContexts(cause error) { type inflightInstanceTracker struct { mx sync.Mutex - inflight []map[*InstanceDesc]struct{} + inflight [][]*InstanceDesc // expectMoreInstances is true if more instances are expected to be added to the tracker. expectMoreInstances bool @@ -477,9 +477,9 @@ type inflightInstanceTracker struct { func newInflightInstanceTracker(sets []ReplicationSet) *inflightInstanceTracker { // Init the inflight tracker. - inflight := make([]map[*InstanceDesc]struct{}, len(sets)) + inflight := make([][]*InstanceDesc, len(sets)) for idx, set := range sets { - inflight[idx] = make(map[*InstanceDesc]struct{}, len(set.Instances)) + inflight[idx] = make([]*InstanceDesc, 0, len(set.Instances)) } return &inflightInstanceTracker{ @@ -495,7 +495,14 @@ func (t *inflightInstanceTracker) addInstance(replicationSetIdx int, instance *I t.mx.Lock() defer t.mx.Unlock() - t.inflight[replicationSetIdx][instance] = struct{}{} + // Check if the instance has already been added. + for _, curr := range t.inflight[replicationSetIdx] { + if curr == instance { + return + } + } + + t.inflight[replicationSetIdx] = append(t.inflight[replicationSetIdx], instance) } // removeInstance removes the instance for replicationSetIdx from the tracker. @@ -505,7 +512,15 @@ func (t *inflightInstanceTracker) removeInstance(replicationSetIdx int, instance t.mx.Lock() defer t.mx.Unlock() - delete(t.inflight[replicationSetIdx], instance) + for i, curr := range t.inflight[replicationSetIdx] { + if curr == instance { + instances := t.inflight[replicationSetIdx] + t.inflight[replicationSetIdx] = append(instances[:i], instances[i+1:]...) + + // We can safely break the loop because we don't expect multiple occurrences of the same instance. + return + } + } } // allInstancesAdded signals the tracker that all expected instances have been added.