Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix race between restoreState and onOpenQueueResponse [178106131] #598

Merged
merged 1 commit into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,16 @@ void ClusterProxy::onActiveNodeDown(const mqbnet::ClusterNode* node)
return; // RETURN
}

// Update the cluster state with the info. Active node lock must *not* be
// held. Reuse the existing term since this is not a 'new active node'
// event.

d_clusterData.electorInfo().setElectorInfo(
mqbnet::ElectorState::e_DORMANT,
d_clusterData.electorInfo().electorTerm(),
0, // Leader (or 'active node') pointer
mqbc::ElectorInfoLeaderStatus::e_UNDEFINED);

// Cancel all requests with the 'CANCELED' category and the 'e_ACTIVE_LOST'
// code; and the callback will simply re-issue those requests; which then
// will be added to the pending request list.
Expand All @@ -434,16 +444,6 @@ void ClusterProxy::onActiveNodeDown(const mqbnet::ClusterNode* node)
failure.message() = "Lost connection with active node";

d_clusterData.requestManager().cancelAllRequests(response, node->nodeId());

// Update the cluster state with the info. Active node lock must *not* be
// held. Reuse the existing term since this is not a 'new active node'
// event.

d_clusterData.electorInfo().setElectorInfo(
mqbnet::ElectorState::e_DORMANT,
d_clusterData.electorInfo().electorTerm(),
0, // Leader (or 'active node') pointer
mqbc::ElectorInfoLeaderStatus::e_UNDEFINED);
}

// PRIVATE MANIPULATORS
Expand Down
146 changes: 67 additions & 79 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1112,12 +1112,41 @@ void ClusterQueueHelper::onOpenQueueResponse(
<< requestContext->response()
<< ", for request: " << requestContext->request();

bool failure = true;
const bmqt::GenericResult::Enum mainCode = requestContext->result();

QueueContext& qcontext = *context->queueContext();
QueueLiveState& qinfo = qcontext.d_liveQInfo;
BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
requestContext->request().choice().openQueue();
StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId(
context->d_upstreamSubQueueId);

BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) ==
subStreamIt->appId());
BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >=
req.handleParameters().readCount());

if (mainCode == bmqt::GenericResult::e_SUCCESS) {
BSLS_ASSERT_SAFE(
requestContext->response().choice().isOpenQueueResponseValue());

// Received a success openQueue, proceed with the next step.
if (createQueue(
context,
requestContext->response().choice().openQueueResponse(),
responder)) {
// Queue instance was successfully created at this node. Mark its
// substream's status as 'opened'.
// This flag will be used to determine if self node needs to issue
// a reopen-queue request for this substream upon failover (ie,
// restore state op).
subStreamIt->value().d_state = SubQueueContext::k_OPEN;
}
return; // RETURN
}

bool retry = false; // retry immediately
mqbnet::ClusterNode* otherThan = 0; // retry if the upstream is new
bool pushBack = false; // buffer as pending

const bmqt::GenericResult::Enum mainCode = requestContext->result();

if (mainCode == bmqt::GenericResult::e_CANCELED) {
const mqbi::ClusterErrorCode::Enum subCode =
Expand All @@ -1130,9 +1159,9 @@ void ClusterQueueHelper::onOpenQueueResponse(
// Request was canceled due to loss of the active node (in proxy), or
// node down (in cluster member) before receiving a response.
// Open-queue request should be retried (in 'restoreState').
// This code relies on the order: first 'CancelAllRequests', then
// 'restoreState'.
pushBack = true;
// This code cannot rely on the order of 'restoreState' and
// 'onOpenQueueResponse'.
retry = true;
}
}
else if (mainCode == bmqt::GenericResult::e_REFUSED) {
Expand Down Expand Up @@ -1169,91 +1198,50 @@ void ClusterQueueHelper::onOpenQueueResponse(
retry = true;
}
}
else if (mainCode != bmqt::GenericResult::e_SUCCESS) {
else {
// For any other case of failure, no need to retry. Callback will be
// invoked later with error status.
BSLS_ASSERT_SAFE(requestContext->response().choice().isStatusValue());
}
else {
failure = false;
}

QueueContext& qcontext = *context->queueContext();
QueueLiveState& qinfo = qcontext.d_liveQInfo;
BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
requestContext->request().choice().openQueue();
StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId(
context->d_upstreamSubQueueId);

BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) ==
subStreamIt->appId());
BSLS_ASSERT_SAFE(subStreamIt->value().d_parameters.readCount() >=
req.handleParameters().readCount());

if (failure) {
// Rollback _upstream_ view on that particular subQueueId here due to
// open queue failure response from upstream. This is done even if
// 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
// again, which will update the view again after sending the request.
// Rollback _upstream_ view on that particular subQueueId here due to
// open queue failure response from upstream. This is done even if
// 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
// again, which will update the view again after sending the request.

bmqp::QueueUtil::subtractHandleParameters(
&subStreamIt->value().d_parameters,
context->d_handleParameters);

if (d_cluster_p->isStopping() || (!retry && !pushBack)) {
context->d_callback(requestContext->response().choice().status(),
0,
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());

return; // RETURN
}

BSLS_ASSERT_SAFE(isQueueAssigned(qcontext));
bmqp::QueueUtil::subtractHandleParameters(
&subStreamIt->value().d_parameters,
context->d_handleParameters);

if (retry) {
// We can't just put back the context and 'wait' for a partition
// primary assignment because it is possible the primary assignment
// already came before the peer's response; therefore, we just
// retry with the currently known active primary, if any, or put
// back to the pending contexts otherwise. Note that current
// primary could be self, that's why we call
// 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
// below.
if (d_cluster_p->isStopping() || !retry) {
context->d_callback(requestContext->response().choice().status(),
0,
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());

if (isQueuePrimaryAvailable(qcontext, otherThan)) {
processOpenQueueRequest(context);
}
else {
pushBack = true;
}
}
return; // RETURN
}

if (pushBack) {
BALL_LOG_INFO << d_cluster_p->description()
<< ": buffering open queue request for "
<< qcontext.uri();
BSLS_ASSERT_SAFE(isQueueAssigned(qcontext));

qcontext.d_liveQInfo.d_pending.push_back(context);
}
// We can't just put back the context and 'wait' for a partition
// primary assignment because it is possible the primary assignment
// already came before the peer's response; therefore, we just
// retry with the currently known active primary, if any, or put
// back to the pending contexts otherwise. Note that current
// primary could be self, that's why we call
// 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
// below.

return; // RETURN
if (isQueuePrimaryAvailable(qcontext, otherThan)) {
processOpenQueueRequest(context);
}
else {
BALL_LOG_INFO << d_cluster_p->description()
<< ": buffering open queue request for "
<< qcontext.uri();

BSLS_ASSERT_SAFE(!retry);
BSLS_ASSERT_SAFE(
requestContext->response().choice().isOpenQueueResponseValue());

// Received a success openQueue, proceed with the next step.
if (createQueue(context,
requestContext->response().choice().openQueueResponse(),
responder)) {
// Queue instance was successfully created at this node. Mark its
// substream's status as 'opened'.
// This flag will be used to determine if self node needs to issue a
// reopen-queue request for this substream upon failover (ie, restore
// state op).
subStreamIt->value().d_state = SubQueueContext::k_OPEN;
qcontext.d_liveQInfo.d_pending.push_back(context);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,10 @@ inline bool ClusterQueueHelper::isQueuePrimaryAvailable(
return queueContext.d_liveQInfo.d_id !=
bmqp::QueueId::k_UNASSIGNED_QUEUE_ID &&
d_clusterData_p->electorInfo().leaderNode() != 0 &&
d_clusterData_p->electorInfo().leaderNode() != otherThan;
d_clusterData_p->electorInfo().leaderNode() != otherThan &&
d_clusterData_p->electorInfo().electorState() ==
mqbnet::ElectorState::e_LEADER;

// RETURN
}

Expand Down
Loading