Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat getting rid of queue creation/deletion cbs #601

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 44 additions & 64 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4036,9 +4036,6 @@ void ClusterQueueHelper::onQueueAssigned(
QueueContextSp queueContext;
QueueContextMapIter queueContextIt = d_queues.find(info->uri());

mqbc::ClusterState::DomainState& domainState =
*d_clusterState_p->domainStates().at(info->uri().qualifiedDomain());

if (queueContextIt != d_queues.end()) {
// We already have a queueContext created for that queue
queueContext = queueContextIt->second;
Expand Down Expand Up @@ -4110,6 +4107,8 @@ void ClusterQueueHelper::onQueueAssigned(

d_queues[info->uri()] = queueContext;
}
mqbc::ClusterState::DomainState& domainState =
*d_clusterState_p->domainStates().at(info->uri().qualifiedDomain());

domainState.adjustQueueCount(1);

Expand All @@ -4120,30 +4119,25 @@ void ClusterQueueHelper::onQueueAssigned(
BALL_LOG_INFO << d_cluster_p->description()
<< ": Assigned queue: " << *info;

// Note: In non-CSL mode, the queue creation callback is instead invoked at
// replica nodes when they receive a queue creation record from the primary
// in the partition stream.
if (d_cluster_p->isCSLModeEnabled()) {
if (!d_clusterState_p->isSelfPrimary(info->partitionId())) {
// This is a replica node

// Note: It's possible that the queue has already been registered
// in the StorageMgr if it was a queue found during storage
// recovery. Therefore, we will allow for duplicate registration
// which will simply result in a no-op.
d_storageManager_p->registerQueueReplica(info->partitionId(),
info->uri(),
info->key(),
domainState.domain(),
true); // allowDuplicate

d_storageManager_p->updateQueueReplica(info->partitionId(),
info->uri(),
info->key(),
info->appInfos(),
domainState.domain(),
true); // allowDuplicate
}
if (!d_clusterState_p->isSelfPrimary(info->partitionId())) {
// This is a replica node

// Note: It's possible that the queue has already been registered
// in the StorageMgr if it was a queue found during storage
// recovery. Therefore, we will allow for duplicate registration
// which will simply result in a no-op.
d_storageManager_p->registerQueueReplica(info->partitionId(),
info->uri(),
info->key(),
domainState.domain(),
true); // allowDuplicate

d_storageManager_p->updateQueueReplica(info->partitionId(),
info->uri(),
info->key(),
info->appInfos(),
domainState.domain(),
true); // allowDuplicate
}

// NOTE: Even if it is not needed to invoke 'onQueueContextAssigned' in the
Expand Down Expand Up @@ -4280,16 +4274,10 @@ void ClusterQueueHelper::onQueueUnassigned(
removeQueueRaw(queueContextIt);
}

// Note: In non-CSL mode, the queue deletion callback is instead
// invoked at nodes when they receive a queue deletion record from the
// primary in the partition stream.

if (d_cluster_p->isCSLModeEnabled()) {
d_storageManager_p->unregisterQueueReplica(info->partitionId(),
info->uri(),
info->key(),
mqbu::StorageKey());
}
d_storageManager_p->unregisterQueueReplica(info->partitionId(),
info->uri(),
info->key(),
mqbu::StorageKey());
}

d_clusterState_p->queueKeys().erase(info->key());
Expand Down Expand Up @@ -4328,35 +4316,27 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

if (d_cluster_p->isCSLModeEnabled()) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
d_storageManager_p->updateQueueReplica(partitionId,
uri,
qiter->second->key(),
addedAppIds,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
addedAppIds,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.
d_storageManager_p->unregisterQueueReplica(
partitionId,
uri,
qiter->second->key(),
cit->second);
}
d_storageManager_p->unregisterQueueReplica(partitionId,
uri,
qiter->second->key(),
cit->second);
}
}

Expand Down
131 changes: 2 additions & 129 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,94 +363,6 @@ void StorageManager::shutdownCb(int partitionId, bslmt::Latch* latch)
d_clusterConfig);
}

void StorageManager::queueCreationCb(int*                    status,
                                     int                     partitionId,
                                     const bmqt::Uri&        uri,
                                     const mqbu::StorageKey& queueKey,
                                     const AppInfos&         appIdKeyPairs,
                                     bool                    isNewQueue)
{
    // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId'

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(0 <= partitionId &&
                     partitionId < static_cast<int>(d_fileStores.size()));
    BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread());

    // Replica-side handling of a queue creation record received from the
    // primary in the partition stream.  In CSL mode this callback does
    // nothing at all.
    if (!d_cluster_p->isCSLModeEnabled()) {
        if (isNewQueue) {
            // A brand new queue must be registered before its appIds can be
            // applied.
            mqbc::StorageUtil::registerQueueReplicaDispatched(
                status,
                &d_storages[partitionId],
                &d_storagesLock,
                d_fileStores[partitionId].get(),
                d_domainFactory_p,
                &d_allocators,
                d_clusterData_p->identity().description(),
                partitionId,
                uri,
                queueKey);
        }

        // Apply the appIds unless the registration performed just above
        // reported a failure through 'status'.  For an already-known queue
        // the update is always attempted, whatever 'status' holds on entry.
        if (!isNewQueue || 0 == *status) {
            mqbc::StorageUtil::updateQueueReplicaDispatched(
                status,
                &d_storages[partitionId],
                &d_storagesLock,
                d_domainFactory_p,
                d_clusterData_p->identity().description(),
                partitionId,
                uri,
                queueKey,
                appIdKeyPairs);
        }
    }
}

void StorageManager::queueDeletionCb(int*                    status,
                                     int                     partitionId,
                                     const bmqt::Uri&        uri,
                                     const mqbu::StorageKey& queueKey,
                                     const mqbu::StorageKey& appKey)
{
    // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId'

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(0 <= partitionId &&
                     partitionId < static_cast<int>(d_fileStores.size()));
    BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread());

    // Replica-side handling of a queue deletion record received from the
    // primary in the partition stream.  In CSL mode this callback does
    // nothing at all.
    if (!d_cluster_p->isCSLModeEnabled()) {
        mqbc::StorageUtil::unregisterQueueReplicaDispatched(
            status,
            &d_storages[partitionId],
            &d_storagesLock,
            d_fileStores[partitionId].get(),
            d_clusterData_p->identity().description(),
            partitionId,
            uri,
            queueKey,
            appKey,
            d_cluster_p->isCSLModeEnabled());
            // NOTE(review): given the enclosing check, this last argument
            // is expected to be 'false' here -- assuming the CSL mode
            // cannot change between the two calls; confirm.
    }
}

void StorageManager::recoveredQueuesCb(int partitionId,
const QueueKeyInfoMap& queueKeyInfoMap)
{
Expand Down Expand Up @@ -1130,14 +1042,6 @@ void StorageManager::registerQueueReplica(int partitionId,
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p));

if (!d_cluster_p->isCSLModeEnabled()) {
BALL_LOG_ERROR << "#CSL_MODE_MIX "
<< "StorageManager::registerQueueReplica() should only "
<< "be invoked in CSL mode.";

return; // RETURN
}

// This routine is executed at follower nodes upon commit callback of Queue
// Assignment Advisory from the leader.

Expand Down Expand Up @@ -1174,14 +1078,6 @@ void StorageManager::unregisterQueueReplica(int partitionId,
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p));

if (!d_cluster_p->isCSLModeEnabled()) {
BALL_LOG_ERROR << "#CSL_MODE_MIX "
<< "StorageManager::unregisterQueueReplica() should "
<< "only be invoked in CSL mode.";

return; // RETURN
}

// This routine is executed at follower nodes upon commit callback of Queue
// Unassigned Advisory or Queue Update Advisory from the leader.

Expand Down Expand Up @@ -1218,14 +1114,6 @@ void StorageManager::updateQueueReplica(int partitionId,
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p));

if (!d_cluster_p->isCSLModeEnabled()) {
BALL_LOG_ERROR << "#CSL_MODE_MIX "
<< "StorageManager::updateQueueReplica() should only "
<< "be invoked in CSL mode.";

return; // RETURN
}

// This routine is executed at follower nodes upon commit callback of Queue
// Update Advisory from the leader.

Expand Down Expand Up @@ -1402,23 +1290,8 @@ int StorageManager::start(bsl::ostream& errorDescription)
d_replicationFactor,
bdlf::BindUtil::bind(&StorageManager::recoveredQueuesCb,
this,
bdlf::PlaceHolders::_1, // partitionId
bdlf::PlaceHolders::_2), // queueKeyUriMap)
bdlf::BindUtil::bind(&StorageManager::queueCreationCb,
this,
bdlf::PlaceHolders::_1, // status
bdlf::PlaceHolders::_2, // partitionId
bdlf::PlaceHolders::_3, // QueueUri
bdlf::PlaceHolders::_4, // QueueKey
bdlf::PlaceHolders::_5, // AppInfos
bdlf::PlaceHolders::_6), // IsNewQueue)
bdlf::BindUtil::bind(&StorageManager::queueDeletionCb,
this,
bdlf::PlaceHolders::_1, // status
bdlf::PlaceHolders::_2, // partitionId
bdlf::PlaceHolders::_3, // QueueUri
bdlf::PlaceHolders::_4, // QueueKey
bdlf::PlaceHolders::_5)); // AppKey
bdlf::PlaceHolders::_1, // partitionId
bdlf::PlaceHolders::_2)); // queueKeyUriMap)

BALL_LOG_INFO_BLOCK
{
Expand Down
13 changes: 0 additions & 13 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,19 +324,6 @@ class StorageManager BSLS_KEYWORD_FINAL : public mqbi::StorageManager {
/// associated with `processorId`.
void shutdownCb(int partitionId, bslmt::Latch* latch);

/// Callback executed by the queue dispatcher thread associated with the
/// specified `partitionId` when a replica receives a queue creation
/// record from the primary in the partition stream.  If the specified
/// `isNewQueue` is true, first register the queue identified by the
/// specified `uri` and `queueKey`, and stop if the specified `status`
/// is non-zero afterwards.  Then update the queue with the specified
/// `appIdKeyPairs`.  `status` is handed to the underlying storage
/// utilities to report the outcome.  This callback is a no-op when the
/// cluster is in CSL mode.
void queueCreationCb(int* status,
int partitionId,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
const AppInfos& appIdKeyPairs,
bool isNewQueue);

/// Callback executed by the queue dispatcher thread associated with the
/// specified `partitionId` when a replica receives a queue deletion
/// record from the primary in the partition stream.  Unregister, for
/// the queue identified by the specified `uri` and `queueKey`, the
/// storage designated by the specified `appKey`.  The specified
/// `status` is handed to the underlying storage utility to report the
/// outcome.  This callback is a no-op when the cluster is in CSL mode.
void queueDeletionCb(int* status,
int partitionId,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
const mqbu::StorageKey& appKey);

/// Callback executed when the partition having the specified
/// `partitionId` has performed recovery and recovered file-backed
/// queues and their virtual storages in the specified
Expand Down
8 changes: 5 additions & 3 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,11 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
}

if (!cluster->isCSLModeEnabled()) {
// Broadcast 'queueAssignmentAdvisory' to all followers
// Do it before 'assignQueue' so that Replicas receive CSL before
// QueueCreationRecord
clusterData->messageTransmitter().broadcastMessage(controlMsg);

// In CSL mode, we assign the queue to ClusterState upon CSL commit
// callback of QueueAssignmentAdvisory, so we don't assign it here.

Expand All @@ -1032,9 +1037,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState,

BALL_LOG_INFO << cluster->description()
<< ": Queue assigned: " << queueAdvisory;

// Broadcast 'queueAssignmentAdvisory' to all followers
clusterData->messageTransmitter().broadcastMessage(controlMsg);
}

return QueueAssignmentResult::k_ASSIGNMENT_OK;
Expand Down
32 changes: 11 additions & 21 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1206,19 +1206,17 @@ bool StorageUtil::validatePartitionSyncEvent(
}

int StorageUtil::assignPartitionDispatcherThreads(
bdlmt::FixedThreadPool* threadPool,
mqbc::ClusterData* clusterData,
const mqbi::Cluster& cluster,
mqbi::Dispatcher* dispatcher,
const mqbcfg::PartitionConfig& config,
FileStores* fileStores,
BlobSpPool* blobSpPool,
bmqma::CountingAllocatorStore* allocators,
bsl::ostream& errorDescription,
int replicationFactor,
const RecoveredQueuesCb& recoveredQueuesCb,
const bdlb::NullableValue<QueueCreationCb>& queueCreationCb,
const bdlb::NullableValue<QueueDeletionCb>& queueDeletionCb)
bdlmt::FixedThreadPool* threadPool,
mqbc::ClusterData* clusterData,
const mqbi::Cluster& cluster,
mqbi::Dispatcher* dispatcher,
const mqbcfg::PartitionConfig& config,
FileStores* fileStores,
BlobSpPool* blobSpPool,
bmqma::CountingAllocatorStore* allocators,
bsl::ostream& errorDescription,
int replicationFactor,
const RecoveredQueuesCb& recoveredQueuesCb)
{
// executed by the cluster *DISPATCHER* thread

Expand Down Expand Up @@ -1263,14 +1261,6 @@ int StorageUtil::assignPartitionDispatcherThreads(
.setMaxArchivedFileSets(config.maxArchivedFileSets())
.setRecoveredQueuesCb(recoveredQueuesCb);

if (!queueCreationCb.isNull()) {
dsCfg.setQueueCreationCb(queueCreationCb.value());
}

if (!queueDeletionCb.isNull()) {
dsCfg.setQueueDeletionCb(queueDeletionCb.value());
}

// Get named allocator from associated bmqma::CountingAllocatorStore
bslma::Allocator* fileStoreAllocator = allocators->get(
bsl::string("Partition") + bsl::to_string(i));
Expand Down
Loading
Loading