From cbd6a98e8845e0c58290ff7fa2686531dda2fe60 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Tue, 4 Feb 2025 20:15:01 -0500 Subject: [PATCH] race between restoreState and onOpenQueueResponse Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 146 ++++++++---------- 1 file changed, 67 insertions(+), 79 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index f782b9dac..f6d39443c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -1112,12 +1112,41 @@ void ClusterQueueHelper::onOpenQueueResponse( << requestContext->response() << ", for request: " << requestContext->request(); - bool failure = true; + const bmqt::GenericResult::Enum mainCode = requestContext->result(); + + QueueContext& qcontext = *context->queueContext(); + QueueLiveState& qinfo = qcontext.d_liveQInfo; + BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req = + requestContext->request().choice().openQueue(); + StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId( + context->d_upstreamSubQueueId); + + BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) == + subStreamIt->appId()); + BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >= + req.handleParameters().readCount()); + + if (mainCode == bmqt::GenericResult::e_SUCCESS) { + BSLS_ASSERT_SAFE( + requestContext->response().choice().isOpenQueueResponseValue()); + + // Received a success openQueue, proceed with the next step. + if (createQueue( + context, + requestContext->response().choice().openQueueResponse(), + responder)) { + // Queue instance was successfully created at this node. Mark its + // substream's status as 'opened'. + // This flag will be used to determine if self node needs to issue + // a reopen-queue request for this substream upon failover (ie, + // restore state op). + subStreamIt->value().d_state = SubQueueContext::k_OPEN; + } + return; // RETURN + } + bool retry = false; // retry immediately mqbnet::ClusterNode* otherThan = 0; // retry if the upstream is new - bool pushBack = false; // buffer as pending - - const bmqt::GenericResult::Enum mainCode = requestContext->result(); if (mainCode == bmqt::GenericResult::e_CANCELED) { const mqbi::ClusterErrorCode::Enum subCode = @@ -1130,9 +1159,9 @@ void ClusterQueueHelper::onOpenQueueResponse( // Request was canceled due to lost of the active (in proxy), or // node down (in cluster member) before receiving a response. // Open-queue request should be retried (in 'restoreState'). - // This code relies on the order: first 'CancelAllRequests', then - // 'restoreState'. - pushBack = true; + // This code cannot rely on the order of 'restoreState' and + // 'onOpenQueueResponse'. + retry = true; } } else if (mainCode == bmqt::GenericResult::e_REFUSED) { @@ -1169,91 +1198,50 @@ void ClusterQueueHelper::onOpenQueueResponse( retry = true; } } - else if (mainCode != bmqt::GenericResult::e_SUCCESS) { + else { // For any other case of failure, no need to retry. Callback will be // invoked later with error status. BSLS_ASSERT_SAFE(requestContext->response().choice().isStatusValue()); } - else { - failure = false; - } - QueueContext& qcontext = *context->queueContext(); - QueueLiveState& qinfo = qcontext.d_liveQInfo; - BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req = - requestContext->request().choice().openQueue(); - StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId( - context->d_upstreamSubQueueId); - - BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) == - subStreamIt->appId()); - BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >= - req.handleParameters().readCount()); - - if (failure) { - // Rollback _upstream_ view on that particular subQueueId here due to - // open queue failure response from upstream. This is done even if - // 'retry=true', because if so, 'sendOpenQueueRequest' is invoked - // again, which will update the view again after sending the request. + // Rollback _upstream_ view on that particular subQueueId here due to + // open queue failure response from upstream. This is done even if + // 'retry=true', because if so, 'sendOpenQueueRequest' is invoked + // again, which will update the view again after sending the request. - bmqp::QueueUtil::subtractHandleParameters( - &subStreamIt->value().d_parameters, - context->d_handleParameters); - - if (d_cluster_p->isStopping() || (!retry && !pushBack)) { - context->d_callback(requestContext->response().choice().status(), - 0, - bmqp_ctrlmsg::OpenQueueResponse(), - mqbi::Cluster::OpenQueueConfirmationCookie()); - - return; // RETURN - } - - BSLS_ASSERT_SAFE(isQueueAssigned(qcontext)); + bmqp::QueueUtil::subtractHandleParameters( + &subStreamIt->value().d_parameters, + context->d_handleParameters); - if (retry) { - // We can't just put back the context and 'wait' for a partition - // primary assignment because it is possible the primary assignment - // already came before the peer's response; therefore, we just - // retry with the currently known active primary, if any, or put - // back to the pending contexts otherwise. Note that current - // primary could be self, that's why we call - // 'processOpenQueueRequest' instead of 'sendOpenQueueRequest' - // below. + if (d_cluster_p->isStopping() || !retry) { + context->d_callback(requestContext->response().choice().status(), + 0, + bmqp_ctrlmsg::OpenQueueResponse(), + mqbi::Cluster::OpenQueueConfirmationCookie()); - if (isQueuePrimaryAvailable(qcontext, otherThan)) { - processOpenQueueRequest(context); - } - else { - pushBack = true; - } - } + return; // RETURN + } - if (pushBack) { - BALL_LOG_INFO << d_cluster_p->description() - << ": buffering open queue request for " - << qcontext.uri(); + BSLS_ASSERT_SAFE(isQueueAssigned(qcontext)); - qcontext.d_liveQInfo.d_pending.push_back(context); - } + // We can't just put back the context and 'wait' for a partition + // primary assignment because it is possible the primary assignment + // already came before the peer's response; therefore, we just + // retry with the currently known active primary, if any, or put + // back to the pending contexts otherwise. Note that current + // primary could be self, that's why we call + // 'processOpenQueueRequest' instead of 'sendOpenQueueRequest' + // below. - return; // RETURN + if (isQueuePrimaryAvailable(qcontext, otherThan)) { + processOpenQueueRequest(context); } + else { + BALL_LOG_INFO << d_cluster_p->description() + << ": buffering open queue request for " + << qcontext.uri(); - BSLS_ASSERT_SAFE(!retry); - BSLS_ASSERT_SAFE( - requestContext->response().choice().isOpenQueueResponseValue()); - - // Received a success openQueue, proceed with the next step. - if (createQueue(context, - requestContext->response().choice().openQueueResponse(), - responder)) { - // Queue instance was successfully created at this node. Mark its - // substream's status as 'opened'. - // This flag will be used to determine if self node needs to issue a - // reopen-queue request for this substream upon failover (ie, restore - // state op). - subStreamIt->value().d_state = SubQueueContext::k_OPEN; + qcontext.d_liveQInfo.d_pending.push_back(context); } }