Skip to content

Commit

Permalink
race between restoreState and onOpenQueueResponse
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Feb 5, 2025
1 parent 12bc29e commit cbd6a98
Showing 1 changed file with 67 additions and 79 deletions.
146 changes: 67 additions & 79 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1112,12 +1112,41 @@ void ClusterQueueHelper::onOpenQueueResponse(
<< requestContext->response()
<< ", for request: " << requestContext->request();

bool failure = true;
const bmqt::GenericResult::Enum mainCode = requestContext->result();

QueueContext& qcontext = *context->queueContext();
QueueLiveState& qinfo = qcontext.d_liveQInfo;
BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
requestContext->request().choice().openQueue();
StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId(
context->d_upstreamSubQueueId);

BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) ==
subStreamIt->appId());
BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >=
req.handleParameters().readCount());

if (mainCode == bmqt::GenericResult::e_SUCCESS) {
BSLS_ASSERT_SAFE(
requestContext->response().choice().isOpenQueueResponseValue());

// Received a success openQueue, proceed with the next step.
if (createQueue(
context,
requestContext->response().choice().openQueueResponse(),
responder)) {
// Queue instance was successfully created at this node. Mark its
// substream's status as 'opened'.
// This flag will be used to determine if self node needs to issue
// a reopen-queue request for this substream upon failover (ie,
// restore state op).
subStreamIt->value().d_state = SubQueueContext::k_OPEN;
}
return; // RETURN
}

bool retry = false; // retry immediately
mqbnet::ClusterNode* otherThan = 0; // retry if the upstream is new
bool pushBack = false; // buffer as pending

const bmqt::GenericResult::Enum mainCode = requestContext->result();

if (mainCode == bmqt::GenericResult::e_CANCELED) {
const mqbi::ClusterErrorCode::Enum subCode =
Expand All @@ -1130,9 +1159,9 @@ void ClusterQueueHelper::onOpenQueueResponse(
// Request was canceled due to lost of the active (in proxy), or
// node down (in cluster member) before receiving a response.
// Open-queue request should be retried (in 'restoreState').
// This code relies on the order: first 'CancelAllRequests', then
// 'restoreState'.
pushBack = true;
// This code cannot rely on the order of 'restoreState' and
// 'onOpenQueueResponse'.
retry = true;
}
}
else if (mainCode == bmqt::GenericResult::e_REFUSED) {
Expand Down Expand Up @@ -1169,91 +1198,50 @@ void ClusterQueueHelper::onOpenQueueResponse(
retry = true;
}
}
else if (mainCode != bmqt::GenericResult::e_SUCCESS) {
else {
// For any other case of failure, no need to retry. Callback will be
// invoked later with error status.
BSLS_ASSERT_SAFE(requestContext->response().choice().isStatusValue());
}
else {
failure = false;
}

QueueContext& qcontext = *context->queueContext();
QueueLiveState& qinfo = qcontext.d_liveQInfo;
BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
requestContext->request().choice().openQueue();
StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId(
context->d_upstreamSubQueueId);

BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) ==
subStreamIt->appId());
BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >=
req.handleParameters().readCount());

if (failure) {
// Rollback _upstream_ view on that particular subQueueId here due to
// open queue failure response from upstream. This is done even if
// 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
// again, which will update the view again after sending the request.
// Rollback _upstream_ view on that particular subQueueId here due to
// open queue failure response from upstream. This is done even if
// 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
// again, which will update the view again after sending the request.

bmqp::QueueUtil::subtractHandleParameters(
&subStreamIt->value().d_parameters,
context->d_handleParameters);

if (d_cluster_p->isStopping() || (!retry && !pushBack)) {
context->d_callback(requestContext->response().choice().status(),
0,
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());

return; // RETURN
}

BSLS_ASSERT_SAFE(isQueueAssigned(qcontext));
bmqp::QueueUtil::subtractHandleParameters(
&subStreamIt->value().d_parameters,
context->d_handleParameters);

if (retry) {
// We can't just put back the context and 'wait' for a partition
// primary assignment because it is possible the primary assignment
// already came before the peer's response; therefore, we just
// retry with the currently known active primary, if any, or put
// back to the pending contexts otherwise. Note that current
// primary could be self, that's why we call
// 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
// below.
if (d_cluster_p->isStopping() || !retry) {
context->d_callback(requestContext->response().choice().status(),
0,
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());

if (isQueuePrimaryAvailable(qcontext, otherThan)) {
processOpenQueueRequest(context);
}
else {
pushBack = true;
}
}
return; // RETURN
}

if (pushBack) {
BALL_LOG_INFO << d_cluster_p->description()
<< ": buffering open queue request for "
<< qcontext.uri();
BSLS_ASSERT_SAFE(isQueueAssigned(qcontext));

qcontext.d_liveQInfo.d_pending.push_back(context);
}
// We can't just put back the context and 'wait' for a partition
// primary assignment because it is possible the primary assignment
// already came before the peer's response; therefore, we just
// retry with the currently known active primary, if any, or put
// back to the pending contexts otherwise. Note that current
// primary could be self, that's why we call
// 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
// below.

return; // RETURN
if (isQueuePrimaryAvailable(qcontext, otherThan)) {
processOpenQueueRequest(context);
}
else {
BALL_LOG_INFO << d_cluster_p->description()
<< ": buffering open queue request for "
<< qcontext.uri();

BSLS_ASSERT_SAFE(!retry);
BSLS_ASSERT_SAFE(
requestContext->response().choice().isOpenQueueResponseValue());

// Received a success openQueue, proceed with the next step.
if (createQueue(context,
requestContext->response().choice().openQueueResponse(),
responder)) {
// Queue instance was successfully created at this node. Mark its
// substream's status as 'opened'.
// This flag will be used to determine if self node needs to issue a
// reopen-queue request for this substream upon failover (ie, restore
// state op).
subStreamIt->value().d_state = SubQueueContext::k_OPEN;
qcontext.d_liveQInfo.d_pending.push_back(context);
}
}

Expand Down

0 comments on commit cbd6a98

Please sign in to comment.