@@ -1112,12 +1112,41 @@ void ClusterQueueHelper::onOpenQueueResponse(
1112
1112
<< requestContext->response ()
1113
1113
<< " , for request: " << requestContext->request ();
1114
1114
1115
- bool failure = true ;
1115
+ const bmqt::GenericResult::Enum mainCode = requestContext->result ();
1116
+
1117
+ QueueContext& qcontext = *context->queueContext ();
1118
+ QueueLiveState& qinfo = qcontext.d_liveQInfo ;
1119
+ BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
1120
+ requestContext->request ().choice ().openQueue ();
1121
+ StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds .findBySubId (
1122
+ context->d_upstreamSubQueueId );
1123
+
1124
+ BSLS_ASSERT_SAFE (bmqp::QueueUtil::extractAppId (req.handleParameters ()) ==
1125
+ subStreamIt->appId ());
1126
+ BSLS_ASSERT_SAFE (subStreamIt->value ().d_parameters .readCount () >=
1127
+ req.handleParameters ().readCount ());
1128
+
1129
+ if (mainCode == bmqt::GenericResult::e_SUCCESS) {
1130
+ BSLS_ASSERT_SAFE (
1131
+ requestContext->response ().choice ().isOpenQueueResponseValue ());
1132
+
1133
+ // Received a success openQueue, proceed with the next step.
1134
+ if (createQueue (
1135
+ context,
1136
+ requestContext->response ().choice ().openQueueResponse (),
1137
+ responder)) {
1138
+ // Queue instance was successfully created at this node. Mark its
1139
+ // substream's status as 'opened'.
1140
+ // This flag will be used to determine if self node needs to issue
1141
+ // a reopen-queue request for this substream upon failover (ie,
1142
+ // restore state op).
1143
+ subStreamIt->value ().d_state = SubQueueContext::k_OPEN;
1144
+ }
1145
+ return ; // RETURN
1146
+ }
1147
+
1116
1148
bool retry = false ; // retry immediately
1117
1149
mqbnet::ClusterNode* otherThan = 0 ; // retry if the upstream is new
1118
- bool pushBack = false ; // buffer as pending
1119
-
1120
- const bmqt::GenericResult::Enum mainCode = requestContext->result ();
1121
1150
1122
1151
if (mainCode == bmqt::GenericResult::e_CANCELED) {
1123
1152
const mqbi::ClusterErrorCode::Enum subCode =
@@ -1130,9 +1159,9 @@ void ClusterQueueHelper::onOpenQueueResponse(
1130
1159
// Request was canceled due to lost of the active (in proxy), or
1131
1160
// node down (in cluster member) before receiving a response.
1132
1161
// Open-queue request should be retried (in 'restoreState').
1133
- // This code relies on the order: first 'CancelAllRequests', then
1134
- // 'restoreState '.
1135
- pushBack = true ;
1162
+ // This code cannot rely on the order of 'restoreState' and
1163
+ // 'onOpenQueueResponse '.
1164
+ retry = true ;
1136
1165
}
1137
1166
}
1138
1167
else if (mainCode == bmqt::GenericResult::e_REFUSED) {
@@ -1169,91 +1198,50 @@ void ClusterQueueHelper::onOpenQueueResponse(
1169
1198
retry = true ;
1170
1199
}
1171
1200
}
1172
- else if (mainCode != bmqt::GenericResult::e_SUCCESS) {
1201
+ else {
1173
1202
// For any other case of failure, no need to retry. Callback will be
1174
1203
// invoked later with error status.
1175
1204
BSLS_ASSERT_SAFE (requestContext->response ().choice ().isStatusValue ());
1176
1205
}
1177
- else {
1178
- failure = false ;
1179
- }
1180
1206
1181
- QueueContext& qcontext = *context->queueContext ();
1182
- QueueLiveState& qinfo = qcontext.d_liveQInfo ;
1183
- BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req =
1184
- requestContext->request ().choice ().openQueue ();
1185
- StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds .findBySubId (
1186
- context->d_upstreamSubQueueId );
1187
-
1188
- BSLS_ASSERT_SAFE (bmqp::QueueUtil::extractAppId (req.handleParameters ()) ==
1189
- subStreamIt->appId ());
1190
- BSLS_ASSERT_SAFE (subStreamIt->value ().d_parameters .readCount () >=
1191
- req.handleParameters ().readCount ());
1192
-
1193
- if (failure) {
1194
- // Rollback _upstream_ view on that particular subQueueId here due to
1195
- // open queue failure response from upstream. This is done even if
1196
- // 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
1197
- // again, which will update the view again after sending the request.
1207
+ // Rollback _upstream_ view on that particular subQueueId here due to
1208
+ // open queue failure response from upstream. This is done even if
1209
+ // 'retry=true', because if so, 'sendOpenQueueRequest' is invoked
1210
+ // again, which will update the view again after sending the request.
1198
1211
1199
- bmqp::QueueUtil::subtractHandleParameters (
1200
- &subStreamIt->value ().d_parameters ,
1201
- context->d_handleParameters );
1202
-
1203
- if (d_cluster_p->isStopping () || (!retry && !pushBack)) {
1204
- context->d_callback (requestContext->response ().choice ().status (),
1205
- 0 ,
1206
- bmqp_ctrlmsg::OpenQueueResponse (),
1207
- mqbi::Cluster::OpenQueueConfirmationCookie ());
1208
-
1209
- return ; // RETURN
1210
- }
1211
-
1212
- BSLS_ASSERT_SAFE (isQueueAssigned (qcontext));
1212
+ bmqp::QueueUtil::subtractHandleParameters (
1213
+ &subStreamIt->value ().d_parameters ,
1214
+ context->d_handleParameters );
1213
1215
1214
- if (retry) {
1215
- // We can't just put back the context and 'wait' for a partition
1216
- // primary assignment because it is possible the primary assignment
1217
- // already came before the peer's response; therefore, we just
1218
- // retry with the currently known active primary, if any, or put
1219
- // back to the pending contexts otherwise. Note that current
1220
- // primary could be self, that's why we call
1221
- // 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
1222
- // below.
1216
+ if (d_cluster_p->isStopping () || !retry) {
1217
+ context->d_callback (requestContext->response ().choice ().status (),
1218
+ 0 ,
1219
+ bmqp_ctrlmsg::OpenQueueResponse (),
1220
+ mqbi::Cluster::OpenQueueConfirmationCookie ());
1223
1221
1224
- if (isQueuePrimaryAvailable (qcontext, otherThan)) {
1225
- processOpenQueueRequest (context);
1226
- }
1227
- else {
1228
- pushBack = true ;
1229
- }
1230
- }
1222
+ return ; // RETURN
1223
+ }
1231
1224
1232
- if (pushBack) {
1233
- BALL_LOG_INFO << d_cluster_p->description ()
1234
- << " : buffering open queue request for "
1235
- << qcontext.uri ();
1225
+ BSLS_ASSERT_SAFE (isQueueAssigned (qcontext));
1236
1226
1237
- qcontext.d_liveQInfo .d_pending .push_back (context);
1238
- }
1227
+ // We can't just put back the context and 'wait' for a partition
1228
+ // primary assignment because it is possible the primary assignment
1229
+ // already came before the peer's response; therefore, we just
1230
+ // retry with the currently known active primary, if any, or put
1231
+ // back to the pending contexts otherwise. Note that current
1232
+ // primary could be self, that's why we call
1233
+ // 'processOpenQueueRequest' instead of 'sendOpenQueueRequest'
1234
+ // below.
1239
1235
1240
- return ; // RETURN
1236
+ if (isQueuePrimaryAvailable (qcontext, otherThan)) {
1237
+ processOpenQueueRequest (context);
1241
1238
}
1239
+ else {
1240
+ BALL_LOG_INFO << d_cluster_p->description ()
1241
+ << " : buffering open queue request for "
1242
+ << qcontext.uri ();
1242
1243
1243
- BSLS_ASSERT_SAFE (!retry);
1244
- BSLS_ASSERT_SAFE (
1245
- requestContext->response ().choice ().isOpenQueueResponseValue ());
1246
-
1247
- // Received a success openQueue, proceed with the next step.
1248
- if (createQueue (context,
1249
- requestContext->response ().choice ().openQueueResponse (),
1250
- responder)) {
1251
- // Queue instance was successfully created at this node. Mark its
1252
- // substream's status as 'opened'.
1253
- // This flag will be used to determine if self node needs to issue a
1254
- // reopen-queue request for this substream upon failover (ie, restore
1255
- // state op).
1256
- subStreamIt->value ().d_state = SubQueueContext::k_OPEN;
1244
+ qcontext.d_liveQInfo .d_pending .push_back (context);
1257
1245
}
1258
1246
}
1259
1247
0 commit comments