From a65e2c2e0c5a7896d629a72a1d4b0e9fbec0ad5e Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Wed, 5 Feb 2025 12:35:31 -0500 Subject: [PATCH] fix: always update CSL on QueueUpdateAdvisory (#581) * fix: always update CSL on QueueUpdateAdvisory Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> * Updating IT Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --------- Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../mqb/mqbblp/mqbblp_clusterstatemanager.cpp | 19 +++--- src/groups/mqb/mqbc/mqbc_clusterutil.cpp | 46 --------------- src/integration-tests/test_appids.py | 59 +++++++++++++++++++ 3 files changed, 70 insertions(+), 54 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index 8bfd0380d2..b0d349194f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -70,12 +70,6 @@ void ClusterStateManager::onCommit( BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(advisory.choice().isClusterMessageValue()); - // NOTE: Even when using old workflow, we still apply all advisories to the - // CSL. We just don't invoke the commit callbacks. - if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled()) { - return; // RETURN - } - if (status != mqbc::ClusterStateLedgerCommitStatus::e_SUCCESS) { BALL_LOG_ERROR << d_clusterData_p->identity().description() << ": Failed to commit advisory: " << advisory @@ -83,6 +77,17 @@ void ClusterStateManager::onCommit( return; // RETURN } + const bmqp_ctrlmsg::ClusterMessage& clusterMessage = + advisory.choice().clusterMessage(); + + // NOTE: Even when using old workflow, we still apply all advisories to the + // CSL. We just don't invoke the commit callbacks. + // Make an exception for QueueUpdateAdvisory + if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled() && + !clusterMessage.choice().isQueueUpdateAdvisoryValue()) { + return; // RETURN + } + // Commenting out following 'if' check to fix an assert during node // shutdown. // if ( d_clusterData_p->membership().selfNodeStatus() @@ -94,8 +99,6 @@ void ClusterStateManager::onCommit( << ": Committed advisory: " << advisory << ", with status '" << status << "'"; - const bmqp_ctrlmsg::ClusterMessage& clusterMessage = - advisory.choice().clusterMessage(); mqbc::ClusterUtil::apply(d_state_p, clusterMessage, *d_clusterData_p); } diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index f6ab06f6e3..470a982134 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -1286,29 +1286,6 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, queueUpdate.addedAppIds().push_back(appIdInfo); queueAdvisory.queueUpdates().push_back(queueUpdate); - - if (!clusterData->cluster().isCSLModeEnabled()) { - // In CSL mode, we update the queue to ClusterState upon CSL - // commit callback of QueueUpdateAdvisory. - - // In non-CSL mode this is the shortcut to call Primary CQH - // instead of waiting for the quorum of acks in the ledger. - - AppInfos addedApps(allocator); - mqbc::ClusterUtil::parseQueueInfo(&addedApps, - queueUpdate.addedAppIds(), - allocator); - - BSLA_MAYBE_UNUSED const int assignRc = - clusterState.updateQueue(queueUpdate.uri(), - queueUpdate.domain(), - addedApps, - AppInfos(allocator)); - BSLS_ASSERT_SAFE(assignRc == 0); - - BALL_LOG_INFO << clusterData->cluster().description() - << ": Queue updated: " << queueAdvisory; - } } } @@ -1437,29 +1414,6 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, return; // RETURN } - - if (!clusterData->cluster().isCSLModeEnabled()) { - // In CSL mode, we update the queue to ClusterState upon CSL - // commit callback of QueueUpdateAdvisory. - - // In non-CSL mode this is the shortcut to call Primary CQH - // instead of waiting for the quorum of acks in the ledger. - - AppInfos removedApps(allocator); - mqbc::ClusterUtil::parseQueueInfo(&removedApps, - queueUpdate.removedAppIds(), - allocator); - - BSLA_MAYBE_UNUSED const int assignRc = - clusterState.updateQueue(queueUpdate.uri(), - queueUpdate.domain(), - AppInfos(allocator), - removedApps); - BSLS_ASSERT_SAFE(assignRc == 0); - - BALL_LOG_INFO << clusterData->cluster().description() - << ": Queue updated: " << queueAdvisory; - } } } diff --git a/src/integration-tests/test_appids.py b/src/integration-tests/test_appids.py index d0e333433b..c9ff23ba6d 100644 --- a/src/integration-tests/test_appids.py +++ b/src/integration-tests/test_appids.py @@ -641,3 +641,62 @@ def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster): consumers[app_id].close(f"{tc.URI_FANOUT}?id={app_id}", block=True) == Client.e_SUCCESS ) + + +def test_open_authorize_change_primary(multi_node: Cluster): + """Add an App to Domain config of an existing queue, and then force a + Replica to become new Primary. Start new Consumer. Make sure the Consumer + receives previously posted data. + This is to address the concern with Replica not processing QueueUpdates + before becoming Primary. + """ + leader = multi_node.last_known_leader + proxies = multi_node.proxy_cycle() + + producer = next(proxies).create_client("producer") + producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True) + + all_app_ids = default_app_ids + ["new_app"] + + # --------------------------------------------------------------------- + # Create a consumer + app_id = all_app_ids[0] + consumer = next(proxies).create_client(app_id) + consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True) + + # --------------------------------------------------------------------- + # Authorize 'quux'. + set_app_ids(multi_node, all_app_ids) + + # --------------------------------------------------------------------- + # Post a message. + producer.post(tc.URI_FANOUT, ["msg1"], succeed=True, wait_ack=True) + + # --------------------------------------------------------------------- + # Ensure that all substreams get 2 messages + + leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE) + + assert wait_until( + lambda: len(consumer.list(f"{tc.URI_FANOUT}?id={app_id}", block=True)) == 1, + 3, + ) + + consumer.close(f"{tc.URI_FANOUT}?id={app_id}", block=True, succeed=True) + + leader.check_exit_code = False + leader.kill() + leader.wait() + + # wait for new leader + leader = multi_node.wait_leader() + + consumer = next(proxies).create_client(app_id) + consumer.open(f"{tc.URI_FANOUT}?id=new_app", flags=["read"], succeed=True) + + assert wait_until( + lambda: len(consumer.list(f"{tc.URI_FANOUT}?id=new_app", block=True)) == 1, + 3, + ) + + consumer.close(f"{tc.URI_FANOUT}?id=new_app", block=True, succeed=True)