diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index f782b9dacf..c14edfed58 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -4036,9 +4036,6 @@ void ClusterQueueHelper::onQueueAssigned( QueueContextSp queueContext; QueueContextMapIter queueContextIt = d_queues.find(info->uri()); - mqbc::ClusterState::DomainState& domainState = - *d_clusterState_p->domainStates().at(info->uri().qualifiedDomain()); - if (queueContextIt != d_queues.end()) { // We already have a queueContext created for that queue queueContext = queueContextIt->second; @@ -4110,6 +4107,8 @@ void ClusterQueueHelper::onQueueAssigned( d_queues[info->uri()] = queueContext; } + mqbc::ClusterState::DomainState& domainState = + *d_clusterState_p->domainStates().at(info->uri().qualifiedDomain()); domainState.adjustQueueCount(1); @@ -4120,30 +4119,25 @@ void ClusterQueueHelper::onQueueAssigned( BALL_LOG_INFO << d_cluster_p->description() << ": Assigned queue: " << *info; - // Note: In non-CSL mode, the queue creation callback is instead invoked at - // replica nodes when they receive a queue creation record from the primary - // in the partition stream. - if (d_cluster_p->isCSLModeEnabled()) { - if (!d_clusterState_p->isSelfPrimary(info->partitionId())) { - // This is a replica node - - // Note: It's possible that the queue has already been registered - // in the StorageMgr if it was a queue found during storage - // recovery. Therefore, we will allow for duplicate registration - // which will simply result in a no-op. - d_storageManager_p->registerQueueReplica(info->partitionId(), - info->uri(), - info->key(), - domainState.domain(), - true); // allowDuplicate - - d_storageManager_p->updateQueueReplica(info->partitionId(), - info->uri(), - info->key(), - info->appInfos(), - domainState.domain(), - true); // allowDuplicate - } + if (!d_clusterState_p->isSelfPrimary(info->partitionId())) { + // This is a replica node + + // Note: It's possible that the queue has already been registered + // in the StorageMgr if it was a queue found during storage + // recovery. Therefore, we will allow for duplicate registration + // which will simply result in a no-op. + d_storageManager_p->registerQueueReplica(info->partitionId(), + info->uri(), + info->key(), + domainState.domain(), + true); // allowDuplicate + + d_storageManager_p->updateQueueReplica(info->partitionId(), + info->uri(), + info->key(), + info->appInfos(), + domainState.domain(), + true); // allowDuplicate } // NOTE: Even if it is not needed to invoke 'onQueueContextAssigned' in the @@ -4280,16 +4274,10 @@ void ClusterQueueHelper::onQueueUnassigned( removeQueueRaw(queueContextIt); } - // Note: In non-CSL mode, the queue deletion callback is instead - // invoked at nodes when they receive a queue deletion record from the - // primary in the partition stream. - - if (d_cluster_p->isCSLModeEnabled()) { - d_storageManager_p->unregisterQueueReplica(info->partitionId(), - info->uri(), - info->key(), - mqbu::StorageKey()); - } + d_storageManager_p->unregisterQueueReplica(info->partitionId(), + info->uri(), + info->key(), + mqbu::StorageKey()); } d_clusterState_p->queueKeys().erase(info->key()); @@ -4328,35 +4316,27 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, const int partitionId = qiter->second->partitionId(); BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID); - if (d_cluster_p->isCSLModeEnabled()) { + if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { + d_storageManager_p->updateQueueReplica(partitionId, + uri, + qiter->second->key(), + addedAppIds, + d_clusterState_p->domainStates() + .at(uri.qualifiedDomain()) + ->domain()); + } + + for (AppInfosCIter cit = removedAppIds.cbegin(); + cit != removedAppIds.cend(); + ++cit) { if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { - // Note: In non-CSL mode, the queue creation callback is - // invoked at replica nodes when they receive a queue creation + // Note: In non-CSL mode, the queue deletion callback is + // invoked at replica nodes when they receive a queue deletion // record from the primary in the partition stream. - - d_storageManager_p->updateQueueReplica( - partitionId, - uri, - qiter->second->key(), - addedAppIds, - d_clusterState_p->domainStates() - .at(uri.qualifiedDomain()) - ->domain()); - } - - for (AppInfosCIter cit = removedAppIds.cbegin(); - cit != removedAppIds.cend(); - ++cit) { - if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { - // Note: In non-CSL mode, the queue deletion callback is - // invoked at replica nodes when they receive a queue deletion - // record from the primary in the partition stream. - d_storageManager_p->unregisterQueueReplica( - partitionId, - uri, - qiter->second->key(), - cit->second); - } + d_storageManager_p->unregisterQueueReplica(partitionId, + uri, + qiter->second->key(), + cit->second); } } diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 2f8572cfef..24dd85ce9b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -363,94 +363,6 @@ void StorageManager::shutdownCb(int partitionId, bslmt::Latch* latch) d_clusterConfig); } -void StorageManager::queueCreationCb(int* status, - int partitionId, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const AppInfos& appIdKeyPairs, - bool isNewQueue) -{ - // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' - - // PRECONDITIONS - BSLS_ASSERT_SAFE(0 <= partitionId && - partitionId < static_cast(d_fileStores.size())); - BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); - - if (d_cluster_p->isCSLModeEnabled()) { - // This callback is removed for CSL mode - - return; // RETURN - } - - // This routine is executed at replica nodes when they received a queue - // creation record from the primary in the partition stream. - - if (isNewQueue) { - mqbc::StorageUtil::registerQueueReplicaDispatched( - status, - &d_storages[partitionId], - &d_storagesLock, - d_fileStores[partitionId].get(), - d_domainFactory_p, - &d_allocators, - d_clusterData_p->identity().description(), - partitionId, - uri, - queueKey); - - if (*status != 0) { - return; // RETURN - } - } - - mqbc::StorageUtil::updateQueueReplicaDispatched( - status, - &d_storages[partitionId], - &d_storagesLock, - d_domainFactory_p, - d_clusterData_p->identity().description(), - partitionId, - uri, - queueKey, - appIdKeyPairs); -} - -void StorageManager::queueDeletionCb(int* status, - int partitionId, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const mqbu::StorageKey& appKey) -{ - // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' - - // PRECONDITIONS - BSLS_ASSERT_SAFE(0 <= partitionId && - partitionId < static_cast(d_fileStores.size())); - BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); - - if (d_cluster_p->isCSLModeEnabled()) { - // This callback is removed for CSL mode - - return; // RETURN - } - - // This routine is executed at replica nodes when they received a queue - // deletion record from the primary in the partition stream. - - mqbc::StorageUtil::unregisterQueueReplicaDispatched( - status, - &d_storages[partitionId], - &d_storagesLock, - d_fileStores[partitionId].get(), - d_clusterData_p->identity().description(), - partitionId, - uri, - queueKey, - appKey, - d_cluster_p->isCSLModeEnabled()); -} - void StorageManager::recoveredQueuesCb(int partitionId, const QueueKeyInfoMap& queueKeyInfoMap) { @@ -1130,14 +1042,6 @@ void StorageManager::registerQueueReplica(int partitionId, // PRECONDITIONS BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p)); - if (!d_cluster_p->isCSLModeEnabled()) { - BALL_LOG_ERROR << "#CSL_MODE_MIX " - << "StorageManager::registerQueueReplica() should only " - << "be invoked in CSL mode."; - - return; // RETURN - } - // This routine is executed at follower nodes upon commit callback of Queue // Assignment Advisory from the leader. @@ -1174,14 +1078,6 @@ void StorageManager::unregisterQueueReplica(int partitionId, // PRECONDITIONS BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p)); - if (!d_cluster_p->isCSLModeEnabled()) { - BALL_LOG_ERROR << "#CSL_MODE_MIX " - << "StorageManager::unregisterQueueReplica() should " - << "only be invoked in CSL mode."; - - return; // RETURN - } - // This routine is executed at follower nodes upon commit callback of Queue // Unassigned Advisory or Queue Update Advisory from the leader. @@ -1218,14 +1114,6 @@ void StorageManager::updateQueueReplica(int partitionId, // PRECONDITIONS BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p)); - if (!d_cluster_p->isCSLModeEnabled()) { - BALL_LOG_ERROR << "#CSL_MODE_MIX " - << "StorageManager::updateQueueReplica() should only " - << "be invoked in CSL mode."; - - return; // RETURN - } - // This routine is executed at follower nodes upon commit callback of Queue // Queue Update Advisory from the leader. @@ -1402,23 +1290,8 @@ int StorageManager::start(bsl::ostream& errorDescription) d_replicationFactor, bdlf::BindUtil::bind(&StorageManager::recoveredQueuesCb, this, - bdlf::PlaceHolders::_1, // partitionId - bdlf::PlaceHolders::_2), // queueKeyUriMap) - bdlf::BindUtil::bind(&StorageManager::queueCreationCb, - this, - bdlf::PlaceHolders::_1, // status - bdlf::PlaceHolders::_2, // partitionId - bdlf::PlaceHolders::_3, // QueueUri - bdlf::PlaceHolders::_4, // QueueKey - bdlf::PlaceHolders::_5, // AppInfos - bdlf::PlaceHolders::_6), // IsNewQueue) - bdlf::BindUtil::bind(&StorageManager::queueDeletionCb, - this, - bdlf::PlaceHolders::_1, // status - bdlf::PlaceHolders::_2, // partitionId - bdlf::PlaceHolders::_3, // QueueUri - bdlf::PlaceHolders::_4, // QueueKey - bdlf::PlaceHolders::_5)); // AppKey + bdlf::PlaceHolders::_1, // partitionId + bdlf::PlaceHolders::_2)); // queueKeyUriMap) BALL_LOG_INFO_BLOCK { diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index 4fec506808..e8187b566e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ -324,19 +324,6 @@ class StorageManager BSLS_KEYWORD_FINAL : public mqbi::StorageManager { /// associated with `processorId`. void shutdownCb(int partitionId, bslmt::Latch* latch); - void queueCreationCb(int* status, - int partitionId, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const AppInfos& appIdKeyPairs, - bool isNewQueue); - - void queueDeletionCb(int* status, - int partitionId, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const mqbu::StorageKey& appKey); - /// Callback executed when the partition having the specified /// `partitionId` has performed recovery and recovered file-backed /// queues and their virtual storages in the specified diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 470a982134..f87de8f228 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -1010,6 +1010,11 @@ ClusterUtil::assignQueue(ClusterState* clusterState, } if (!cluster->isCSLModeEnabled()) { + // Broadcast 'queueAssignmentAdvisory' to all followers + // Do it before 'assignQueue' so that Replicas receive CSL before + // QueueCreationRecord + clusterData->messageTransmitter().broadcastMessage(controlMsg); + // In CSL mode, we assign the queue to ClusterState upon CSL commit // callback of QueueAssignmentAdvisory, so we don't assign it here. @@ -1032,9 +1037,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState, BALL_LOG_INFO << cluster->description() << ": Queue assigned: " << queueAdvisory; - - // Broadcast 'queueAssignmentAdvisory' to all followers - clusterData->messageTransmitter().broadcastMessage(controlMsg); } return QueueAssignmentResult::k_ASSIGNMENT_OK; diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 78b5980b5e..4f04d8083c 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -1206,19 +1206,17 @@ bool StorageUtil::validatePartitionSyncEvent( } int StorageUtil::assignPartitionDispatcherThreads( - bdlmt::FixedThreadPool* threadPool, - mqbc::ClusterData* clusterData, - const mqbi::Cluster& cluster, - mqbi::Dispatcher* dispatcher, - const mqbcfg::PartitionConfig& config, - FileStores* fileStores, - BlobSpPool* blobSpPool, - bmqma::CountingAllocatorStore* allocators, - bsl::ostream& errorDescription, - int replicationFactor, - const RecoveredQueuesCb& recoveredQueuesCb, - const bdlb::NullableValue& queueCreationCb, - const bdlb::NullableValue& queueDeletionCb) + bdlmt::FixedThreadPool* threadPool, + mqbc::ClusterData* clusterData, + const mqbi::Cluster& cluster, + mqbi::Dispatcher* dispatcher, + const mqbcfg::PartitionConfig& config, + FileStores* fileStores, + BlobSpPool* blobSpPool, + bmqma::CountingAllocatorStore* allocators, + bsl::ostream& errorDescription, + int replicationFactor, + const RecoveredQueuesCb& recoveredQueuesCb) { // executed by the cluster *DISPATCHER* thread @@ -1263,14 +1261,6 @@ int StorageUtil::assignPartitionDispatcherThreads( .setMaxArchivedFileSets(config.maxArchivedFileSets()) .setRecoveredQueuesCb(recoveredQueuesCb); - if (!queueCreationCb.isNull()) { - dsCfg.setQueueCreationCb(queueCreationCb.value()); - } - - if (!queueDeletionCb.isNull()) { - dsCfg.setQueueDeletionCb(queueDeletionCb.value()); - } - // Get named allocator from associated bmqma::CountingAllocatorStore bslma::Allocator* fileStoreAllocator = allocators->get( bsl::string("Partition") + bsl::to_string(i)); diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index 105846dc52..e05465974f 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -502,21 +502,17 @@ struct StorageUtil { /// /// THREAD: Executed by the cluster *DISPATCHER* thread. static int assignPartitionDispatcherThreads( - bdlmt::FixedThreadPool* threadPool, - mqbc::ClusterData* clusterData, - const mqbi::Cluster& cluster, - mqbi::Dispatcher* dispatcher, - const mqbcfg::PartitionConfig& config, - FileStores* fileStores, - BlobSpPool* blobSpPool, - bmqma::CountingAllocatorStore* allocators, - bsl::ostream& errorDescription, - int replicationFactor, - const RecoveredQueuesCb& recoveredQueuesCb, - const bdlb::NullableValue& queueCreationCb = - bdlb::NullableValue(), - const bdlb::NullableValue& queueDeletionCb = - bdlb::NullableValue()); + bdlmt::FixedThreadPool* threadPool, + mqbc::ClusterData* clusterData, + const mqbi::Cluster& cluster, + mqbi::Dispatcher* dispatcher, + const mqbcfg::PartitionConfig& config, + FileStores* fileStores, + BlobSpPool* blobSpPool, + bmqma::CountingAllocatorStore* allocators, + bsl::ostream& errorDescription, + int replicationFactor, + const RecoveredQueuesCb& recoveredQueuesCb); /// Clear the specified `primary` of the specified `partitionId` from /// the specified `fs` and `partitionInfo`, using the specified diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 596a13f6f9..eeb4d5084d 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -388,10 +388,6 @@ class DataStoreConfig { bsls::Types::Uint64 d_maxQlistFileSize; - QueueCreationCb d_queueCreationCb; - - QueueDeletionCb d_queueDeletionCb; - RecoveredQueuesCb d_recoveredQueuesCb; int d_maxArchivedFileSets; @@ -413,8 +409,6 @@ class DataStoreConfig { DataStoreConfig& setMaxDataFileSize(bsls::Types::Uint64 value); DataStoreConfig& setMaxJournalFileSize(bsls::Types::Uint64 value); DataStoreConfig& setMaxQlistFileSize(bsls::Types::Uint64 value); - DataStoreConfig& setQueueCreationCb(const QueueCreationCb& value); - DataStoreConfig& setQueueDeletionCb(const QueueDeletionCb& value); DataStoreConfig& setRecoveredQueuesCb(const RecoveredQueuesCb& value); /// Set the corresponding member to the specified `value` and return a @@ -434,8 +428,6 @@ class DataStoreConfig { bsls::Types::Uint64 maxDataFileSize() const; bsls::Types::Uint64 maxJournalFileSize() const; bsls::Types::Uint64 maxQlistFileSize() const; - const QueueCreationCb& queueCreationCb() const; - const QueueDeletionCb& queueDeletionCb() const; const RecoveredQueuesCb& recoveredQueuesCb() const; /// Return the value of the corresponding member. @@ -996,20 +988,6 @@ DataStoreConfig::setMaxQlistFileSize(bsls::Types::Uint64 value) return *this; } -inline DataStoreConfig& -DataStoreConfig::setQueueCreationCb(const QueueCreationCb& value) -{ - d_queueCreationCb = value; - return *this; -} - -inline DataStoreConfig& -DataStoreConfig::setQueueDeletionCb(const QueueDeletionCb& value) -{ - d_queueDeletionCb = value; - return *this; -} - inline DataStoreConfig& DataStoreConfig::setRecoveredQueuesCb(const RecoveredQueuesCb& value) { @@ -1084,18 +1062,6 @@ inline bsls::Types::Uint64 DataStoreConfig::maxQlistFileSize() const return d_maxQlistFileSize; } -inline const DataStoreConfig::QueueCreationCb& -DataStoreConfig::queueCreationCb() const -{ - return d_queueCreationCb; -} - -inline const DataStoreConfig::QueueDeletionCb& -DataStoreConfig::queueDeletionCb() const -{ - return d_queueDeletionCb; -} - inline const DataStoreConfig::RecoveredQueuesCb& DataStoreConfig::recoveredQueuesCb() const { diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index fe429263bd..3e9c62cd65 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -4374,22 +4374,6 @@ int FileStore::writeQueueCreationRecord( } } - if (!d_isCSLModeEnabled) { - int status = 0; - BSLS_ASSERT_SAFE(d_config.queueCreationCb()); - d_config.queueCreationCb()(&status, - d_config.partitionId(), - quri, - queueRec->queueKey(), - appIdKeyPairs, - QueueOpType::e_CREATION == - queueRec->type()); - - if (0 != status) { - return 10 * status + rc_QUEUE_CREATION_FAILURE; // RETURN - } - } - StorageMapIter sit = d_storages.find(queueRec->queueKey()); if (sit == d_storages.end()) { if (d_isCSLModeEnabled) { @@ -4560,9 +4544,9 @@ int FileStore::writeJournalRecord(const bmqp::StorageHeader& header, BSLS_ASSERT_SAFE(!queueKey->isNull()); - if (!(d_isCSLModeEnabled && recordType == RecordType::e_QUEUE_OP && + if (!(recordType == RecordType::e_QUEUE_OP && queueOpType == QueueOpType::e_DELETION)) { - // In CSL mode, the FileStore might receive the QueueDeletionRecord + // In ANY mode, the FileStore might receive the QueueDeletionRecord // after the queue storage has already been removed in the CSL // QueueUnassignment commit callback. Therefore, we will relax the // checks below. @@ -4588,10 +4572,9 @@ int FileStore::writeJournalRecord(const bmqp::StorageHeader& header, } if (!appKey->isNull()) { - if (!(d_isCSLModeEnabled && - recordType == RecordType::e_QUEUE_OP && + if (!(recordType == RecordType::e_QUEUE_OP && queueOpType == QueueOpType::e_PURGE)) { - // In CSL mode, if we are unregistering the 'appKey', then + // In ANY mode, if we are unregistering the 'appKey', then // we would have purged the subqueue and removed the // virtual storage for the 'appKey' before receiving this // QueueOp Purge record. @@ -4799,12 +4782,11 @@ int FileStore::writeJournalRecord(const bmqp::StorageHeader& header, rstorage->purge(*appKey); } else { - // In CSL mode, if we are unregistering the 'appKey', then we + // In ANY mode, if we are unregistering the 'appKey', then we // would have purged the subqueue and removed the virtual // storage for the 'appKey' before receiving this QueueOp Purge // record. That why we don't process this queue-purge command // in CSL mode. - BSLS_ASSERT_SAFE(d_isCSLModeEnabled); } rstorage->addQueueOpRecordHandle(handle); } @@ -4814,32 +4796,11 @@ int FileStore::writeJournalRecord(const bmqp::StorageHeader& header, if (appKey->isNull()) { // Entire queue is being deleted. - if (!d_isCSLModeEnabled) { - // In CSL mode, any outstanding QueueOp records are deleted - // upon receiving queue-unassigned advisory (see - // CQH::onQueueUnassigned and - // StorageMgr::unregisterQueueReplica). In non-CSL mode, - // outstanding QueueOp records need to be deleted here. - - const bsls::Types::Int64 numMsgs = rstorage->numMessages( - mqbu::StorageKey::k_NULL_KEY); - if (0 != numMsgs) { - BMQTSK_ALARMLOG_ALARM("REPLICATION") - << partitionDesc() - << "Received QueueOpRecord.DELETION for queue [" - << rstorage->queueUri() << "] which has [" - << numMsgs << "] outstanding messages." - << BMQTSK_ALARMLOG_END; - return rc_QUEUE_DELETION_ERROR; // RETURN - } - - const ReplicatedStorage::RecordHandles& recHandles = - rstorage->queueOpRecordHandles(); - - for (size_t idx = 0; idx < recHandles.size(); ++idx) { - removeRecordRaw(recHandles[idx]); - } - } + // In ANY mode, any outstanding QueueOp records are deleted + // upon receiving queue-unassigned advisory (see + // CQH::onQueueUnassigned and + // StorageMgr::unregisterQueueReplica). In non-CSL mode, + // outstanding QueueOp records need to be deleted here. // Delete the QueueOpRecord.DELETION record written above. // This needs to be done in both CSL and non-CSL modes. @@ -4847,27 +4808,6 @@ int FileStore::writeJournalRecord(const bmqp::StorageHeader& header, } // else: a non-null appKey is specified in a QueueOpRecord.DELETION // record. No need to remove any queueOpRecords. - - if (d_isCSLModeEnabled) { - // No further logic for CSL mode. - return rc_SUCCESS; // RETURN - } - BSLS_ASSERT_SAFE(!d_isFSMWorkflow); - - // Once below callback returns, 'rstorage' will no longer be - // valid. So invoking this callback should be the last thing - // to do in this 'else' snippet. - - int status = 0; - BSLS_ASSERT_SAFE(d_config.queueDeletionCb()); - d_config.queueDeletionCb()(&status, - d_config.partitionId(), - rstorage->queueUri(), - *queueKey, - *appKey); - if (0 != status) { - return rc_QUEUE_DELETION_ERROR; // RETURN - } } } diff --git a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp index 5ebdb1d19d..75ca2b2dc4 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp @@ -101,28 +101,6 @@ void createBlob(bdlbb::BlobBufferFactory* bufferFactory, new (arena) bdlbb::Blob(bufferFactory, allocator); } -void queueCreationCb(int* status, - int partitionId, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey) -{ - static_cast(status); - static_cast(partitionId); - static_cast(uri); - static_cast(queueKey); -} - -void queueDeletionCb(int* status, - int partitionId, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey) -{ - static_cast(status); - static_cast(partitionId); - static_cast(uri); - static_cast(queueKey); -} - void recoveredQueuesCb( int partitionId, const mqbs::DataStoreConfig::QueueKeyInfoMap& queueKeyInfoMap) @@ -234,18 +212,6 @@ struct Tester { .setMaxDataFileSize(d_partitionCfg.maxDataFileSize()) .setMaxJournalFileSize(d_partitionCfg.maxJournalFileSize()) .setMaxQlistFileSize(d_partitionCfg.maxQlistFileSize()) - .setQueueCreationCb( - bdlf::BindUtil::bind(&queueCreationCb, - bdlf::PlaceHolders::_1, // status - bdlf::PlaceHolders::_2, // partitionId - bdlf::PlaceHolders::_3, // QueueUri - bdlf::PlaceHolders::_4)) // QueueKey - .setQueueDeletionCb( - bdlf::BindUtil::bind(&queueDeletionCb, - bdlf::PlaceHolders::_1, // status - bdlf::PlaceHolders::_2, // partitionId - bdlf::PlaceHolders::_3, // QueueUri - bdlf::PlaceHolders::_4)) // QueueKey .setRecoveredQueuesCb(bdlf::BindUtil::bind( &recoveredQueuesCb, bdlf::PlaceHolders::_1, // partitionId