Skip to content

Commit a689fe7

Browse files
author
Dan Lambright
committed
Optimize resolver collection performance in CommitProxyServer
1 parent 8341fe6 commit a689fe7

File tree

1 file changed

+53
-20
lines changed

1 file changed

+53
-20
lines changed

fdbserver/CommitProxyServer.actor.cpp

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -154,18 +154,24 @@ struct ResolutionRequestBuilder {
154154
for (int idx = 0; idx < trIn.read_conflict_ranges.size(); ++idx) {
155155
const auto& r = trIn.read_conflict_ranges[idx];
156156
auto ranges = self->keyResolvers.intersectingRanges(r);
157-
std::set<int> resolvers;
157+
std::vector<int> resolvers;
158+
resolvers.reserve(self->resolvers.size());
158159
for (auto& ir : ranges) {
159160
auto& version_resolver = ir.value();
160161
for (int i = version_resolver.size() - 1; i >= 0; i--) {
161-
resolvers.insert(version_resolver[i].second);
162+
int resolver_id = version_resolver[i].second;
163+
if (std::find(resolvers.begin(), resolvers.end(), resolver_id) == resolvers.end()) {
164+
resolvers.push_back(resolver_id);
165+
}
162166
if (version_resolver[i].first < trIn.read_snapshot)
163167
break;
164168
}
165169
}
166170
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && systemKeys.intersects(r)) {
167171
for (int k = 0; k < self->resolvers.size(); k++) {
168-
resolvers.insert(k);
172+
if (std::find(resolvers.begin(), resolvers.end(), k) == resolvers.end()) {
173+
resolvers.push_back(k);
174+
}
169175
}
170176
}
171177
ASSERT(resolvers.size());
@@ -181,16 +187,22 @@ struct ResolutionRequestBuilder {
181187
void addWriteConflictRanges(CommitTransactionRef& trIn) {
182188
for (auto& r : trIn.write_conflict_ranges) {
183189
auto ranges = self->keyResolvers.intersectingRanges(r);
184-
std::set<int> resolvers;
190+
std::vector<int> resolvers;
191+
resolvers.reserve(20); // Optimized for small resolver counts
185192
for (auto& ir : ranges) {
186193
auto& version_resolver = ir.value();
187194
if (!version_resolver.empty()) {
188-
resolvers.insert(version_resolver.back().second);
195+
int resolver_id = version_resolver.back().second;
196+
if (std::find(resolvers.begin(), resolvers.end(), resolver_id) == resolvers.end()) {
197+
resolvers.push_back(resolver_id);
198+
}
189199
}
190200
}
191201
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && systemKeys.intersects(r)) {
192202
for (int k = 0; k < self->resolvers.size(); k++) {
193-
resolvers.insert(k);
203+
if (std::find(resolvers.begin(), resolvers.end(), k) == resolvers.end()) {
204+
resolvers.push_back(k);
205+
}
194206
}
195207
}
196208
ASSERT(resolvers.size());
@@ -1619,11 +1631,25 @@ void applyMetadataEffect(CommitBatchContext* self) {
16191631
transactionIndex < self->resolution[0].stateMutations[versionIndex].size() && !self->forceRecovery;
16201632
transactionIndex++) {
16211633
bool committed = true;
1622-
for (int resolver = 0; resolver < self->resolution.size(); resolver++) {
1623-
committed =
1624-
committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
1634+
auto& res = self->resolution;
1635+
auto isCommitted = [&](int i) { return res[i].stateMutations[versionIndex][transactionIndex].committed; };
1636+
// Unroll the loop for the common number of resolvers
1637+
switch (res.size()) {
1638+
case 2:
1639+
committed = isCommitted(0) && isCommitted(1);
1640+
break;
1641+
case 4:
1642+
committed = isCommitted(0) && isCommitted(1) && isCommitted(2) && isCommitted(3);
1643+
break;
1644+
case 8:
1645+
committed = isCommitted(0) && isCommitted(1) && isCommitted(2) && isCommitted(3) && isCommitted(4) &&
1646+
isCommitted(5) && isCommitted(6) && isCommitted(7);
1647+
break;
1648+
default:
1649+
for (int resolver = 0; resolver < res.size(); resolver++) {
1650+
committed = committed && isCommitted(resolver);
1651+
}
16251652
}
1626-
16271653
if (committed && self->pProxyCommitData->getTenantMode() == TenantMode::REQUIRED) {
16281654
auto& tenantIds = self->resolution[0].stateMutations[versionIndex][transactionIndex].tenantIds;
16291655
ASSERT(tenantIds.present());
@@ -1672,7 +1698,8 @@ void applyMetadataEffect(CommitBatchContext* self) {
16721698
}
16731699
}
16741700

1675-
// These changes to txnStateStore will be committed by the other proxy, so we simply discard the commit message
1701+
// These changes to txnStateStore will be committed by the other proxy, so we simply discard the commit
1702+
// message
16761703
auto fcm = self->pProxyCommitData->logAdapter->getCommitMessage();
16771704
self->storeCommits.emplace_back(fcm, self->pProxyCommitData->txnStateStore->commit());
16781705

@@ -1735,8 +1762,8 @@ void determineCommittedTransactions(CommitBatchContext* self) {
17351762
}
17361763
}
17371764

1738-
// This first pass through committed transactions deals with "metadata" effects (modifications of txnStateStore, changes
1739-
// to storage servers' responsibilities)
1765+
// This first pass through committed transactions deals with "metadata" effects (modifications of txnStateStore,
1766+
// changes to storage servers' responsibilities)
17401767
ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self) {
17411768
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
17421769
state std::unordered_set<int64_t> rawAccessTenantIds;
@@ -2025,7 +2052,8 @@ void addAccumulativeChecksumMutations(CommitBatchContext* self) {
20252052
self->toCommit.writeTypedMessage(acsMutation);
20262053
}
20272054
}
2028-
// RangeLock takes effect only when the feature flag is on and database is unlocked and the mutation is not encrypted
2055+
// RangeLock takes effect only when the feature flag is on and database is unlocked and the mutation is not
2056+
// encrypted
20292057
void rejectMutationsForReadLockOnRange(CommitBatchContext* self) {
20302058
ASSERT(self->rangeLockEnabled());
20312059
ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
@@ -2328,6 +2356,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
23282356
self->isMyFirstBatch = !pProxyCommitData->version.get();
23292357
self->previousCoordinators = pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get();
23302358

2359+
// debug mode only?
23312360
assertResolutionStateMutationsSizeConsistent(self->resolution);
23322361

23332362
applyMetadataEffect(self);
@@ -2694,7 +2723,8 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
26942723

26952724
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
26962725
// We have received a reply from the sequencer, so all versions prior to the current version have been
2697-
// made durable and we can consider the current transaction to be committed - advance min commit version now.
2726+
// made durable and we can consider the current transaction to be committed - advance min commit version
2727+
// now.
26982728
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) {
26992729
//TraceEvent("CPAdvanceMinVersion", self->pProxyCommitData->dbgid).detail("PrvVersion", self->prevVersion).detail("CommitVersion", self->commitVersion).detail("Master", self->pProxyCommitData->master.id().toString()).detail("TxSize", self->trs.size());
27002730
debug_advanceMinCommittedVersion(UID(), self->commitVersion);
@@ -2845,15 +2875,16 @@ ACTOR Future<Void> commitBatchImpl(CommitBatchContext* pContext) {
28452875
pContext->stage = INITIALIZE;
28462876
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::Commit;
28472877

2848-
// Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit batches)
2849-
// so we need to downgrade here
2878+
// Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit
2879+
// batches) so we need to downgrade here
28502880
wait(delay(0, TaskPriority::ProxyCommit));
28512881

28522882
pContext->pProxyCommitData->lastVersionTime = pContext->startTime;
28532883
++pContext->pProxyCommitData->stats.commitBatchIn;
28542884
pContext->setupTraceBatch();
28552885

2856-
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined
2886+
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately
2887+
/// pipelined
28572888
/// and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
28582889
pContext->stage = PRE_RESOLUTION;
28592890
wait(CommitBatch::preresolutionProcessing(pContext));
@@ -2866,12 +2897,14 @@ ACTOR Future<Void> commitBatchImpl(CommitBatchContext* pContext) {
28662897
pContext->stage = RESOLUTION;
28672898
wait(CommitBatch::getResolution(pContext));
28682899

2869-
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but
2900+
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic
2901+
/// but
28702902
/// doesn't need to be)
28712903
pContext->stage = POST_RESOLUTION;
28722904
wait(CommitBatch::postResolution(pContext));
28732905

2874-
/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))
2906+
/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop
2907+
/// above))
28752908
pContext->stage = TRANSACTION_LOGGING;
28762909
wait(CommitBatch::transactionLogging(pContext));
28772910

0 commit comments

Comments
 (0)