From b5f5552900ae821aaae9195178e355f03c49a198 Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Thu, 6 Feb 2025 12:15:34 -0500 Subject: [PATCH] remove processQueueAssignmentAdvisory Signed-off-by: Emelia Lei --- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 19 +- .../mqb/mqbblp/mqbblp_clusterorchestrator.cpp | 12 - .../mqb/mqbblp/mqbblp_clusterorchestrator.h | 9 - .../mqb/mqbblp/mqbblp_clusterstatemanager.cpp | 291 ++---------------- .../mqb/mqbblp/mqbblp_clusterstatemanager.h | 15 - .../mqb/mqbc/mqbc_clusterstatemanager.cpp | 9 - .../mqb/mqbc/mqbc_clusterstatemanager.h | 15 - .../mqb/mqbi/mqbi_clusterstatemanager.h | 15 - src/integration-tests/test_migrate_to_csl.py | 209 ++++++++++++- 9 files changed, 241 insertions(+), 353 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index d8915ca4ae..92b2897fa3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -441,10 +441,10 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, d_throttledFailedAckMessages, BALL_LOG_INFO << description() << ": failed Ack " << "[status: " << status << ", source: '" << source - << "'" << ", correlationId: " << correlationId + << "', correlationId: " << correlationId << ", GUID: " << messageGUID << ", queue: '" - << (found ? uri : "** null **") << "' " - << "(id: " << queueId << ")] " << "to node " + << (found ? uri : "** null **") + << "' (id: " << queueId << ")] to node " << nodeSession->clusterNode()->nodeDescription();); } @@ -453,7 +453,7 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, << "[status: " << status << ", source: '" << source << "'" << ", correlationId: " << correlationId << ", GUID: " << messageGUID << ", queue: '" << uri - << "' (id: " << queueId << ")] to " << "node " + << "' (id: " << queueId << ")] to node " << nodeSession->clusterNode()->nodeDescription(); // Update stats for the queue (or subStream of the queue) @@ -492,7 +492,7 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, d_throttledDroppedAckMessages, BALL_LOG_ERROR << description() << ": dropping ACK message " << "[status: " << status << ", source: '" << source - << "'" << ", correlationId: " << correlationId + << "', correlationId: " << correlationId << ", GUID: " << messageGUID << ", queueId: " << queueId << "] to node " << nodeSession->clusterNode()->nodeDescription() @@ -3076,15 +3076,6 @@ void Cluster::processClusterControlMessage( source), this); } break; // BREAK - case MsgChoice::SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY: { - dispatcher()->execute( - bdlf::BindUtil::bind( - &ClusterOrchestrator::processQueueAssignmentAdvisory, - &d_clusterOrchestrator, - message, - source), - this); - } break; // BREAK case MsgChoice::SELECTION_ID_NODE_STATUS_ADVISORY: { dispatcher()->execute( bdlf::BindUtil::bind( diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index 163c93bb90..9da1bcc461 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -1338,18 +1338,6 @@ void ClusterOrchestrator::processQueueAssignmentRequest( d_stateManager_mp->processQueueAssignmentRequest(request, requester); } -void ClusterOrchestrator::processQueueAssignmentAdvisory( - const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source) -{ - // executed by the cluster *DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p)); - - d_stateManager_mp->processQueueAssignmentAdvisory(message, source); -} - void ClusterOrchestrator::processQueueUnassignedAdvisory( const bmqp_ctrlmsg::ControlMessage& msg, mqbnet::ClusterNode* source) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h index face75fc7f..e781a79e13 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h @@ -353,15 +353,6 @@ class ClusterOrchestrator { processQueueAssignmentRequest(const bmqp_ctrlmsg::ControlMessage& request, mqbnet::ClusterNode* requester); - /// Process the specified queue assignment advisory `message` from the - /// specified `source`. - /// - /// THREAD: This method is invoked in the associated cluster's - /// dispatcher thread. - void - processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source); - /// Process the queue unAssigned advisory in the specified `msg` /// received from the specified `source`. /// diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index ff5b8e7ff3..6705907e81 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -85,7 +85,33 @@ void ClusterStateManager::onCommit( // Make an exception for QueueUpdateAdvisory and QueueAssignmentAdvisory if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled() && !clusterMessage.choice().isQueueUpdateAdvisoryValue() && - !clusterMessage.choice().isQueueAssignmentAdvisoryValue()) { + !clusterMessage.choice().isQueueAssignmentAdvisoryValue() && + !clusterMessage.choice().isLeaderAdvisoryValue()) { + return; // RETURN + } + + // For LeaderAdvisory, we only apply QueueAssignmentAdvisory part + // to CSL for now + if (clusterMessage.choice().isLeaderAdvisoryValue()) { + bmqp_ctrlmsg::ControlMessage controlMsg; + bmqp_ctrlmsg::ClusterMessage& clusterMsg = + controlMsg.choice().makeClusterMessage(); + bmqp_ctrlmsg::QueueAssignmentAdvisory& queueAsgnAdv = + clusterMsg.choice().makeQueueAssignmentAdvisory(); + + const bmqp_ctrlmsg::LeaderAdvisory& leaderAdvisory = + clusterMessage.choice().leaderAdvisory(); + + queueAsgnAdv.sequenceNumber() = leaderAdvisory.sequenceNumber(); + queueAsgnAdv.queues() = leaderAdvisory.queues(); + + BALL_LOG_INFO + << d_clusterData_p->identity().description() + << ": Committed advisory (Queue Assignment for Leader Advisory): " + << advisory << ", with status '" << status << "'"; + + mqbc::ClusterUtil::apply(d_state_p, clusterMsg, *d_clusterData_p); + return; // RETURN } @@ -676,23 +702,11 @@ void ClusterStateManager::processBufferedQueueAdvisories() BSLS_ASSERT_SAFE(msg.choice().isClusterMessageValue()); BSLS_ASSERT_SAFE(msg.choice() - .clusterMessage() - .choice() - .isQueueAssignmentAdvisoryValue() || - msg.choice() .clusterMessage() .choice() .isQueueUnAssignmentAdvisoryValue()); - if (msg.choice() - .clusterMessage() - .choice() - .isQueueAssignmentAdvisoryValue()) { - processQueueAssignmentAdvisory(msg, source, true /* delayed */); - } - else { - processQueueUnAssignmentAdvisory(msg, source, true /* delayed */); - } + processQueueUnAssignmentAdvisory(msg, source, true /* delayed */); } d_bufferedQueueAdvisories.clear(); @@ -1482,240 +1496,6 @@ void ClusterStateManager::processQueueAssignmentRequest( d_allocator_p); } -void ClusterStateManager::processQueueAssignmentAdvisory( - const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed) -{ - // executed by the cluster *DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(message.choice().isClusterMessageValue()); - BSLS_ASSERT_SAFE(message.choice() - .clusterMessage() - .choice() - .isQueueAssignmentAdvisoryValue()); - - BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); - - // Two cases: - // 1. A leader can send a composite 'LeaderAdvisory' message which - // contains both partitionPrimaryAdvisory and queueAssignmentAdvisory. - // This composite message is first processed in - // 'processLaderAdvisory', which then forwards QueueAssignmentAdvisory - // part to ClusterQueueHelper after all leader-related validations. - // 2. A leader can also send just the 'QueueAssignmentAdvisory' message, - // which is directly processed here. In this case, we need to perform - // leader-related validations ourselves. - // We unconditionally perform validations in both cases. There is no - // side-effect of doing this. - - const bmqp_ctrlmsg::QueueAssignmentAdvisory& queueAdvisory = - message.choice().clusterMessage().choice().queueAssignmentAdvisory(); - const bmqp_ctrlmsg::LeaderMessageSequence& leaderMsgSeq = - queueAdvisory.sequenceNumber(); - - if (d_clusterConfig.clusterAttributes().isCSLModeEnabled()) { - BALL_LOG_ERROR << "#CSL_MODE_MIX " - << "Received legacy " << (delayed ? "buffered " : "") - << "queueAssignmentAdvisory: " << queueAdvisory - << " from: " << source << " in CSL mode."; - - return; // RETURN - } - - BALL_LOG_INFO << d_cluster_p->description() << ": Processing" - << (delayed ? " buffered " : " ") - << "queueAssignmentAdvisory message: " << message - << ", from '" << source->nodeDescription() << "'"; - - if (!delayed) { - // Source (leader) and leader sequence number should not be validated - // for delayed (aka buffered) advisories. Those attributes were - // validated when buffered advisories were received. - - if (d_clusterData_p->electorInfo().leaderNode() != source) { - // Different leader. Ignore message. - BALL_LOG_WARN << d_cluster_p->description() - << ": ignoring queueAssignmentAdvisory: " - << queueAdvisory - << ", from: " << source->nodeDescription() - << ", but current leader is: " - << (d_clusterData_p->electorInfo().leaderNode() - ? d_clusterData_p->electorInfo() - .leaderNode() - ->nodeDescription() - : "** none **") - << ", with term: " - << d_clusterData_p->electorInfo().electorTerm(); - return; // RETURN - } - - if (!(d_clusterData_p->electorInfo().leaderMessageSequence() < - leaderMsgSeq)) { - BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << d_cluster_p->description() - << ": got queueAssignmentAdvisory: " << queueAdvisory - << " from current leader: " << source->nodeDescription() - << ", with smaller/equal leader message sequence: " - << leaderMsgSeq << ". Current value: " - << d_clusterData_p->electorInfo().leaderMessageSequence() - << ". Ignoring this advisory." << BMQTSK_ALARMLOG_END; - return; // RETURN - } - - // Leader status and sequence number are updated unconditionally. It - // may have been updated by one of the callers of this routine, but - // there is no harm is setting these values again. - d_clusterData_p->electorInfo().setLeaderMessageSequence(leaderMsgSeq); - d_clusterData_p->electorInfo().setLeaderStatus( - mqbc::ElectorInfoLeaderStatus::e_ACTIVE); - } - - if (d_clusterData_p->membership().selfNodeStatus() == - bmqp_ctrlmsg::NodeStatus::E_STOPPING) { - // No need to process the advisory since self is stopping. - BALL_LOG_INFO << d_cluster_p->description() - << ": Not processing queue asssignment advisory since " - << "self is stopping."; - return; // RETURN - } - - // Advisory and source have been validated. If self is starting and this - // is a "live" advisory, buffer the advisory and it will be applied later, - // else apply it right away. - if (!delayed && (d_clusterData_p->membership().selfNodeStatus() == - bmqp_ctrlmsg::NodeStatus::E_STARTING)) { - d_bufferedQueueAdvisories.push_back(bsl::make_pair(message, source)); - return; // RETURN - } - - for (bsl::vector::const_iterator it = - queueAdvisory.queues().begin(); - it != queueAdvisory.queues().end(); - ++it) { - const bmqp_ctrlmsg::QueueInfo& queueInfo = *it; - bmqt::Uri uri(queueInfo.uri()); - const mqbu::StorageKey queueKey( - mqbu::StorageKey::BinaryRepresentation(), - queueInfo.key().data()); - - mqbc::ClusterStateQueueInfo* assigned = d_state_p->getAssigned(uri); - // Only Replica can `processQueueAssignmentAdvisory`. Therefore, the - // state cannot be `k_UNASSIGNING` - - if (assigned) { - // Queue is assigned. Verify that the key and partition match - // with what we already have. - - if (assigned->partitionId() != queueInfo.partitionId() || - (assigned->key() != queueKey)) { - if (!delayed) { - // Leader is telling self node to map a queue to new - // partition or have a new key (basically, its a new - // incarnation of the queue). This could occur when a - // queue is being opened-closed-opened in very quick - // succession. Old instance of the queue is deleted by - // the primary, primary broadcasts queue-unasssignment - // advisory, leader broadcasts queue-assignment - // advisory for the new instance of the queue, but self - // node receives those 2 broadcasts out of order - // (leader's advisory followed by primary's advisory). - // In this case, its beneficial to force-update self's - // view of the queue with what the leader is - // advertising (with an error). When self receives - // queue-unassignment advisory from the primary for the - // old instance of the queue, it will log an error and - // ignore it. - - BALL_LOG_ERROR - << d_cluster_p->description() << ": " - << "received queueAssignmentAdvisory from leader '" - << source->nodeDescription() << "' for a known and" - << " assigned queue with different " - << "partitionId/key: [received: " << queueInfo - << ", knownPartitionId: " << assigned->partitionId() - << ", knownQueueKey: " << assigned->key() << "]"; - } - else { - // There is partitionId/queueKey mismatch and this is a - // delayed (aka, buffered) advisory. This is a valid - // scenario. Here's how: Node starts up, initiates - // storage sync with the primary While recovery is - // underway, a queue, which is active, is deleted and - // unassigned by the primary. Further, same queue is - // opened again, which means leader may assign it to a - // different partition, and will definitely assign it a - // different queue key, and will issue a queue - // assignment advisory. But self will buffer it. When - // recovery is complete, self's storage manager will - // apply all recovered queues (including the previous - // incarnation of this queue) to self's cluster state - // (via 'ClusterStateManager::registerQueueInfo'), and - // thus, populate 'd_queues', and this is how we will - // end up here. So instead of alarming/asserting, we - // simply log at warn, and overwrite current state with - // the buffered (this) advisory and move on. - - BALL_LOG_WARN - << d_cluster_p->description() - << ": overwriting current known queue state " - << "with the buffered advisory for queue [" - << assigned->uri() << "]. Current assigned Partition [" - << assigned->partitionId() << "], current queueKey [" - << assigned->key() << "], new Partition [" - << queueInfo.partitionId() << "], new queueKey [" - << queueKey << "]."; - } - - // Remove existing state, mapping, etc. - - d_state_p->queueKeys().erase(assigned->key()); - // no need to update d_state_p->domainStates() entry - // , queue was already known and registered - AppInfos appIdInfos(d_allocator_p); - - mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, - queueInfo, - d_allocator_p); - - BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue( - uri, - queueKey, - queueInfo.partitionId(), - appIdInfos); - BSLS_ASSERT_SAFE(rc == false); - } - else { - // Queue is assigned, and there is no partitionId/queueKey - // mismatch. So this assert should not fire. - BSLS_ASSERT_SAFE(1 == d_state_p->queueKeys().count(queueKey)); - } - } - else { - if (delayed) { - AppInfos appIdInfos(d_allocator_p); - - mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, - queueInfo, - d_allocator_p); - - d_state_p->assignQueue(uri, - queueKey, - queueInfo.partitionId(), - appIdInfos); - } - // When this function is not buffered, called from - // processQueueAssignmentAdvisory, assignQueue will - // be triggered through mqbblp::ClusterStateManager::onCommit - } - - BALL_LOG_INFO << d_cluster_p->description() - << ": Queue assigned: " << queueInfo; - } -} - void ClusterStateManager::processQueueUnassignedAdvisory( const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source) @@ -2149,19 +1929,8 @@ void ClusterStateManager::processLeaderAdvisory( processPartitionPrimaryAdvisoryRaw(advisory.partitions(), source); - // Process (QueueUri, QueueKey, PartitionId) mapping. - bmqp_ctrlmsg::ControlMessage controlMsg; - bmqp_ctrlmsg::ClusterMessage& clusterMsg = - controlMsg.choice().makeClusterMessage(); - bmqp_ctrlmsg::QueueAssignmentAdvisory& queueAsgnAdv = - clusterMsg.choice().makeQueueAssignmentAdvisory(); - - queueAsgnAdv.sequenceNumber() = advisory.sequenceNumber(); - queueAsgnAdv.queues() = advisory.queues(); - - processQueueAssignmentAdvisory(controlMsg, - source, - false /* not delayed */); + // Process (QueueUri, QueueKey, PartitionId) mapping (removed) + // this has been done through CSL // Leader status and sequence number are updated unconditionally. It may // have been updated by one of the routines called earlier in this method, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h index d73d7c4b39..e191f6368e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h @@ -479,21 +479,6 @@ class ClusterStateManager BSLS_KEYWORD_FINAL const bmqp_ctrlmsg::ControlMessage& request, mqbnet::ClusterNode* requester) BSLS_KEYWORD_OVERRIDE; - /// Process the specified queue assignment advisory `message` from the - /// specified `source`. If the specified `delayed` is true, the - /// advisory has previously been delayed for processing. - /// - /// THREAD: This method is invoked in the associated cluster's - /// dispatcher thread. - /// - /// TODO_CSL: This is the current workflow which we should be able to - /// remove after the new workflow via - /// ClusterQueueHelper::onQueueAssigned() is stable. - void - processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed = false) BSLS_KEYWORD_OVERRIDE; - /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`. /// diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index 1ba151f73f..8715d434a2 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -1781,15 +1781,6 @@ void ClusterStateManager::processQueueAssignmentRequest( d_allocator_p); } -void ClusterStateManager::processQueueAssignmentAdvisory( - BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::ControlMessage& message, - BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source, - BSLS_ANNOTATION_UNUSED bool delayed) -{ - BSLS_ASSERT_SAFE(false && - "This method should only be invoked in non-CSL mode"); -} - void ClusterStateManager::processQueueUnassignedAdvisory( BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::ControlMessage& message, BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source) diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h index 57228c8404..ea38eac595 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h @@ -579,21 +579,6 @@ class ClusterStateManager BSLS_KEYWORD_FINAL const bmqp_ctrlmsg::ControlMessage& request, mqbnet::ClusterNode* requester) BSLS_KEYWORD_OVERRIDE; - /// Process the specified queue assignment advisory `message` from the - /// specified `source`. If the specified `delayed` is true, the - /// advisory has previously been delayed for processing. - /// - /// THREAD: This method is invoked in the associated cluster's - /// dispatcher thread. - /// - /// TODO_CSL: This is the current workflow which we should be able to - /// remove after the new workflow via - /// ClusterQueueHelper::onQueueAssigned() is stable. - void - processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed = false) BSLS_KEYWORD_OVERRIDE; - /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`. /// diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index 6e45208ddd..573a9354ef 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -323,21 +323,6 @@ class ClusterStateManager { processQueueAssignmentRequest(const bmqp_ctrlmsg::ControlMessage& request, mqbnet::ClusterNode* requester) = 0; - /// Process the specified queue assignment advisory `message` from the - /// specified `source`. If the specified `delayed` is true, the - /// advisory has previously been delayed for processing. - /// - /// THREAD: This method is invoked in the associated cluster's - /// dispatcher thread. - /// - /// TODO_CSL: This is the current workflow which we should be able to - /// remove after the new workflow via - /// ClusterQueueHelper::onQueueAssigned() is stable. - virtual void - processQueueAssignmentAdvisory(const bmqp_ctrlmsg::ControlMessage& message, - mqbnet::ClusterNode* source, - bool delayed = false) = 0; - /// Process the queue unAssigned advisory in the specified `message` /// received from the specified `source`. /// diff --git a/src/integration-tests/test_migrate_to_csl.py b/src/integration-tests/test_migrate_to_csl.py index fb5354a405..c3ac52305a 100644 --- a/src/integration-tests/test_migrate_to_csl.py +++ b/src/integration-tests/test_migrate_to_csl.py @@ -16,11 +16,46 @@ """ Testing migrating to CSL. """ + +import re +import time + import blazingmq.dev.it.testconstants as tc from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import + order, + tweak, Cluster, + cluster, multi_node, ) +from blazingmq.dev.it.util import wait_until + +pytestmark = order(2) + + +def ensureMessageAtStorageLayer( + cluster: Cluster, uri=tc.URI_PRIORITY, partition=0, num_msgs=1 +): + time.sleep(2) + # Before restarting the cluster, ensure that all nodes in the cluster + # have received the message at the storage layer. This is necessary + # in the absence of stronger consistency in storage replication in + # BMQ. Presence of message in the storage at each node is checked by + # sending 'STORAGE SUMMARY' command and grepping its output. + for node in cluster.nodes(): + node.command(f"CLUSTERS CLUSTER {node.cluster_name} STORAGE SUMMARY") + + time.sleep(2) + for node in cluster.nodes(): + assert node.outputs_regex( + rf"\w{{10}}\s+{partition}\s+{num_msgs}\s+\d+\s+B\s+" + re.escape(uri), + timeout=20, + ) + # Above regex is to match line: + # C1E2A44527 0 1 68 B bmq://bmq.test.mmap.priority.~tst/qqq + # where columns are: QueueKey, PartitionId, NumMsgs, NumBytes, + # QueueUri respectively. Since we opened only 1 queue, we know that + # it will be assigned to partitionId 0. def test_assign_queue(multi_node: Cluster): @@ -35,12 +70,11 @@ def test_assign_queue(multi_node: Cluster): members = multi_node.nodes(exclude=leader) uri = tc.URI_PRIORITY_SC + timeout = 1 producer = proxy.create_client("producer") producer.open(uri, flags=["write"], succeed=True) - timeout = 1 - for member in members: assert member.outputs_regex( "(Applying cluster message with type = UPDATE).*(queueAssignmentAdvisory)", @@ -50,5 +84,174 @@ def test_assign_queue(multi_node: Cluster): "(Committed advisory).*queueAssignmentAdvisory", timeout ) assert not member.outputs_regex( - "'QueueUnAssignmentAdvisory' will be applied to", timeout + "Processing queueAssignmentAdvisory message", timeout ) + + +@tweak.cluster.cluster_attributes.is_cslmode_enabled(False) +@tweak.cluster.cluster_attributes.is_fsmworkflow(False) +def test_reconfigure_from_non_CSL_to_CSL(cluster: Cluster): + """ + This test does the following steps to validate our conversion from non-CSL to CSL: + 1. Run broker in non-CSL and open queues + 2. Reconfigure to CSL and make sure queues are operational + 3. Open queues while in CSL mode + 4. Reconfigure back to non-CSL and make sure queues are operational + """ + uri_priority1 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/q1" + uri_fanout1 = f"bmq://{tc.DOMAIN_FANOUT_SC}/f1" + uri_fanout1_foo = f"bmq://{tc.DOMAIN_FANOUT_SC}/f1?id=foo" + uri_fanout1_bar = f"bmq://{tc.DOMAIN_FANOUT_SC}/f1?id=bar" + uri_priority2 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/q2" + uri_fanout2 = f"bmq://{tc.DOMAIN_FANOUT_SC}/f2" + uri_fanout2_foo = f"bmq://{tc.DOMAIN_FANOUT_SC}/f2?id=foo" + uri_fanout2_bar = f"bmq://{tc.DOMAIN_FANOUT_SC}/f2?id=bar" + + # ----------------------------------------------- + # 1. Start a producer in non-CSL. Then, post a message on a priority queue and a fanout queue. + proxies = cluster.proxy_cycle() + producer = next(proxies).create_client("producer") + producer.open(uri_priority1, flags=["write", "ack"], succeed=True) + producer.post(uri_priority1, payload=["msg1"], wait_ack=True, succeed=True) + producer.open(uri_fanout1, flags=["write", "ack"], succeed=True) + producer.post(uri_fanout1, payload=["fanout_msg1"], wait_ack=True, succeed=True) + + ensureMessageAtStorageLayer(cluster, uri_priority1, partition=0, num_msgs=1) + + # Consumer for fanout queue + consumer_foo = next(proxies).create_client("consumer_foo") + consumer_foo.open(uri_fanout1_foo, flags=["read"], succeed=True) + consumer_foo.wait_push_event() + + cluster._logger.info("send list command...") + + assert wait_until( + lambda: len(consumer_foo.list(uri_fanout1_foo, block=True)) == 1, 2 + ) + + # Save one confirm to the storage + consumer_foo.confirm(uri_fanout1_foo, "+1", succeed=True) + consumer_foo.close(uri_fanout1_foo, succeed=True) + + cluster.stop_nodes() + + cluster._logger.info("============ Before 2 ==========") + + # ----------------------------------------------- + # 2. Reconfigure the cluster from non-CSL to CSL mode + for broker in cluster.configurator.brokers.values(): + my_clusters = broker.clusters.my_clusters + if len(my_clusters) > 0: + my_clusters[0].cluster_attributes.is_cslmode_enabled = True + my_clusters[0].cluster_attributes.is_fsmworkflow = True + cluster.deploy_domains() + + cluster.start_nodes(wait_leader=True, wait_ready=True) + # For a standard cluster, states have already been restored as part of + # leader re-election. + if cluster.is_single_node: + producer.wait_state_restored() + + producer.post(uri_priority1, payload=["msg2"], wait_ack=True, succeed=True) + producer.post(uri_fanout1, payload=["fanout_msg2"], wait_ack=True, succeed=True) + + # Consumer for priority queue + consumer = next(proxies).create_client("consumer") + consumer.open(uri_priority1, flags=["read"], succeed=True) + consumer.wait_push_event() + assert wait_until(lambda: len(consumer.list(uri_priority1, block=True)) == 2, 2) + + # Consumers for fanout queue + consumer_bar = next(proxies).create_client("consumer_bar") + consumer_bar.open(uri_fanout1_bar, flags=["read"], succeed=True) + consumer_bar.wait_push_event() + assert wait_until( + lambda: len(consumer_bar.list(uri_fanout1_bar, block=True)) == 2, 2 + ) + + # make sure the previously saved confirm is not lost + consumer_foo.open(uri_fanout1_foo, flags=["read"], succeed=True) + consumer_foo.wait_push_event() + assert wait_until( + lambda: len(consumer_foo.list(uri_fanout1_foo, block=True)) == 1, 2 + ) + + # confirm all the messages to make sure we can shutdown gracefully + consumer.confirm(uri_priority1, "+2", succeed=True) + consumer_foo.confirm(uri_fanout1_foo, "+1", succeed=True) + consumer_bar.confirm(uri_fanout1_bar, "+2", succeed=True) + + cluster._logger.info("============ Before 3 ==========") + + # ----------------------------------------------- + # 3. Open queues in CSL mode + producer = next(proxies).create_client("producer") + producer.open(uri_priority2, flags=["write", "ack"], succeed=True) + producer.post(uri_priority2, payload=["msg1"], wait_ack=True, succeed=True) + producer.open(uri_fanout2, flags=["write", "ack"], succeed=True) + producer.post(uri_fanout2, payload=["fanout_msg1"], wait_ack=True, succeed=True) + + ensureMessageAtStorageLayer(cluster, uri_fanout1, partition=1, num_msgs=2) + ensureMessageAtStorageLayer(cluster, uri_priority2, partition=2, num_msgs=1) + + # Consumer for fanout queue + consumer_foo.open(uri_fanout2_foo, flags=["read"], succeed=True) + consumer_foo.wait_push_event() + assert wait_until( + lambda: len(consumer_foo.list(uri_fanout2_foo, block=True)) == 1, 2 + ) + + # Save one confirm to the storage + consumer_foo.confirm(uri_fanout2_foo, "+1", succeed=True) + + consumer_foo.close(uri_fanout2_foo, succeed=True) + + cluster._logger.info("before stop nodes...") + cluster.stop_nodes() + cluster._logger.info("after stop nodes...") + + cluster._logger.info("============ Before 4 ==========") + + # ----------------------------------------------- + # 4. Reconfigure the cluster back from CSL to non-CSL mode + for broker in cluster.configurator.brokers.values(): + my_clusters = broker.clusters.my_clusters + if len(my_clusters) > 0: + my_clusters[0].cluster_attributes.is_cslmode_enabled = False + my_clusters[0].cluster_attributes.is_fsmworkflow = False + cluster.deploy_domains() + + cluster._logger.info("before starting nodes...") + cluster.start_nodes(wait_leader=True, wait_ready=True) + cluster._logger.info("after starting nodes...") + # For a standard cluster, states have already been restored as part of + # leader re-election. + if cluster.is_single_node: + producer.wait_state_restored() + + producer.post(uri_priority2, payload=["msg2"], wait_ack=True, succeed=True) + producer.post(uri_fanout2, payload=["fanout_msg2"], wait_ack=True, succeed=True) + + # Consumer for priority queue + consumer = next(proxies).create_client("consumer") + consumer.open(uri_priority2, flags=["read"], succeed=True) + consumer.wait_push_event() + assert wait_until(lambda: len(consumer.list(uri_priority2, block=True)) == 2, 2) + + # Consumers for fanout queue + consumer_bar = next(proxies).create_client("consumer_bar") + consumer_bar.open(uri_fanout2_bar, flags=["read"], succeed=True) + consumer_bar.wait_push_event() + assert wait_until( + lambda: len(consumer_bar.list(uri_fanout2_bar, block=True)) == 2, 2 + ) + + # make sure the previously saved confirm is not lost + consumer_foo.open(uri_fanout2_foo, flags=["read"], succeed=True) + consumer_foo.wait_push_event() + assert wait_until( + lambda: len(consumer_foo.list(uri_fanout2_foo, block=True)) == 1, 2 + ) + + ensureMessageAtStorageLayer(cluster, uri_fanout2, partition=3, num_msgs=2) + ensureMessageAtStorageLayer(cluster, uri_priority2, partition=2, num_msgs=2)