Skip to content

Commit

Permalink
Fix[MQB]: use CSL to update state on QueueAssignmentAdvisory
Browse files Browse the repository at this point in the history
Signed-off-by: Emelia Lei <[email protected]>
  • Loading branch information
emelialei88 committed Jan 24, 2025
1 parent c18ff5c commit e3d18b9
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 56 deletions.
48 changes: 30 additions & 18 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,24 @@ void ClusterStateManager::onCommit(
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(advisory.choice().isClusterMessageValue());

// NOTE: Even when using old workflow, we still apply all advisories to the
// CSL. We just don't invoke the commit callbacks.
if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled()) {
return; // RETURN
}

if (status != mqbc::ClusterStateLedgerCommitStatus::e_SUCCESS) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
<< ": Failed to commit advisory: " << advisory
<< ", with status '" << status << "'";
return; // RETURN
}

const bmqp_ctrlmsg::ClusterMessage& clusterMessage =
advisory.choice().clusterMessage();

// NOTE: Even when using old workflow, we still apply all advisories to the
// CSL. We just don't invoke the commit callbacks.
// Make an exception for QueueAssignmentAdvisory
if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled() &&
!clusterMessage.choice().isQueueAssignmentAdvisoryValue()) {
return; // RETURN
}

// Commenting out following 'if' check to fix an assert during node
// shutdown.
// if ( d_clusterData_p->membership().selfNodeStatus()
Expand All @@ -94,8 +99,6 @@ void ClusterStateManager::onCommit(
<< ": Committed advisory: " << advisory << ", with status '"
<< status << "'";

const bmqp_ctrlmsg::ClusterMessage& clusterMessage =
advisory.choice().clusterMessage();
mqbc::ClusterUtil::apply(d_state_p, clusterMessage, *d_clusterData_p);
}

Expand Down Expand Up @@ -1481,7 +1484,8 @@ void ClusterStateManager::processQueueAssignmentRequest(
void ClusterStateManager::processQueueAssignmentAdvisory(
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source,
bool delayed)
bool delayed,
bool fromLeaderAdvisory)
{
// executed by the cluster *DISPATCHER* thread

Expand Down Expand Up @@ -1690,16 +1694,21 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
}
}
else {
AppInfos appIdInfos(d_allocator_p);
if (delayed || fromLeaderAdvisory) {
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);
mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);

d_state_p->assignQueue(uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
d_state_p->assignQueue(uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
}
// When this function is called from
// processQueueAssignmentAdvisory, assignQueue will
// be triggered through mqbblp::ClusterStateManager::onCommit
}

BALL_LOG_INFO << d_cluster_p->description()
Expand Down Expand Up @@ -2150,7 +2159,10 @@ void ClusterStateManager::processLeaderAdvisory(
queueAsgnAdv.sequenceNumber() = advisory.sequenceNumber();
queueAsgnAdv.queues() = advisory.queues();

processQueueAssignmentAdvisory(controlMsg, source);
processQueueAssignmentAdvisory(controlMsg,
source,
false /* not delayed */,
true /* called from leaderAdvisory */);

// Leader status and sequence number are updated unconditionally. It may
// have been updated by one of the routines called earlier in this method,
Expand Down
9 changes: 5 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,11 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
/// TODO_CSL: This is the current workflow which we should be able to
/// remove after the new workflow via
/// ClusterQueueHelper::onQueueAssigned() is stable.
void
processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source,
bool delayed = false) BSLS_KEYWORD_OVERRIDE;
void processQueueAssignmentAdvisory(
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source,
bool delayed = false,
bool fromLeaderAdvisory = false) BSLS_KEYWORD_OVERRIDE;

/// Process the queue unAssigned advisory in the specified `message`
/// received from the specified `source`.
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1784,7 +1784,8 @@ void ClusterStateManager::processQueueAssignmentRequest(
void ClusterStateManager::processQueueAssignmentAdvisory(
BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::ControlMessage& message,
BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source,
BSLS_ANNOTATION_UNUSED bool delayed)
BSLS_ANNOTATION_UNUSED bool delayed,
BSLS_ANNOTATION_UNUSED bool fromLeaderAdvisory)
{
BSLS_ASSERT_SAFE(false &&
"This method should only be invoked in non-CSL mode");
Expand Down
9 changes: 5 additions & 4 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,10 +589,11 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
/// TODO_CSL: This is the current workflow which we should be able to
/// remove after the new workflow via
/// ClusterQueueHelper::onQueueAssigned() is stable.
void
processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source,
bool delayed = false) BSLS_KEYWORD_OVERRIDE;
void processQueueAssignmentAdvisory(
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source,
bool delayed = false,
bool fromLeaderAdvisory = false) BSLS_KEYWORD_OVERRIDE;

/// Process the queue unAssigned advisory in the specified `message`
/// received from the specified `source`.
Expand Down
28 changes: 0 additions & 28 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1007,34 +1007,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
<< queueAdvisory << ", rc: " << rc;
}

if (!cluster->isCSLModeEnabled()) {
// In CSL mode, we assign the queue to ClusterState upon CSL commit
// callback of QueueAssignmentAdvisory, so we don't assign it here.

// In non-CSL mode this is the shortcut to call Primary CQH instead of
// waiting for the quorum of acks in the ledger.

BSLS_ASSERT_SAFE(queueAdvisory.queues().size() == 1);

bmqp_ctrlmsg::QueueInfo& queueInfo = queueAdvisory.queues().back();

AppInfos appInfos(allocator);
mqbc::ClusterUtil::parseQueueInfo(&appInfos, queueInfo, allocator);

BSLA_MAYBE_UNUSED const bool assignRc = clusterState->assignQueue(
uri,
key,
queueAdvisory.queues().back().partitionId(),
appInfos);
BSLS_ASSERT_SAFE(assignRc);

BALL_LOG_INFO << cluster->description()
<< ": Queue assigned: " << queueAdvisory;

// Broadcast 'queueAssignmentAdvisory' to all followers
clusterData->messageTransmitter().broadcastMessage(controlMsg);
}

return QueueAssignmentResult::k_ASSIGNMENT_OK;
}

Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbi/mqbi_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ class ClusterStateManager {
virtual void
processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source,
bool delayed = false) = 0;
bool delayed = false,
bool fromLeaderAdvisory = false) = 0;

/// Process the queue unAssigned advisory in the specified `message`
/// received from the specified `source`.
Expand Down

0 comments on commit e3d18b9

Please sign in to comment.