Skip to content

Commit

Permalink
getting rid of queue creation/deletion cbs
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored and emelialei88 committed Feb 6, 2025
1 parent d9f8b65 commit c377025
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 383 deletions.
108 changes: 44 additions & 64 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4036,9 +4036,6 @@ void ClusterQueueHelper::onQueueAssigned(
QueueContextSp queueContext;
QueueContextMapIter queueContextIt = d_queues.find(info->uri());

mqbc::ClusterState::DomainState& domainState =
*d_clusterState_p->domainStates().at(info->uri().qualifiedDomain());

if (queueContextIt != d_queues.end()) {
// We already have a queueContext created for that queue
queueContext = queueContextIt->second;
Expand Down Expand Up @@ -4110,6 +4107,8 @@ void ClusterQueueHelper::onQueueAssigned(

d_queues[info->uri()] = queueContext;
}
mqbc::ClusterState::DomainState& domainState =
*d_clusterState_p->domainStates().at(info->uri().qualifiedDomain());

domainState.adjustQueueCount(1);

Expand All @@ -4120,30 +4119,25 @@ void ClusterQueueHelper::onQueueAssigned(
BALL_LOG_INFO << d_cluster_p->description()
<< ": Assigned queue: " << *info;

// Note: In non-CSL mode, the queue creation callback is instead invoked at
// replica nodes when they receive a queue creation record from the primary
// in the partition stream.
if (d_cluster_p->isCSLModeEnabled()) {
if (!d_clusterState_p->isSelfPrimary(info->partitionId())) {
// This is a replica node

// Note: It's possible that the queue has already been registered
// in the StorageMgr if it was a queue found during storage
// recovery. Therefore, we will allow for duplicate registration
// which will simply result in a no-op.
d_storageManager_p->registerQueueReplica(info->partitionId(),
info->uri(),
info->key(),
domainState.domain(),
true); // allowDuplicate

d_storageManager_p->updateQueueReplica(info->partitionId(),
info->uri(),
info->key(),
info->appInfos(),
domainState.domain(),
true); // allowDuplicate
}
if (!d_clusterState_p->isSelfPrimary(info->partitionId())) {
// This is a replica node

// Note: It's possible that the queue has already been registered
// in the StorageMgr if it was a queue found during storage
// recovery. Therefore, we will allow for duplicate registration
// which will simply result in a no-op.
d_storageManager_p->registerQueueReplica(info->partitionId(),
info->uri(),
info->key(),
domainState.domain(),
true); // allowDuplicate

d_storageManager_p->updateQueueReplica(info->partitionId(),
info->uri(),
info->key(),
info->appInfos(),
domainState.domain(),
true); // allowDuplicate
}

// NOTE: Even if it is not needed to invoke 'onQueueContextAssigned' in the
Expand Down Expand Up @@ -4280,16 +4274,10 @@ void ClusterQueueHelper::onQueueUnassigned(
removeQueueRaw(queueContextIt);
}

// Note: In non-CSL mode, the queue deletion callback is instead
// invoked at nodes when they receive a queue deletion record from the
// primary in the partition stream.

if (d_cluster_p->isCSLModeEnabled()) {
d_storageManager_p->unregisterQueueReplica(info->partitionId(),
info->uri(),
info->key(),
mqbu::StorageKey());
}
d_storageManager_p->unregisterQueueReplica(info->partitionId(),
info->uri(),
info->key(),
mqbu::StorageKey());
}

d_clusterState_p->queueKeys().erase(info->key());
Expand Down Expand Up @@ -4328,35 +4316,27 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

if (d_cluster_p->isCSLModeEnabled()) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
d_storageManager_p->updateQueueReplica(partitionId,
uri,
qiter->second->key(),
addedAppIds,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
addedAppIds,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.
d_storageManager_p->unregisterQueueReplica(
partitionId,
uri,
qiter->second->key(),
cit->second);
}
d_storageManager_p->unregisterQueueReplica(partitionId,
uri,
qiter->second->key(),
cit->second);
}
}

Expand Down
131 changes: 2 additions & 129 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,94 +363,6 @@ void StorageManager::shutdownCb(int partitionId, bslmt::Latch* latch)
d_clusterConfig);
}

void StorageManager::queueCreationCb(int* status,
int partitionId,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
const AppInfos& appIdKeyPairs,
bool isNewQueue)
{
// executed by *QUEUE_DISPATCHER* thread associated with 'partitionId'

// PRECONDITIONS
BSLS_ASSERT_SAFE(0 <= partitionId &&
partitionId < static_cast<int>(d_fileStores.size()));
BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread());

if (d_cluster_p->isCSLModeEnabled()) {
// This callback is removed for CSL mode

return; // RETURN
}

// This routine is executed at replica nodes when they received a queue
// creation record from the primary in the partition stream.

if (isNewQueue) {
mqbc::StorageUtil::registerQueueReplicaDispatched(
status,
&d_storages[partitionId],
&d_storagesLock,
d_fileStores[partitionId].get(),
d_domainFactory_p,
&d_allocators,
d_clusterData_p->identity().description(),
partitionId,
uri,
queueKey);

if (*status != 0) {
return; // RETURN
}
}

mqbc::StorageUtil::updateQueueReplicaDispatched(
status,
&d_storages[partitionId],
&d_storagesLock,
d_domainFactory_p,
d_clusterData_p->identity().description(),
partitionId,
uri,
queueKey,
appIdKeyPairs);
}

void StorageManager::queueDeletionCb(int* status,
int partitionId,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
const mqbu::StorageKey& appKey)
{
// executed by *QUEUE_DISPATCHER* thread associated with 'partitionId'

// PRECONDITIONS
BSLS_ASSERT_SAFE(0 <= partitionId &&
partitionId < static_cast<int>(d_fileStores.size()));
BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread());

if (d_cluster_p->isCSLModeEnabled()) {
// This callback is removed for CSL mode

return; // RETURN
}

// This routine is executed at replica nodes when they received a queue
// deletion record from the primary in the partition stream.

mqbc::StorageUtil::unregisterQueueReplicaDispatched(
status,
&d_storages[partitionId],
&d_storagesLock,
d_fileStores[partitionId].get(),
d_clusterData_p->identity().description(),
partitionId,
uri,
queueKey,
appKey,
d_cluster_p->isCSLModeEnabled());
}

void StorageManager::recoveredQueuesCb(int partitionId,
const QueueKeyInfoMap& queueKeyInfoMap)
{
Expand Down Expand Up @@ -1130,14 +1042,6 @@ void StorageManager::registerQueueReplica(int partitionId,
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p));

if (!d_cluster_p->isCSLModeEnabled()) {
BALL_LOG_ERROR << "#CSL_MODE_MIX "
<< "StorageManager::registerQueueReplica() should only "
<< "be invoked in CSL mode.";

return; // RETURN
}

// This routine is executed at follower nodes upon commit callback of Queue
// Assignment Advisory from the leader.

Expand Down Expand Up @@ -1174,14 +1078,6 @@ void StorageManager::unregisterQueueReplica(int partitionId,
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p));

if (!d_cluster_p->isCSLModeEnabled()) {
BALL_LOG_ERROR << "#CSL_MODE_MIX "
<< "StorageManager::unregisterQueueReplica() should "
<< "only be invoked in CSL mode.";

return; // RETURN
}

// This routine is executed at follower nodes upon commit callback of Queue
// Unassigned Advisory or Queue Update Advisory from the leader.

Expand Down Expand Up @@ -1218,14 +1114,6 @@ void StorageManager::updateQueueReplica(int partitionId,
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p));

if (!d_cluster_p->isCSLModeEnabled()) {
BALL_LOG_ERROR << "#CSL_MODE_MIX "
<< "StorageManager::updateQueueReplica() should only "
<< "be invoked in CSL mode.";

return; // RETURN
}

// This routine is executed at follower nodes upon commit callback of Queue
// Queue Update Advisory from the leader.

Expand Down Expand Up @@ -1402,23 +1290,8 @@ int StorageManager::start(bsl::ostream& errorDescription)
d_replicationFactor,
bdlf::BindUtil::bind(&StorageManager::recoveredQueuesCb,
this,
bdlf::PlaceHolders::_1, // partitionId
bdlf::PlaceHolders::_2), // queueKeyUriMap)
bdlf::BindUtil::bind(&StorageManager::queueCreationCb,
this,
bdlf::PlaceHolders::_1, // status
bdlf::PlaceHolders::_2, // partitionId
bdlf::PlaceHolders::_3, // QueueUri
bdlf::PlaceHolders::_4, // QueueKey
bdlf::PlaceHolders::_5, // AppInfos
bdlf::PlaceHolders::_6), // IsNewQueue)
bdlf::BindUtil::bind(&StorageManager::queueDeletionCb,
this,
bdlf::PlaceHolders::_1, // status
bdlf::PlaceHolders::_2, // partitionId
bdlf::PlaceHolders::_3, // QueueUri
bdlf::PlaceHolders::_4, // QueueKey
bdlf::PlaceHolders::_5)); // AppKey
bdlf::PlaceHolders::_1, // partitionId
bdlf::PlaceHolders::_2)); // queueKeyUriMap)

BALL_LOG_INFO_BLOCK
{
Expand Down
13 changes: 0 additions & 13 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,19 +324,6 @@ class StorageManager BSLS_KEYWORD_FINAL : public mqbi::StorageManager {
/// associated with `processorId`.
void shutdownCb(int partitionId, bslmt::Latch* latch);

void queueCreationCb(int* status,
int partitionId,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
const AppInfos& appIdKeyPairs,
bool isNewQueue);

void queueDeletionCb(int* status,
int partitionId,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
const mqbu::StorageKey& appKey);

/// Callback executed when the partition having the specified
/// `partitionId` has performed recovery and recovered file-backed
/// queues and their virtual storages in the specified
Expand Down
8 changes: 5 additions & 3 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,11 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
}

if (!cluster->isCSLModeEnabled()) {
// Broadcast 'queueAssignmentAdvisory' to all followers
// Do it before 'assignQueue' so that Replicas receive CSL before
// QueueCreationRecord
clusterData->messageTransmitter().broadcastMessage(controlMsg);

// In CSL mode, we assign the queue to ClusterState upon CSL commit
// callback of QueueAssignmentAdvisory, so we don't assign it here.

Expand All @@ -1032,9 +1037,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState,

BALL_LOG_INFO << cluster->description()
<< ": Queue assigned: " << queueAdvisory;

// Broadcast 'queueAssignmentAdvisory' to all followers
clusterData->messageTransmitter().broadcastMessage(controlMsg);
}

return QueueAssignmentResult::k_ASSIGNMENT_OK;
Expand Down
32 changes: 11 additions & 21 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1206,19 +1206,17 @@ bool StorageUtil::validatePartitionSyncEvent(
}

int StorageUtil::assignPartitionDispatcherThreads(
bdlmt::FixedThreadPool* threadPool,
mqbc::ClusterData* clusterData,
const mqbi::Cluster& cluster,
mqbi::Dispatcher* dispatcher,
const mqbcfg::PartitionConfig& config,
FileStores* fileStores,
BlobSpPool* blobSpPool,
bmqma::CountingAllocatorStore* allocators,
bsl::ostream& errorDescription,
int replicationFactor,
const RecoveredQueuesCb& recoveredQueuesCb,
const bdlb::NullableValue<QueueCreationCb>& queueCreationCb,
const bdlb::NullableValue<QueueDeletionCb>& queueDeletionCb)
bdlmt::FixedThreadPool* threadPool,
mqbc::ClusterData* clusterData,
const mqbi::Cluster& cluster,
mqbi::Dispatcher* dispatcher,
const mqbcfg::PartitionConfig& config,
FileStores* fileStores,
BlobSpPool* blobSpPool,
bmqma::CountingAllocatorStore* allocators,
bsl::ostream& errorDescription,
int replicationFactor,
const RecoveredQueuesCb& recoveredQueuesCb)
{
// executed by the cluster *DISPATCHER* thread

Expand Down Expand Up @@ -1263,14 +1261,6 @@ int StorageUtil::assignPartitionDispatcherThreads(
.setMaxArchivedFileSets(config.maxArchivedFileSets())
.setRecoveredQueuesCb(recoveredQueuesCb);

if (!queueCreationCb.isNull()) {
dsCfg.setQueueCreationCb(queueCreationCb.value());
}

if (!queueDeletionCb.isNull()) {
dsCfg.setQueueDeletionCb(queueDeletionCb.value());
}

// Get named allocator from associated bmqma::CountingAllocatorStore
bslma::Allocator* fileStoreAllocator = allocators->get(
bsl::string("Partition") + bsl::to_string(i));
Expand Down
Loading

0 comments on commit c377025

Please sign in to comment.