Skip to content

Commit

Permalink
race between restoreState and onOpenQueueResponse (#598)
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Feb 10, 2025
1 parent fa69fcb commit 4cc3adc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 90 deletions.
20 changes: 10 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,16 @@ void ClusterProxy::onActiveNodeDown(const mqbnet::ClusterNode* node)
return; // RETURN
}

// Update the cluster state with the info. Active node lock must *not* be
// held. Reuse the existing term since this is not a 'new active node'
// event.

d_clusterData.electorInfo().setElectorInfo(
mqbnet::ElectorState::e_DORMANT,
d_clusterData.electorInfo().electorTerm(),
0, // Leader (or 'active node') pointer
mqbc::ElectorInfoLeaderStatus::e_UNDEFINED);

// Cancel all requests with the 'CANCELED' category and the 'e_ACTIVE_LOST'
// code; and the callback will simply re-issue those requests; which then
// will be added to the pending request list.
Expand All @@ -434,16 +444,6 @@ void ClusterProxy::onActiveNodeDown(const mqbnet::ClusterNode* node)
failure.message() = "Lost connection with active node";

d_clusterData.requestManager().cancelAllRequests(response, node->nodeId());

// Update the cluster state with the info. Active node lock must *not* be
// held. Reuse the existing term since this is not a 'new active node'
// event.

d_clusterData.electorInfo().setElectorInfo(
mqbnet::ElectorState::e_DORMANT,
d_clusterData.electorInfo().electorTerm(),
0, // Leader (or 'active node') pointer
mqbc::ElectorInfoLeaderStatus::e_UNDEFINED);
}

// PRIVATE MANIPULATORS
Expand Down
146 changes: 67 additions & 79 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1112,12 +1112,41 @@ void ClusterQueueHelper::onOpenQueueResponse(
<< requestContext->response()
<< ", for request: " << requestContext->request();

bool failure = true;
const bmqt::GenericResult::Enum mainCode = requestContext->result();

QueueContext& qcontext = *context->queueContext();
QueueLiveState& qinfo = qcontext.d_liveQInfo;
BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
requestContext->request().choice().openQueue();
StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId(
context->d_upstreamSubQueueId);

BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) ==
subStreamIt->appId());
BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >=
req.handleParameters().readCount());

if (mainCode == bmqt::GenericResult::e_SUCCESS) {
BSLS_ASSERT_SAFE(
requestContext->response().choice().isOpenQueueResponseValue());

// Received a success openQueue, proceed with the next step.
if (createQueue(
context,
requestContext->response().choice().openQueueResponse(),
responder)) {
// Queue instance was successfully created at this node. Mark its
// substream's status as 'opened'.
// This flag will be used to determine if self node needs to issue
// a reopen-queue request for this substream upon failover (ie,
// restore state op).
subStreamIt->value().d_state = SubQueueContext::k_OPEN;
}
return; // RETURN
}

bool retry = false; // retry immediately
mqbnet::ClusterNode* otherThan = 0; // retry if the upstream is new
bool pushBack = false; // buffer as pending

const bmqt::GenericResult::Enum mainCode = requestContext->result();

if (mainCode == bmqt::GenericResult::e_CANCELED) {
const mqbi::ClusterErrorCode::Enum subCode =
Expand All @@ -1130,9 +1159,9 @@ void ClusterQueueHelper::onOpenQueueResponse(
            // Request was canceled due to loss of the active (in proxy), or
            // node down (in cluster member) before receiving a response.
// Open-queue request should be retried (in 'restoreState').
// This code relies on the order: first 'CancelAllRequests', then
// 'restoreState'.
pushBack = true;
// This code cannot rely on the order of 'restoreState' and
// 'onOpenQueueResponse'.
retry = true;
}
}
else if (mainCode == bmqt::GenericResult::e_REFUSED) {
Expand Down Expand Up @@ -1169,91 +1198,50 @@ void ClusterQueueHelper::onOpenQueueResponse(
retry = true;
}
}
else if (mainCode != bmqt::GenericResult::e_SUCCESS) {
else {
// For any other case of failure, no need to retry. Callback will be
// invoked later with error status.
BSLS_ASSERT_SAFE(requestContext->response().choice().isStatusValue());
}
else {
failure = false;
}

QueueContext& qcontext = *context->queueContext();
QueueLiveState& qinfo = qcontext.d_liveQInfo;
BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
requestContext->request().choice().openQueue();
StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId(
context->d_upstreamSubQueueId);

BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) ==
subStreamIt->appId());
BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >=
req.handleParameters().readCount());

if (failure) {
// Rollback _upstream_ view on that particular subQueueId here due to
// open queue failure response from upstream. This is done even if
// 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
// again, which will update the view again after sending the request.
    // Roll back the _upstream_ view on that particular subQueueId here due to
    // the open queue failure response from upstream. This is done even if
    // 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
    // again, which will update the view again after sending the request.

bmqp::QueueUtil::subtractHandleParameters(
&subStreamIt->value().d_parameters,
context->d_handleParameters);

if (d_cluster_p->isStopping() || (!retry && !pushBack)) {
context->d_callback(requestContext->response().choice().status(),
0,
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());

return; // RETURN
}

BSLS_ASSERT_SAFE(isQueueAssigned(qcontext));
bmqp::QueueUtil::subtractHandleParameters(
&subStreamIt->value().d_parameters,
context->d_handleParameters);

if (retry) {
// We can't just put back the context and 'wait' for a partition
// primary assignment because it is possible the primary assignment
// already came before the peer's response; therefore, we just
// retry with the currently known active primary, if any, or put
// back to the pending contexts otherwise. Note that current
// primary could be self, that's why we call
// 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
// below.
if (d_cluster_p->isStopping() || !retry) {
context->d_callback(requestContext->response().choice().status(),
0,
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());

if (isQueuePrimaryAvailable(qcontext, otherThan)) {
processOpenQueueRequest(context);
}
else {
pushBack = true;
}
}
return; // RETURN
}

if (pushBack) {
BALL_LOG_INFO << d_cluster_p->description()
<< ": buffering open queue request for "
<< qcontext.uri();
BSLS_ASSERT_SAFE(isQueueAssigned(qcontext));

qcontext.d_liveQInfo.d_pending.push_back(context);
}
// We can't just put back the context and 'wait' for a partition
// primary assignment because it is possible the primary assignment
// already came before the peer's response; therefore, we just
// retry with the currently known active primary, if any, or put
// back to the pending contexts otherwise. Note that current
// primary could be self, that's why we call
// 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
// below.

return; // RETURN
if (isQueuePrimaryAvailable(qcontext, otherThan)) {
processOpenQueueRequest(context);
}
else {
BALL_LOG_INFO << d_cluster_p->description()
<< ": buffering open queue request for "
<< qcontext.uri();

BSLS_ASSERT_SAFE(!retry);
BSLS_ASSERT_SAFE(
requestContext->response().choice().isOpenQueueResponseValue());

// Received a success openQueue, proceed with the next step.
if (createQueue(context,
requestContext->response().choice().openQueueResponse(),
responder)) {
// Queue instance was successfully created at this node. Mark its
// substream's status as 'opened'.
// This flag will be used to determine if self node needs to issue a
// reopen-queue request for this substream upon failover (ie, restore
// state op).
subStreamIt->value().d_state = SubQueueContext::k_OPEN;
qcontext.d_liveQInfo.d_pending.push_back(context);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,10 @@ inline bool ClusterQueueHelper::isQueuePrimaryAvailable(
return queueContext.d_liveQInfo.d_id !=
bmqp::QueueId::k_UNASSIGNED_QUEUE_ID &&
d_clusterData_p->electorInfo().leaderNode() != 0 &&
d_clusterData_p->electorInfo().leaderNode() != otherThan;
d_clusterData_p->electorInfo().leaderNode() != otherThan &&
d_clusterData_p->electorInfo().electorState() ==
mqbnet::ElectorState::e_LEADER;

// RETURN
}

Expand Down

0 comments on commit 4cc3adc

Please sign in to comment.