Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix[MQB]: use CSL to update state on QueueAssignmentAdvisory #584

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,24 @@ void ClusterStateManager::onCommit(
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(advisory.choice().isClusterMessageValue());

// NOTE: Even when using old workflow, we still apply all advisories to the
// CSL. We just don't invoke the commit callbacks.
if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled()) {
return; // RETURN
}

if (status != mqbc::ClusterStateLedgerCommitStatus::e_SUCCESS) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
<< ": Failed to commit advisory: " << advisory
<< ", with status '" << status << "'";
return; // RETURN
}

const bmqp_ctrlmsg::ClusterMessage& clusterMessage =
advisory.choice().clusterMessage();

// NOTE: Even when using old workflow, we still apply all advisories to the
// CSL. We just don't invoke the commit callbacks.
// Make an exception for QueueAssignmentAdvisory
if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled() &&
!clusterMessage.choice().isQueueAssignmentAdvisoryValue()) {
return; // RETURN
}

// Commenting out following 'if' check to fix an assert during node
// shutdown.
// if ( d_clusterData_p->membership().selfNodeStatus()
Expand All @@ -94,8 +99,6 @@ void ClusterStateManager::onCommit(
<< ": Committed advisory: " << advisory << ", with status '"
<< status << "'";

const bmqp_ctrlmsg::ClusterMessage& clusterMessage =
advisory.choice().clusterMessage();
mqbc::ClusterUtil::apply(d_state_p, clusterMessage, *d_clusterData_p);
}

Expand Down Expand Up @@ -1690,16 +1693,21 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
}
}
else {
AppInfos appIdInfos(d_allocator_p);
if (delayed) {
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);
mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);

d_state_p->assignQueue(uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
d_state_p->assignQueue(uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
}
// When this function is not buffered, called from
// processQueueAssignmentAdvisory, assignQueue will
// be triggered through mqbblp::ClusterStateManager::onCommit
}

BALL_LOG_INFO << d_cluster_p->description()
Expand Down Expand Up @@ -2150,7 +2158,9 @@ void ClusterStateManager::processLeaderAdvisory(
queueAsgnAdv.sequenceNumber() = advisory.sequenceNumber();
queueAsgnAdv.queues() = advisory.queues();

processQueueAssignmentAdvisory(controlMsg, source);
processQueueAssignmentAdvisory(controlMsg,
source,
false /* not delayed */);

// Leader status and sequence number are updated unconditionally. It may
// have been updated by one of the routines called earlier in this method,
Expand Down
28 changes: 0 additions & 28 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1009,34 +1009,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
<< queueAdvisory << ", rc: " << rc;
}

if (!cluster->isCSLModeEnabled()) {
// In CSL mode, we assign the queue to ClusterState upon CSL commit
// callback of QueueAssignmentAdvisory, so we don't assign it here.

// In non-CSL mode this is the shortcut to call Primary CQH instead of
// waiting for the quorum of acks in the ledger.

BSLS_ASSERT_SAFE(queueAdvisory.queues().size() == 1);

bmqp_ctrlmsg::QueueInfo& queueInfo = queueAdvisory.queues().back();

AppInfos appInfos(allocator);
mqbc::ClusterUtil::parseQueueInfo(&appInfos, queueInfo, allocator);

BSLA_MAYBE_UNUSED const bool assignRc = clusterState->assignQueue(
uri,
key,
queueAdvisory.queues().back().partitionId(),
appInfos);
BSLS_ASSERT_SAFE(assignRc);

BALL_LOG_INFO << cluster->description()
<< ": Queue assigned: " << queueAdvisory;

// Broadcast 'queueAssignmentAdvisory' to all followers
clusterData->messageTransmitter().broadcastMessage(controlMsg);
}

return QueueAssignmentResult::k_ASSIGNMENT_OK;
}

Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbi/mqbi_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ class ClusterStateManager {
AfterPartitionPrimaryAssignmentCb;

/// Pair of (appId, appKey)
typedef bsl::pair<bsl::string, mqbu::StorageKey> AppInfo;
typedef bsl::pair<bsl::string, mqbu::StorageKey> AppInfo;
typedef bsl::unordered_map<bsl::string, mqbu::StorageKey> AppInfos;
typedef AppInfos::const_iterator AppInfosCIter;
typedef AppInfos::const_iterator AppInfosCIter;

struct QueueAssignmentResult {
enum Enum {
Expand Down
54 changes: 54 additions & 0 deletions src/integration-tests/test_migrate_to_csl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2025 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Testing migrating to CSL.
"""
import blazingmq.dev.it.testconstants as tc
from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import
Cluster,
multi_node,
)


def test_assign_queue(multi_node: Cluster):
"""
This test is to make sure when a queue assignment happens,
it goes thought the CSL path, not the non-CSL.
"""
proxies = multi_node.proxy_cycle()
proxy = next(proxies)

leader = multi_node.last_known_leader
members = multi_node.nodes(exclude=leader)

uri = tc.URI_PRIORITY_SC

producer = proxy.create_client("producer")
producer.open(uri, flags=["write"], succeed=True)

timeout = 1

for member in members:
assert member.outputs_regex(
"(Applying cluster message with type = UPDATE).*(queueAssignmentAdvisory)",
timeout,
)
assert member.outputs_regex(
"(Committed advisory).*queueAssignmentAdvisory", timeout
)
assert not member.outputs_regex(
"'QueueUnAssignmentAdvisory' will be applied to", timeout
)
Loading