From 15f0fb47f6820365a5fe3959af8e12bdc30c01ad Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Fri, 24 Jan 2025 17:11:33 -0500 Subject: [PATCH] Allow QueueAssignmentAdvisory do CSL from the second part of LeaderAdvisory Signed-off-by: Emelia Lei --- src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp | 10 ++++------ src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h | 9 ++++----- src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp | 3 +-- src/groups/mqb/mqbc/mqbc_clusterstatemanager.h | 9 ++++----- src/groups/mqb/mqbi/mqbi_clusterstatemanager.h | 7 +++---- 5 files changed, 16 insertions(+), 22 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index 9172f23617..228c4492d9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -1484,8 +1484,7 @@ void ClusterStateManager::processQueueAssignmentRequest( void ClusterStateManager::processQueueAssignmentAdvisory( const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source, - bool delayed, - bool fromLeaderAdvisory) + bool delayed) { // executed by the cluster *DISPATCHER* thread @@ -1694,7 +1693,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory( } } else { - if (delayed || fromLeaderAdvisory) { + if (delayed) { AppInfos appIdInfos(d_allocator_p); mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, @@ -1706,7 +1705,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory( queueInfo.partitionId(), appIdInfos); } - // When this function is called from + // When this function is not buffered, called from // processQueueAssignmentAdvisory, assignQueue will // be triggered through mqbblp::ClusterStateManager::onCommit } @@ -2161,8 +2160,7 @@ void ClusterStateManager::processLeaderAdvisory( processQueueAssignmentAdvisory(controlMsg, source, - false /* not delayed */, - true /* called from leaderAdvisory */); + false /* not delayed */); // Leader status and sequence number are updated unconditionally. It may // have been updated by one of the routines called earlier in this method, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h index d08b3de0c8..d73d7c4b39 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h @@ -489,11 +489,10 @@ class ClusterStateManager BSLS_KEYWORD_FINAL /// TODO_CSL: This is the current workflow which we should be able to /// remove after the new workflow via /// ClusterQueueHelper::onQueueAssigned() is stable. - void processQueueAssignmentAdvisory( - const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed = false, - bool fromLeaderAdvisory = false) BSLS_KEYWORD_OVERRIDE; + void + processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, + mqbnet::ClusterNode* source, + bool delayed = false) BSLS_KEYWORD_OVERRIDE; /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`. diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index e6ee312362..1ba151f73f 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -1784,8 +1784,7 @@ void ClusterStateManager::processQueueAssignmentRequest( void ClusterStateManager::processQueueAssignmentAdvisory( BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::ControlMessage& message, BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source, - BSLS_ANNOTATION_UNUSED bool delayed, - BSLS_ANNOTATION_UNUSED bool fromLeaderAdvisory) + BSLS_ANNOTATION_UNUSED bool delayed) { BSLS_ASSERT_SAFE(false && "This method should only be invoked in non-CSL mode"); diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h index e162ac0f12..57228c8404 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h @@ -589,11 +589,10 @@ class ClusterStateManager BSLS_KEYWORD_FINAL /// TODO_CSL: This is the current workflow which we should be able to /// remove after the new workflow via /// ClusterQueueHelper::onQueueAssigned() is stable. - void processQueueAssignmentAdvisory( - const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed = false, - bool fromLeaderAdvisory = false) BSLS_KEYWORD_OVERRIDE; + void + processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, + mqbnet::ClusterNode* source, + bool delayed = false) BSLS_KEYWORD_OVERRIDE; /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`. diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index 6d3ce91efc..6e45208ddd 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -86,9 +86,9 @@ class ClusterStateManager { AfterPartitionPrimaryAssignmentCb; /// Pair of (appId, appKey) - typedef bsl::pair AppInfo; + typedef bsl::pair AppInfo; typedef bsl::unordered_map AppInfos; - typedef AppInfos::const_iterator AppInfosCIter; + typedef AppInfos::const_iterator AppInfosCIter; struct QueueAssignmentResult { enum Enum { @@ -336,8 +336,7 @@ class ClusterStateManager { virtual void processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source, - bool delayed = false, - bool fromLeaderAdvisory = false) = 0; + bool delayed = false) = 0; /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`.