From e3d18b999ba8dc807cdfbb3f3dc929ea2919684e Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Fri, 24 Jan 2025 16:56:58 -0500 Subject: [PATCH] Fix[MQB]: use CSL to update state on QueueAssignmentAdvisory Signed-off-by: Emelia Lei --- .../mqb/mqbblp/mqbblp_clusterstatemanager.cpp | 48 ++++++++++++------- .../mqb/mqbblp/mqbblp_clusterstatemanager.h | 9 ++-- .../mqb/mqbc/mqbc_clusterstatemanager.cpp | 3 +- .../mqb/mqbc/mqbc_clusterstatemanager.h | 9 ++-- src/groups/mqb/mqbc/mqbc_clusterutil.cpp | 28 ----------- .../mqb/mqbi/mqbi_clusterstatemanager.h | 3 +- 6 files changed, 44 insertions(+), 56 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index 8bfd0380d2..9172f23617 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -70,12 +70,6 @@ void ClusterStateManager::onCommit( BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(advisory.choice().isClusterMessageValue()); - // NOTE: Even when using old workflow, we still apply all advisories to the - // CSL. We just don't invoke the commit callbacks. - if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled()) { - return; // RETURN - } - if (status != mqbc::ClusterStateLedgerCommitStatus::e_SUCCESS) { BALL_LOG_ERROR << d_clusterData_p->identity().description() << ": Failed to commit advisory: " << advisory @@ -83,6 +77,17 @@ void ClusterStateManager::onCommit( return; // RETURN } + const bmqp_ctrlmsg::ClusterMessage& clusterMessage = + advisory.choice().clusterMessage(); + + // NOTE: Even when using old workflow, we still apply all advisories to the + // CSL. We just don't invoke the commit callbacks. + // Make an exception for QueueAssignmentAdvisory + if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled() && + !clusterMessage.choice().isQueueAssignmentAdvisoryValue()) { + return; // RETURN + } + // Commenting out following 'if' check to fix an assert during node // shutdown. // if ( d_clusterData_p->membership().selfNodeStatus() @@ -94,8 +99,6 @@ void ClusterStateManager::onCommit( << ": Committed advisory: " << advisory << ", with status '" << status << "'"; - const bmqp_ctrlmsg::ClusterMessage& clusterMessage = - advisory.choice().clusterMessage(); mqbc::ClusterUtil::apply(d_state_p, clusterMessage, *d_clusterData_p); } @@ -1481,7 +1484,8 @@ void ClusterStateManager::processQueueAssignmentRequest( void ClusterStateManager::processQueueAssignmentAdvisory( const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source, - bool delayed) + bool delayed, + bool fromLeaderAdvisory) { // executed by the cluster *DISPATCHER* thread @@ -1690,16 +1694,21 @@ void ClusterStateManager::processQueueAssignmentAdvisory( } } else { - AppInfos appIdInfos(d_allocator_p); + if (delayed || fromLeaderAdvisory) { + AppInfos appIdInfos(d_allocator_p); - mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, - queueInfo, - d_allocator_p); + mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, + queueInfo, + d_allocator_p); - d_state_p->assignQueue(uri, - queueKey, - queueInfo.partitionId(), - appIdInfos); + d_state_p->assignQueue(uri, + queueKey, + queueInfo.partitionId(), + appIdInfos); + } + // When this function is called from + // processQueueAssignmentAdvisory, assignQueue will + // be triggered through mqbblp::ClusterStateManager::onCommit } BALL_LOG_INFO << d_cluster_p->description() @@ -2150,7 +2159,10 @@ void ClusterStateManager::processLeaderAdvisory( queueAsgnAdv.sequenceNumber() = advisory.sequenceNumber(); queueAsgnAdv.queues() = advisory.queues(); - processQueueAssignmentAdvisory(controlMsg, source); + processQueueAssignmentAdvisory(controlMsg, + source, + false /* not delayed */, + true /* called from leaderAdvisory */); // Leader status and sequence number are updated unconditionally. It may // have been updated by one of the routines called earlier in this method, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h index d73d7c4b39..d08b3de0c8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h @@ -489,10 +489,11 @@ class ClusterStateManager BSLS_KEYWORD_FINAL /// TODO_CSL: This is the current workflow which we should be able to /// remove after the new workflow via /// ClusterQueueHelper::onQueueAssigned() is stable. - void - processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed = false) BSLS_KEYWORD_OVERRIDE; + void processQueueAssignmentAdvisory( + const bmqp_ctrlmsg::ControlMessage& message, + mqbnet::ClusterNode* source, + bool delayed = false, + bool fromLeaderAdvisory = false) BSLS_KEYWORD_OVERRIDE; /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`. diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index 1ba151f73f..e6ee312362 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -1784,7 +1784,8 @@ void ClusterStateManager::processQueueAssignmentRequest( void ClusterStateManager::processQueueAssignmentAdvisory( BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::ControlMessage& message, BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source, - BSLS_ANNOTATION_UNUSED bool delayed) + BSLS_ANNOTATION_UNUSED bool delayed, + BSLS_ANNOTATION_UNUSED bool fromLeaderAdvisory) { BSLS_ASSERT_SAFE(false && "This method should only be invoked in non-CSL mode"); diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h index 57228c8404..e162ac0f12 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h @@ -589,10 +589,11 @@ class ClusterStateManager BSLS_KEYWORD_FINAL /// TODO_CSL: This is the current workflow which we should be able to /// remove after the new workflow via /// ClusterQueueHelper::onQueueAssigned() is stable. - void - processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed = false) BSLS_KEYWORD_OVERRIDE; + void processQueueAssignmentAdvisory( + const bmqp_ctrlmsg::ControlMessage& message, + mqbnet::ClusterNode* source, + bool delayed = false, + bool fromLeaderAdvisory = false) BSLS_KEYWORD_OVERRIDE; /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`. diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index c3988cd51f..fb7d83e900 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -1007,34 +1007,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState, << queueAdvisory << ", rc: " << rc; } - if (!cluster->isCSLModeEnabled()) { - // In CSL mode, we assign the queue to ClusterState upon CSL commit - // callback of QueueAssignmentAdvisory, so we don't assign it here. - - // In non-CSL mode this is the shortcut to call Primary CQH instead of - // waiting for the quorum of acks in the ledger. - - BSLS_ASSERT_SAFE(queueAdvisory.queues().size() == 1); - - bmqp_ctrlmsg::QueueInfo& queueInfo = queueAdvisory.queues().back(); - - AppInfos appInfos(allocator); - mqbc::ClusterUtil::parseQueueInfo(&appInfos, queueInfo, allocator); - - BSLA_MAYBE_UNUSED const bool assignRc = clusterState->assignQueue( - uri, - key, - queueAdvisory.queues().back().partitionId(), - appInfos); - BSLS_ASSERT_SAFE(assignRc); - - BALL_LOG_INFO << cluster->description() - << ": Queue assigned: " << queueAdvisory; - - // Broadcast 'queueAssignmentAdvisory' to all followers - clusterData->messageTransmitter().broadcastMessage(controlMsg); - } - return QueueAssignmentResult::k_ASSIGNMENT_OK; } diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index c226f937fc..6d3ce91efc 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -336,7 +336,8 @@ class ClusterStateManager { virtual void processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source, - bool delayed = false) = 0; + bool delayed = false, + bool fromLeaderAdvisory = false) = 0; /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`.