From 8e200416564b87b470b22fee0736faec809371e4 Mon Sep 17 00:00:00 2001 From: Yao Xiao Date: Thu, 8 May 2025 18:05:35 -0700 Subject: [PATCH 1/3] rm physical shard --- fdbclient/ServerKnobs.cpp | 3 + fdbclient/include/fdbclient/ServerKnobs.h | 3 + .../include/fdbclient/StorageCheckpoint.h | 4 +- .../include/fdbclient/StorageServerShard.h | 1 + fdbserver/MoveKeys.actor.cpp | 33 +- fdbserver/storageserver.actor.cpp | 515 +++++++++--------- 6 files changed, 298 insertions(+), 261 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 8e3ee343374..23e9668c202 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -1119,6 +1119,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( FETCH_SHARD_BUFFER_BYTE_LIMIT, 20e6 ); if( randomize && BUGGIFY ) FETCH_SHARD_BUFFER_BYTE_LIMIT = 1; init( FETCH_SHARD_UPDATES_BYTE_LIMIT, 2500000 ); if( randomize && BUGGIFY ) FETCH_SHARD_UPDATES_BYTE_LIMIT = 100; + // Storage Server with Physical Shard + init( SS_GET_DATA_MOVE_ID, false); if ( isSimulated ) SS_GET_DATA_MOVE_ID = SHARD_ENCODE_LOCATION_METADATA; + //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; init( WAIT_FAILURE_DELAY_LIMIT, 1.0 ); if( randomize && BUGGIFY ) WAIT_FAILURE_DELAY_LIMIT = 5.0; diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index b51ae4f407b..3b6f1ebcc35 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -1158,6 +1158,9 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl bulkLoadTaskState; // set if the data move is a bulk load data move + Optional> dcTeamIds; // map of dcId to teamId DataMoveMetaData() = default; DataMoveMetaData(UID id, Version version, KeyRange range) : id(id), version(version), priority(0), mode(0) { @@ -206,7 +207,8 @@ struct DataMoveMetaData { template void serialize(Ar& ar) { - serializer(ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState); + serializer( + ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState, dcTeamIds); } }; diff --git a/fdbclient/include/fdbclient/StorageServerShard.h b/fdbclient/include/fdbclient/StorageServerShard.h index cbaf5bb766e..2c0224adcae 100644 --- a/fdbclient/include/fdbclient/StorageServerShard.h +++ b/fdbclient/include/fdbclient/StorageServerShard.h @@ -104,6 +104,7 @@ struct StorageServerShard { uint64_t desiredId; // The intended shard ID. int8_t shardState; Optional moveInShardId; // If present, it is the associated MoveInShardMetaData. + UID teamId; }; #endif diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 7bb3462f0db..4f51d19f205 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1712,7 +1712,7 @@ ACTOR static Future startMoveShards(Database occ, serverListEntries.push_back(tr.get(serverListKeyFor(servers[s]))); } std::vector> serverListValues = wait(getAll(serverListEntries)); - + state std::unordered_map> dcServers; for (int s = 0; s < serverListValues.size(); s++) { if (!serverListValues[s].present()) { // Attempt to move onto a server that isn't in serverList (removed or never added to the @@ -1721,6 +1721,13 @@ ACTOR static Future startMoveShards(Database occ, // TODO(psm): Mark the data move as 'deleting'. 
throw move_to_removed_server(); } + auto si = decodeServerListValue(serverListValues[s].get()); + ASSERT(si.id() == servers[s]); + auto it = dcServers.find(si.locality.describeDcId()); + if (it == dcServers.end()) { + dcServers[si.locality.describeDcId()] = std::vector(); + } + dcServers[si.locality.describeDcId()].push_back(si.id().shortString()); } currentKeys = KeyRangeRef(begin, keys.end); @@ -1733,6 +1740,15 @@ ACTOR static Future startMoveShards(Database occ, state Key endKey = old.back().key; currentKeys = KeyRangeRef(currentKeys.begin, endKey); + if (ranges.front() != currentKeys) { + TraceEvent("MoveShardsPartialRange") + .detail("ExpectedRange", ranges.front()) + .detail("ActualRange", currentKeys) + .detail("DataMoveId", dataMoveId) + .detail("RowLimit", SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT) + .detail("ByteLimit", SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT); + } + // Check that enough servers for each shard are in the correct state state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); @@ -1806,6 +1822,7 @@ ACTOR static Future startMoveShards(Database occ, TraceEvent( SevWarn, "StartMoveShardsCancelConflictingDataMove", relocationIntervalId) .detail("Range", rangeIntersectKeys) + .detail("CurrentDataMoveRange", ranges[0]) .detail("DataMoveID", dataMoveId.toString()) .detail("ExistingDataMoveID", destId.toString()); wait(cleanUpDataMove(occ, destId, lock, startMoveKeysLock, keys, ddEnabledState)); @@ -1868,6 +1885,20 @@ ACTOR static Future startMoveShards(Database occ, dataMove.ranges.clear(); dataMove.ranges.push_back(KeyRangeRef(keys.begin, currentKeys.end)); dataMove.dest.insert(servers.begin(), servers.end()); + dataMove.dcTeamIds = std::unordered_map(); + for (auto& [dc, serverIds] : dcServers) { + std::sort(serverIds.begin(), serverIds.end()); + std::string teamId; + for (const auto& serverId : serverIds) { + if (teamId.size() == 0) { + teamId = serverId; + } else { + teamId += "," + serverId; + } + } + // Use the concatenated server ids as the team id to avoid conflicts. + dataMove.dcTeamIds.get()[dc] = teamId; + } } if (currentKeys.end == keys.end) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index be3dbd3add6..3b5d3bf4d25 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -401,6 +401,7 @@ struct AddingShard : NonCopyable { Promise readWrite; DataMovementReason reason; SSBulkLoadMetadata ssBulkLoadMetadata; + std::string teamId; // During the Fetching phase, it saves newer mutations whose version is greater or equal to fetchClient's // fetchVersion, while the shard is still busy catching up with fetchClient. 
It applies these updates after fetching @@ -473,6 +474,8 @@ class ShardInfo : public ReferenceCounted, NonCopyable { uint64_t changeCounter; uint64_t shardId; uint64_t desiredShardId; + UID dataMoveId; + std::string teamId = "InvalidTeam"; Version version; static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, nullptr, nullptr); } @@ -572,6 +575,16 @@ class ShardInfo : public ReferenceCounted, NonCopyable { (moveInShard && moveInShard->fetchComplete.isSet()); } + bool isFetching() const { + if (adding) { + return !adding->fetchComplete.isSet(); + } + if (moveInShard) { + return !moveInShard->fetchComplete.isSet(); + } + return false; + } + std::string debugDescribeState() const { if (notAssigned()) { return "NotAssigned"; @@ -911,97 +924,6 @@ struct BusiestWriteTagContext { busiestWriteTagEventHolder(makeReference(busiestWriteTagTrackingKey)), lastUpdateTime(-1) {} }; -// A SSPhysicalShard represents a physical shard, it contains a list of keyranges. -class SSPhysicalShard { -public: - SSPhysicalShard(const int64_t id) : id(id) {} - - void addRange(Reference shard); - - // Remove the shard if a shard to the same pointer (ShardInfo*) exists. - void removeRange(Reference shard); - - // Clear all shards overlapping with `range`. - void removeRange(KeyRangeRef range); - - bool supportCheckpoint() const; - - bool hasRange(Reference shard) const; - - int size() const { return ranges.size(); } - // Public function to iterate over the ranges - std::vector>::const_iterator begin() const { return ranges.begin(); } - - std::vector>::const_iterator end() const { return ranges.end(); } - -private: - const int64_t id; - std::vector> ranges; -}; - -void SSPhysicalShard::addRange(Reference shard) { - TraceEvent(SevVerbose, "SSPhysicalShardAddShard") - .detail("ShardID", format("%016llx", this->id)) - .detail("Assigned", !shard->notAssigned()) - .detail("Range", shard->keys); - ASSERT(!shard->notAssigned()); - - removeRange(shard->keys); - - ranges.push_back(shard); -} - -void SSPhysicalShard::removeRange(Reference shard) { - TraceEvent(SevVerbose, "SSPhysicalShardRemoveShard") - .detail("ShardID", format("%016llx", this->id)) - .detail("Assigned", !shard->notAssigned()) - .detail("Range", shard->keys); - - for (int i = 0; i < this->ranges.size(); ++i) { - const auto& r = this->ranges[i]; - if (r.getPtr() == shard.getPtr()) { - this->ranges[i] = this->ranges.back(); - this->ranges.pop_back(); - return; - } - } -} - -void SSPhysicalShard::removeRange(KeyRangeRef range) { - TraceEvent(SevVerbose, "SSPhysicalShardRemoveRange") - .detail("ShardID", format("%016llx", this->id)) - .detail("Range", range); - for (int i = 0; i < this->ranges.size();) { - const auto& r = this->ranges[i]; - if (r->keys.intersects(range)) { - this->ranges[i] = this->ranges.back(); - this->ranges.pop_back(); - } else { - ++i; - } - } -} - -bool SSPhysicalShard::supportCheckpoint() const { - for (const auto& r : this->ranges) { - ASSERT(r->desiredShardId == this->id); - if (r->shardId != this->id) { - return false; - } - } - return true; -} - -bool SSPhysicalShard::hasRange(Reference shard) const { - for (int i = 0; i < this->ranges.size(); ++i) { - if (this->ranges[i].getPtr() == shard.getPtr()) { - return true; - } - } - - return false; -} - struct TenantSSInfo { constexpr static FileIdentifier file_identifier = 3253114; TenantAPI::TenantLockState lockState; @@ -1065,7 +987,8 @@ struct StorageServer : public IStorageMetricsService { public: struct PendingNewShard { - PendingNewShard(uint64_t shardId, KeyRangeRef 
range) : shardId(format("%016llx", shardId)), range(range) {} + PendingNewShard(uint64_t shardId, UID dataMoveId, KeyRangeRef range, Reference shardInfo) + : shardId(format("%016llx", shardId)), dataMoveId(dataMoveId), range(range), shardInfo(shardInfo) {} std::string toString() const { return fmt::format("PendingNewShard: [ShardID]: {} [Range]: {}", @@ -1074,7 +997,9 @@ struct StorageServer : public IStorageMetricsService { } std::string shardId; + UID dataMoveId; KeyRange range; + Reference shardInfo; }; std::map> pendingCheckpoints; // Pending checkpoint requests @@ -1089,6 +1014,8 @@ struct StorageServer : public IStorageMetricsService { bool shardAware; // True if the storage server is aware of the physical shards. + LocalityData locality; + // Histograms struct FetchKeysHistograms { const Reference latency; @@ -1318,7 +1245,6 @@ struct StorageServer : public IStorageMetricsService { KeyRangeMap> shards; KeyRangeMap ssBulkLoadMetadataMap; // store the latest bulkload task on ranges - std::unordered_map physicalShards; uint64_t shardChangeCounter; // max( shards->changecounter ) KeyRangeMap cachedRangeMap; // indicates if a key-range is being cached @@ -1711,9 +1637,10 @@ struct StorageServer : public IStorageMetricsService { Reference const> const& db, StorageServerInterface const& ssi, Reference encryptionMonitor) - : shardAware(false), tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, - TLOG_CURSOR_READS_LATENCY_HISTOGRAM, - Histogram::Unit::milliseconds)), + : shardAware(false), locality(ssi.locality), + tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, + TLOG_CURSOR_READS_LATENCY_HISTOGRAM, + Histogram::Unit::milliseconds)), ssVersionLockLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, SS_VERSION_LOCK_LATENCY_HISTOGRAM, Histogram::Unit::milliseconds)), @@ -1802,25 +1729,6 @@ struct StorageServer : public IStorageMetricsService { } //~StorageServer() { fclose(log); } - void addRangeToPhysicalShard(Reference newRange) { - if (!shardAware || newRange->notAssigned()) { - return; - } - - auto [it, ignored] = - physicalShards.insert(std::make_pair(newRange->desiredShardId, SSPhysicalShard(newRange->desiredShardId))); - it->second.addRange(newRange); - } - - void removeRangeFromPhysicalShard(Reference range) { - if (!range.isValid() || !shardAware || range->notAssigned()) { - return; - } - - auto it = physicalShards.find(range->desiredShardId); - ASSERT(it != physicalShards.end()); - it->second.removeRange(range); - } // Puts the given shard into shards. 
The caller is responsible for adding shards // for all ranges in shards.getAffectedRangesAfterInsertion(newShard->keys)), because these @@ -1843,14 +1751,12 @@ struct StorageServer : public IStorageMetricsService { .detail("ShardID", format("%016llx", it->value()->desiredShardId)) .detail("NewShardID", format("%016llx", newShard->desiredShardId)) .detail("NewShardActualID", format("%016llx", newShard->shardId)); - removeRangeFromPhysicalShard(it->value()); } } } Reference rShard(newShard); shards.insert(newShard->keys, rShard); - addRangeToPhysicalShard(rShard); } void addMutation(Version version, bool fromFetch, @@ -2210,11 +2116,6 @@ void validate(StorageServer* data, bool force = false) { ASSERT(!s->value()->keys.empty()); if (data->shardAware) { s->value()->validate(); - if (!s->value()->notAssigned()) { - auto it = data->physicalShards.find(s->value()->desiredShardId); - ASSERT(it != data->physicalShards.end()); - ASSERT(it->second.hasRange(s->value())); - } } } @@ -7701,33 +7602,6 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned); void updateStorageShard(StorageServer* self, StorageServerShard shard); void setRangeBasedBulkLoadStatus(StorageServer* self, KeyRangeRef keys, const SSBulkLoadMetadata& ssBulkLoadMetadata); -void coalescePhysicalShards(StorageServer* data, KeyRangeRef keys) { - auto shardRanges = data->shards.intersectingRanges(keys); - auto fullRange = data->shards.ranges(); - - auto iter = shardRanges.begin(); - if (iter != fullRange.begin()) { - --iter; - } - auto iterEnd = shardRanges.end(); - if (iterEnd != fullRange.end()) { - ++iterEnd; - } - - KeyRangeMap>::iterator lastShard = iter; - ++iter; - - for (; iter != iterEnd; ++iter) { - if (ShardInfo::canMerge(lastShard.value().getPtr(), iter->value().getPtr())) { - ShardInfo* newShard = lastShard.value().extractPtr(); - ASSERT(newShard->mergeWith(iter->value().getPtr())); - data->addShard(newShard); - iter = data->shards.rangeContaining(newShard->keys.begin); - } - lastShard = iter; - } -} - void coalesceShards(StorageServer* data, KeyRangeRef keys) { auto shardRanges = data->shards.intersectingRanges(keys); auto fullRange = data->shards.ranges(); @@ -9030,7 +8904,8 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) .detail("Phase", "Begin") - .detail("ConcurrentTasks", data->bulkLoadMetrics->getOngoingTasks()); + .detail("ConcurrentTasks", data->bulkLoadMetrics->getOngoingTasks()) + .detail("FKID", fetchKeysID); data->bulkLoadMetrics->addTask(); } @@ -9096,6 +8971,16 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .detail("DataMoveId", dataMoveId) .detail("ConductBulkLoad", conductBulkLoad); + if (SERVER_KNOBS->SS_GET_DATA_MOVE_ID && data->shardAware) { + // Empty team id indicates the data move will be cancelled soon. + if (shard->teamId == "" || shard->teamId == "InvalidTeam") { + TraceEvent(SevWarnAlways, "TeamIdUnavailable") + .detail("FKID", fetchKeysID) + .detail("DurableVersion", data->durableVersion.get()) + .detail("Version", data->version.get()); + wait(Never()); + } + } wait(data->fetchKeysParallelismLock.take(TaskPriority::DefaultYield)); state FlowLock::Releaser holdingFKPL(data->fetchKeysParallelismLock); @@ -9181,7 +9066,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // Note that error in getting GRV doesn't affect any storage server state. Therefore, we catch // all errors here without failing the storage server. 
When error happens, fetchVersion fall // back to the above computed fetchVersion. - TraceEvent(SevWarn, "FetchKeyGRVError", data->thisServerID).error(e); + TraceEvent(SevWarn, "FetchKeyGRVError", data->thisServerID).error(e).detail("FKID", fetchKeysID); lastError = e; } } @@ -9230,7 +9115,8 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) - .detail("Phase", "Got task metadata"); + .detail("Phase", "Got task metadata") + .detail("FKID", fetchKeysID); // Check the correctness: bulkLoadTaskMetadata stored in dataMoveMetadata must have the same // dataMoveId. ASSERT(bulkLoadTaskState.getDataMoveId() == dataMoveId); @@ -9243,14 +9129,16 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .detail("Range", keys) .detail("Knobs", SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST) .detail("SupportsSstIngestion", data->storage.getKeyValueStore()->supportsSstIngestion()) - .detail("Phase", "File download"); + .detail("Phase", "File download") + .detail("FKID", fetchKeysID); // Attempt SST ingestion... if (SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST && data->storage.getKeyValueStore()->supportsSstIngestion()) { TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) - .detail("Phase", "SST ingestion"); + .detail("Phase", "SST ingestion") + .detail("FKID", fetchKeysID); // Verify ranges... for (const auto& [range, fileSet] : *localBulkLoadFileSets) { ASSERT(keys.contains(range)); @@ -9337,13 +9225,14 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { if (shard->reason != DataMovementReason::INVALID && priority < SERVER_KNOBS->FETCH_KEYS_THROTTLE_PRIORITY_THRESHOLD && !data->fetchKeysLimiter.ready().isReady()) { - TraceEvent(SevDebug, "FetchKeysThrottling", data->thisServerID); + TraceEvent(SevDebug, "FetchKeysThrottling", data->thisServerID).detail("FKID", fetchKeysID); state double ts = now(); wait(data->fetchKeysLimiter.ready()); TraceEvent(SevDebug, "FetchKeysThrottled", data->thisServerID) .detail("Priority", priority) .detail("KeyRange", shard->keys) - .detail("Delay", now() - ts); + .detail("Delay", now() - ts) + .detail("FKID", fetchKeysID); } // Write this_block to storage @@ -9454,6 +9343,12 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { splitMutations(data, data->shards, *u); } + TraceEvent(SevDebug, "FetchKeysSplit") + .detail("ExpectedRange", keys) + .detail("NewBlockBegin", blockBegin) + .detail("DataMoveId", dataMoveId) + .detail("FKID", fetchKeysID); + CODE_PROBE(true, "fetchkeys has more"); CODE_PROBE(shard->updates.size(), "Shard has updates"); ASSERT(otherShard->updates.empty()); @@ -9482,7 +9377,8 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent(SevInfo, "FetchKeysStats", data->thisServerID) .detail("TotalBytes", totalBytes) .detail("Duration", duration) - .detail("Rate", static_cast(totalBytes) / duration); + .detail("Rate", static_cast(totalBytes) / duration) + .detail("FKID", fetchKeysID); TraceEvent(SevDebug, "FKBeforeFinalCommit", data->thisServerID) .detail("FKID", interval.pairID) @@ -9657,7 +9553,6 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { shard->readWrite.send(Void()); if (data->shardAware) { data->addShard(ShardInfo::newShard(data, newShard)); // invalidates shard! 
- coalescePhysicalShards(data, keys); } else { data->addShard(ShardInfo::newReadWrite(shard->keys, data)); // invalidates shard! coalesceShards(data, keys); @@ -9668,7 +9563,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { ++data->counters.fetchExecutingCount; data->counters.fetchExecutingMS += 1000 * (now() - executeStart); - TraceEvent(SevDebug, interval.end(), data->thisServerID); + TraceEvent(SevDebug, interval.end(), data->thisServerID).detail("FKID", fetchKeysID); if (conductBulkLoad) { data->bulkLoadMetrics->removeTask(); // Do best effort cleanup @@ -9677,6 +9572,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { } catch (Error& e) { TraceEvent(SevDebug, interval.end(), data->thisServerID) + .detail("FKID", fetchKeysID) .errorUnsuppressed(e) .detail("Version", data->version.get()); if (!data->shuttingDown) { @@ -9697,13 +9593,15 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { data->newestDirtyVersion.insert(keys, data->data().getLatestVersion()); } } + TraceEvent(SevError, "FetchKeysError", data->thisServerID) .error(e) .detail("Elapsed", now() - startTime) .detail("KeyBegin", keys.begin) .detail("KeyEnd", keys.end) .detail("FetchVersion", fetchVersion) - .detail("KnownCommittedVersion", data->knownCommittedVersion.get()); + .detail("KnownCommittedVersion", data->knownCommittedVersion.get()) + .detail("FKID", fetchKeysID); if (e.code() != error_code_actor_cancelled) data->otherError.sendError(e); // Kill the storage server. Are there any recoverable errors? if (conductBulkLoad) { @@ -10237,7 +10135,6 @@ ACTOR Future fetchShardApplyUpdates(StorageServer* data, .detail("MoveInShard", moveInShard->toString()); data->addShard(ShardInfo::newShard(data, newShard)); data->newestAvailableVersion.insert(range, latestVersion); - coalescePhysicalShards(data, range); } validate(data); moveInShard->readWrite.send(Void()); @@ -10803,7 +10700,6 @@ ACTOR Future restoreShards(StorageServer* data, wait(yield()); } - coalescePhysicalShards(data, allKeys); validate(data, /*force=*/true); TraceEvent(SevInfo, "StorageServerRestoreShardsEnd", data->thisServerID).detail("Version", version); @@ -11100,13 +10996,11 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, // When TSS is lagging behind, it could see data move conflicts. The conflicting TSS will not recover from error and // needs to be removed. Severity sev = data->isTss() ? SevWarnAlways : SevError; + + // Re-align shard boundaries and validate CSK scenarios. 
for (int i = 0; i < ranges.size(); i++) { const Reference currentShard = ranges[i].value; const KeyRangeRef currentRange = static_cast(ranges[i]); - if (currentShard.isValid()) { - TraceEvent(sevDm, "OverlappingPhysicalShard", data->thisServerID) - .detail("PhysicalShard", currentShard->toStorageServerShard().toString()); - } if (!currentShard.isValid()) { if (currentRange != keys) { TraceEvent(sev, "PhysicalShardStateError") @@ -11119,7 +11013,13 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("InitialVersion", currentShard->version); throw dataMoveConflictError(data->isTss()); } - } else if (currentShard->notAssigned()) { + continue; + } + + TraceEvent(sevDm, "OverlappingPhysicalShard", data->thisServerID) + .detail("PhysicalShard", currentShard->toStorageServerShard().toString()); + + if (currentShard->notAssigned()) { if (!nowAssigned) { TraceEvent(sev, "PhysicalShardStateError") .detail("SubError", "UnassignEmptyRange") @@ -11140,7 +11040,10 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("ResultingShard", newShard.toString()); - } else if (currentShard->isReadable()) { + continue; + } + + if (currentShard->isReadable()) { StorageServerShard newShard = currentShard->toStorageServerShard(); newShard.range = currentRange; data->addShard(ShardInfo::newShard(data, newShard)); @@ -11148,9 +11051,12 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("Range", keys) .detail("NowAssigned", nowAssigned) .detail("Version", cVer) - .detail("ResultingShard", newShard.toString()) - .detail("ShardIdDifferent", newShard.id != newShard.desiredId); - } else if (currentShard->adding) { + .detail("ResultingShard", newShard.toString()); + continue; + } + + // Shard is being moved. + if (currentShard->adding) { if (nowAssigned) { TraceEvent(sev, "PhysicalShardStateError") .detail("SubError", "UpdateAddingShard") @@ -11163,6 +11069,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("InitialVersion", currentShard->version); throw dataMoveConflictError(data->isTss()); } + + // FetchKeys will be cancelled. StorageServerShard newShard = currentShard->toStorageServerShard(); newShard.range = currentRange; data->addShard(ShardInfo::newShard(data, newShard)); @@ -11171,7 +11079,10 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("ResultingShard", newShard.toString()); - } else if (currentShard->moveInShard) { + continue; + } + + if (currentShard->moveInShard) { if (nowAssigned) { TraceEvent(sev, "PhysicalShardStateError") .detail("SubError", "UpdateMoveInShard") @@ -11184,6 +11095,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("InitialVersion", currentShard->version); throw dataMoveConflictError(data->isTss()); } + + // FetchShard will be cancelled. currentShard->moveInShard->cancel(); updatedMoveInShards.emplace(currentShard->moveInShard->id(), currentShard->moveInShard); StorageServerShard newShard = currentShard->toStorageServerShard(); @@ -11194,11 +11107,13 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("ResultingShard", newShard.toString()); - } else { - ASSERT(false); + continue; } + + ASSERT(false); // unreachable. } + // Update shards in shard map. 
auto vr = data->shards.intersectingRanges(keys); std::vector> changeNewestAvailable; std::vector removeRanges; @@ -11215,17 +11130,23 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("ShardState", r->value()->debugDescribeState()); ASSERT(keys.contains(r->range())); + + // Assign empty range on data loss. if (context == CSK_ASSIGN_EMPTY && !dataAvailable) { ASSERT(nowAssigned); TraceEvent(sevDm, "ChangeServerKeysAddEmptyRange", data->thisServerID) .detail("Range", range) - .detail("Version", cVer); + .detail("Version", cVer) + .detail("DataMoveId", dataMoveId); newEmptyRanges.push_back(range); + // auto shardInfo = ShardInfo(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite); updatedShards.emplace_back(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite); - if (data->physicalShards.find(desiredId) == data->physicalShards.end()) { - data->pendingAddRanges[cVer].emplace_back(desiredId, range); - } - } else if (!nowAssigned) { + data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + continue; + } + + // Unassign shard and remove data range if exists. + if (!nowAssigned) { if (dataAvailable) { ASSERT(data->newestAvailableVersion[range.begin] == latestVersion); // Not that we care, but this used to be checked instead of dataAvailable @@ -11247,82 +11168,11 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("NewShard", updatedShards.back().toString()); - } else if (!dataAvailable) { - if (version == data->initialClusterVersion - 1) { - TraceEvent(SevInfo, "CSKWithPhysicalShardsSeedRange", data->thisServerID) - .detail("ShardID", desiredId) - .detail("Range", range) - .detail("Version", cVer); - changeNewestAvailable.emplace_back(range, latestVersion); - updatedShards.push_back( - StorageServerShard(range, version, desiredId, desiredId, StorageServerShard::ReadWrite)); - setAvailableStatus(data, range, true); - // Note: The initial range is available, however, the shard won't be created in the storage engine - // until version is committed. 
- data->pendingAddRanges[cVer].emplace_back(desiredId, range); - TraceEvent(sevDm, "SSInitialShard", data->thisServerID) - .detail("Range", range) - .detail("NowAssigned", nowAssigned) - .detail("Version", cVer) - .detail("NewShard", updatedShards.back().toString()); - } else { - auto& shard = data->shards[range.begin]; - if (!shard->assigned()) { - if (enablePSM) { - std::shared_ptr moveInShard = - data->getMoveInShard(dataMoveId, cVer, conductBulkLoad); - moveInShard->addRange(range); - updatedMoveInShards.emplace(moveInShard->id(), moveInShard); - updatedShards.push_back(StorageServerShard( - range, cVer, desiredId, desiredId, StorageServerShard::MovingIn, moveInShard->id())); - } else { - updatedShards.push_back( - StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding)); - data->pendingAddRanges[cVer].emplace_back(desiredId, range); - } - data->newestDirtyVersion.insert(range, cVer); - TraceEvent(sevDm, "SSAssignShard", data->thisServerID) - .detail("Range", range) - .detail("NowAssigned", nowAssigned) - .detail("Version", cVer) - .detail("TotalAssignedAtVer", ++totalAssignedAtVer) - .detail("ConductBulkLoad", conductBulkLoad) - .detail("NewShard", updatedShards.back().toString()); - } else { - ASSERT(shard->adding != nullptr || shard->moveInShard != nullptr); - if (shard->desiredShardId != desiredId) { - TraceEvent(SevWarnAlways, "CSKConflictingMoveInShards", data->thisServerID) - .detail("DataMoveID", dataMoveId) - .detail("Range", range) - .detailf("TargetShard", "%016llx", desiredId) - .detailf("CurrentShard", "%016llx", shard->desiredShardId) - .detail("IsTSS", data->isTss()) - .detail("Version", cVer); - throw dataMoveConflictError(data->isTss()); - } else { - TraceEvent(SevInfo, "CSKMoveInToSameShard", data->thisServerID) - .detail("DataMoveID", dataMoveId) - .detailf("TargetShard", "%016llx", desiredId) - .detail("MoveRange", keys) - .detail("Range", range) - .detail("ExistingShardRange", shard->keys) - .detail("ShardDebugString", shard->debugDescribeState()) - .detail("Version", cVer); - if (context == CSK_FALL_BACK) { - updatedShards.push_back( - StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding)); - // Physical shard move fall back happens if and only if the data move is failed to get the - // checkpoint. However, this case never happens the bulkload. So, the bulkload does not - // support fall back. - ASSERT(!conductBulkLoad); // TODO(BulkLoad): remove this assert - data->pendingAddRanges[cVer].emplace_back(desiredId, range); - data->newestDirtyVersion.insert(range, cVer); - // TODO: removeDataRange if the moveInShard has written to the kvs. - } - } - } - } - } else { + continue; + } + + // Shard already available in SS. Update desired shard id. + if (dataAvailable) { updatedShards.push_back(StorageServerShard( range, cVer, data->shards[range.begin]->shardId, desiredId, StorageServerShard::ReadWrite)); changeNewestAvailable.emplace_back(range, latestVersion); @@ -11331,6 +11181,85 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("NewShard", updatedShards.back().toString()); + continue; + } + + ASSERT(!dataAvailable); + + // Assign a shard to storage server. Skip fetchKeys if the cluster is in initial state. 
+ if (version == data->initialClusterVersion - 1) { + TraceEvent(sevDm, "CSKWithPhysicalShardsSeedRange", data->thisServerID) + .detail("ShardID", desiredId) + .detail("Range", range); + changeNewestAvailable.emplace_back(range, latestVersion); + updatedShards.push_back( + StorageServerShard(range, version, desiredId, desiredId, StorageServerShard::ReadWrite)); + setAvailableStatus(data, range, true); + // Note: The initial range is available, however, the shard won't be created in the storage engine + // until version is committed. + data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + TraceEvent(sevDm, "SSInitialShard", data->thisServerID) + .detail("Range", range) + .detail("NowAssigned", nowAssigned) + .detail("Version", cVer) + .detail("NewShard", updatedShards.back().toString()); + + continue; + } + + auto& shard = data->shards[range.begin]; + if (!shard->assigned()) { + if (enablePSM) { + std::shared_ptr moveInShard = data->getMoveInShard(dataMoveId, cVer, conductBulkLoad); + moveInShard->addRange(range); + updatedMoveInShards.emplace(moveInShard->id(), moveInShard); + updatedShards.push_back(StorageServerShard( + range, cVer, desiredId, desiredId, StorageServerShard::MovingIn, moveInShard->id())); + } else { + updatedShards.push_back( + StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding)); + data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + } + data->newestDirtyVersion.insert(range, cVer); + TraceEvent(sevDm, "SSAssignShard", data->thisServerID) + .detail("Range", range) + .detail("NowAssigned", nowAssigned) + .detail("Version", cVer) + .detail("TotalAssignedAtVer", ++totalAssignedAtVer) + .detail("ConductBulkLoad", conductBulkLoad) + .detail("NewShard", updatedShards.back().toString()); + } else { + // Shard is being moved. + ASSERT(shard->adding != nullptr || shard->moveInShard != nullptr); + if (shard->desiredShardId != desiredId) { + TraceEvent(SevWarnAlways, "CSKConflictingMoveInShards", data->thisServerID) + .detail("DataMoveID", dataMoveId) + .detail("Range", range) + .detailf("TargetShard", "%016llx", desiredId) + .detailf("CurrentShard", "%016llx", shard->desiredShardId) + .detail("IsTSS", data->isTss()) + .detail("Version", cVer); + throw dataMoveConflictError(data->isTss()); + } + TraceEvent(SevInfo, "CSKMoveInToSameShard", data->thisServerID) + .detail("DataMoveID", dataMoveId) + .detailf("TargetShard", "%016llx", desiredId) + .detail("MoveRange", keys) + .detail("Range", range) + .detail("ExistingShardRange", shard->keys) + .detail("ShardDebugString", shard->debugDescribeState()) + .detail("Version", cVer); + if (context == CSK_FALL_BACK) { + updatedShards.push_back( + StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding)); + // Physical shard move fall back happens if and only if the data move is failed to get the + // checkpoint. However, this case never happens the bulkload. So, the bulkload does not + // support fall back. + ASSERT(!conductBulkLoad); // TODO(BulkLoad): remove this assert + data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + data->newestDirtyVersion.insert(range, cVer); + // TODO: removeDataRange if the moveInShard has written to the kvs. + } } } @@ -11338,7 +11267,21 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, data->addShard(ShardInfo::newShard(data, shard)); updateStorageShard(data, shard); } + + // Link shard info to pending new ranges. 
+ // TODO: consider refactoring to avoid extra shard look up. + if (data->pendingAddRanges.find(cVer) != data->pendingAddRanges.end()) { + for (auto& shard : data->pendingAddRanges[cVer]) { + auto it = data->shards.rangeContaining(shard.range.begin); + ASSERT(it->value()); + ASSERT(it.range().end == shard.range.end); + shard.shardInfo = it.value(); + } + } + auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + + // Persist physcial shard move metadata. for (const auto& [id, shard] : updatedMoveInShards) { data->addMutationToMutationLog( mLV, MutationRef(MutationRef::SetValue, persistMoveInShardKey(id), moveInShardValue(*shard->meta))); @@ -11354,14 +11297,12 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, data->metrics.notifyNotReadable(keys); } - coalescePhysicalShards(data, KeyRangeRef(ranges[0].begin, ranges[ranges.size() - 1].end)); - // Now it is OK to do removeDataRanges, directly and through fetchKeys cancellation (and we have to do so before // validate()) oldShards.clear(); ranges.clear(); for (auto r = removeRanges.begin(); r != removeRanges.end(); ++r) { - removeDataRange(data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r); + removeDataRange(data, mLV, data->shards, *r); setAvailableStatus(data, *r, false); } @@ -11545,7 +11486,8 @@ class StorageUpdater { .detail("Version", ver) .detail("EnablePSM", enablePSM) .detail("DataMoveId", dataMoveId.toString()) - .detail("ConductBulkLoad", conductBulkLoad); + .detail("ConductBulkLoad", conductBulkLoad) + .detail("Context", changeServerKeysContextName(context)); if (data->shardAware) { setAssignedStatus(data, keys, nowAssigned); changeServerKeysWithPhysicalShards( @@ -12839,6 +12781,51 @@ ACTOR Future createCheckpoint(StorageServer* data, CheckpointMetaData meta return Void(); } +ACTOR Future getDataMoveMetadata(Version version, + std::vector shards, + StorageServer* self) { + state Transaction tr(self->cx); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + + loop { + try { + tr.reset(); + state std::vector>> fDataMoves; + for (auto& shard : shards) { + fDataMoves.push_back(tr.get(dataMoveKeyFor(shard.dataMoveId))); + } + wait(waitForAll(fDataMoves)); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + for (int i = 0; i < shards.size(); ++i) { + if (fDataMoves[i].get().present()) { + auto& shard = shards[i]; + DataMoveMetaData dataMove = decodeDataMoveValue(fDataMoves[i].get().get()); + + if (!dataMove.ranges.empty()) { + if (!dataMove.dcTeamIds.present()) { + TraceEvent("TeamIdNotSet").detail("DataMoveId", shard.dataMoveId).detail("Range", shard.range); + continue; + } + auto teamId = dataMove.dcTeamIds.get()[self->locality.describeDcId()]; + ASSERT(dataMove.ranges.front().contains(shard.range)); + shard.shardInfo->teamId = teamId; + auto& addingShard = shard.shardInfo->adding; + ASSERT(addingShard); + addingShard->teamId = teamId; + TraceEvent(SevDebug, "GotValidTeamId").detail("Range", shard.range).detail("TeamId", teamId); + } + + shard.shardInfo.clear(); + } + } + return Void(); +} + struct UpdateStorageCommitStats { double beforeStorageUpdates; double beforeStorageCommit; @@ -12982,6 +12969,10 @@ ACTOR Future updateStorage(StorageServer* data) { fAddRanges.push_back(data->storage.addRange(shard.range, shard.shardId)); } wait(waitForAll(fAddRanges)); + if (SERVER_KNOBS->SS_GET_DATA_MOVE_ID) { + wait(getDataMoveMetadata( + data->pendingAddRanges.begin()->first, 
data->pendingAddRanges.begin()->second, data)); + } TraceEvent(SevVerbose, "SSAddKVSRangeEnd", data->thisServerID) .detail("Version", data->pendingAddRanges.begin()->first) .detail("DurableVersion", data->durableVersion.get()); @@ -14718,6 +14709,7 @@ ACTOR Future storageEngineConsistencyCheck(StorageServer* self) { } CoalescedKeyRangeMap currentShards; + std::unordered_map teamShardCount; currentShards.insert(allKeys, ""); auto fullRange = self->shards.ranges(); for (auto it = fullRange.begin(); it != fullRange.end(); ++it) { @@ -14726,9 +14718,14 @@ ACTOR Future storageEngineConsistencyCheck(StorageServer* self) { } if (it.value()->assigned()) { currentShards.insert(it.range(), format("%016llx", it.value()->shardId)); + teamShardCount[it.value()->teamId]++; } } + for (const auto& kv : teamShardCount) { + TraceEvent("StorageServerTeamShardCount").detail("TeamId", kv.first).detail("ShardCount", kv.second); + } + auto kvShards = self->storage.getExistingRanges(); TraceEvent(SevInfo, "StorageEngineConsistencyCheckStarted").log(); From b2bda3cad9a353903651ca260ead111e47f4caba Mon Sep 17 00:00:00 2001 From: Yao Xiao Date: Tue, 8 Jul 2025 12:54:41 -0700 Subject: [PATCH 2/3] revert changes --- fdbclient/ServerKnobs.cpp | 3 - fdbclient/include/fdbclient/ServerKnobs.h | 3 - .../include/fdbclient/StorageCheckpoint.h | 4 +- .../include/fdbclient/StorageServerShard.h | 1 - fdbserver/MoveKeys.actor.cpp | 33 +------ fdbserver/storageserver.actor.cpp | 89 ++----------------- 6 files changed, 10 insertions(+), 123 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 23e9668c202..8e3ee343374 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -1119,9 +1119,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( FETCH_SHARD_BUFFER_BYTE_LIMIT, 20e6 ); if( randomize && BUGGIFY ) FETCH_SHARD_BUFFER_BYTE_LIMIT = 1; init( FETCH_SHARD_UPDATES_BYTE_LIMIT, 2500000 ); if( randomize && BUGGIFY ) FETCH_SHARD_UPDATES_BYTE_LIMIT = 100; - // Storage Server with Physical Shard - init( SS_GET_DATA_MOVE_ID, false); if ( isSimulated ) SS_GET_DATA_MOVE_ID = SHARD_ENCODE_LOCATION_METADATA; - //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; init( WAIT_FAILURE_DELAY_LIMIT, 1.0 ); if( randomize && BUGGIFY ) WAIT_FAILURE_DELAY_LIMIT = 5.0; diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 3b6f1ebcc35..b51ae4f407b 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -1158,9 +1158,6 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl bulkLoadTaskState; // set if the data move is a bulk load data move - Optional> dcTeamIds; // map of dcId to teamId DataMoveMetaData() = default; DataMoveMetaData(UID id, Version version, KeyRange range) : id(id), version(version), priority(0), mode(0) { @@ -207,8 +206,7 @@ struct DataMoveMetaData { template void serialize(Ar& ar) { - serializer( - ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState, dcTeamIds); + serializer(ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState); } }; diff --git a/fdbclient/include/fdbclient/StorageServerShard.h b/fdbclient/include/fdbclient/StorageServerShard.h index 2c0224adcae..cbaf5bb766e 100644 --- a/fdbclient/include/fdbclient/StorageServerShard.h +++ 
b/fdbclient/include/fdbclient/StorageServerShard.h @@ -104,7 +104,6 @@ struct StorageServerShard { uint64_t desiredId; // The intended shard ID. int8_t shardState; Optional moveInShardId; // If present, it is the associated MoveInShardMetaData. - UID teamId; }; #endif diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 4f51d19f205..7bb3462f0db 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1712,7 +1712,7 @@ ACTOR static Future startMoveShards(Database occ, serverListEntries.push_back(tr.get(serverListKeyFor(servers[s]))); } std::vector> serverListValues = wait(getAll(serverListEntries)); - state std::unordered_map> dcServers; + for (int s = 0; s < serverListValues.size(); s++) { if (!serverListValues[s].present()) { // Attempt to move onto a server that isn't in serverList (removed or never added to the @@ -1721,13 +1721,6 @@ ACTOR static Future startMoveShards(Database occ, // TODO(psm): Mark the data move as 'deleting'. throw move_to_removed_server(); } - auto si = decodeServerListValue(serverListValues[s].get()); - ASSERT(si.id() == servers[s]); - auto it = dcServers.find(si.locality.describeDcId()); - if (it == dcServers.end()) { - dcServers[si.locality.describeDcId()] = std::vector(); - } - dcServers[si.locality.describeDcId()].push_back(si.id().shortString()); } currentKeys = KeyRangeRef(begin, keys.end); @@ -1740,15 +1733,6 @@ ACTOR static Future startMoveShards(Database occ, state Key endKey = old.back().key; currentKeys = KeyRangeRef(currentKeys.begin, endKey); - if (ranges.front() != currentKeys) { - TraceEvent("MoveShardsPartialRange") - .detail("ExpectedRange", ranges.front()) - .detail("ActualRange", currentKeys) - .detail("DataMoveId", dataMoveId) - .detail("RowLimit", SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT) - .detail("ByteLimit", SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT); - } - // Check that enough servers for each shard are in the correct state state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); @@ -1822,7 +1806,6 @@ ACTOR static Future startMoveShards(Database occ, TraceEvent( SevWarn, "StartMoveShardsCancelConflictingDataMove", relocationIntervalId) .detail("Range", rangeIntersectKeys) - .detail("CurrentDataMoveRange", ranges[0]) .detail("DataMoveID", dataMoveId.toString()) .detail("ExistingDataMoveID", destId.toString()); wait(cleanUpDataMove(occ, destId, lock, startMoveKeysLock, keys, ddEnabledState)); @@ -1885,20 +1868,6 @@ ACTOR static Future startMoveShards(Database occ, dataMove.ranges.clear(); dataMove.ranges.push_back(KeyRangeRef(keys.begin, currentKeys.end)); dataMove.dest.insert(servers.begin(), servers.end()); - dataMove.dcTeamIds = std::unordered_map(); - for (auto& [dc, serverIds] : dcServers) { - std::sort(serverIds.begin(), serverIds.end()); - std::string teamId; - for (const auto& serverId : serverIds) { - if (teamId.size() == 0) { - teamId = serverId; - } else { - teamId += "," + serverId; - } - } - // Use the concatenated server ids as the team id to avoid conflicts. 
- dataMove.dcTeamIds.get()[dc] = teamId; - } } if (currentKeys.end == keys.end) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 3b5d3bf4d25..522d7aacfd0 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -222,6 +222,7 @@ static const std::string checkpointBytesSampleTempFolder = "/metadata_temp"; static const std::string fetchedCheckpointFolder = "fetchedCheckpoints"; static const std::string serverBulkDumpFolder = "bulkDumpFiles"; static const std::string serverBulkLoadFolder = "bulkLoadFiles"; +static const std::string invalidTeamId = "InvalidTeam"; static const KeyRangeRef persistBulkLoadTaskKeys = KeyRangeRef(PERSIST_PREFIX "BulkLoadTask/"_sr, PERSIST_PREFIX "BulkLoadTask0"_sr); @@ -474,8 +475,7 @@ class ShardInfo : public ReferenceCounted, NonCopyable { uint64_t changeCounter; uint64_t shardId; uint64_t desiredShardId; - UID dataMoveId; - std::string teamId = "InvalidTeam"; + std::string teamId = invalidTeamId; Version version; static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, nullptr, nullptr); } @@ -987,8 +987,7 @@ struct StorageServer : public IStorageMetricsService { public: struct PendingNewShard { - PendingNewShard(uint64_t shardId, UID dataMoveId, KeyRangeRef range, Reference shardInfo) - : shardId(format("%016llx", shardId)), dataMoveId(dataMoveId), range(range), shardInfo(shardInfo) {} + PendingNewShard(uint64_t shardId, KeyRangeRef range) : shardId(format("%016llx", shardId)), range(range) {} std::string toString() const { return fmt::format("PendingNewShard: [ShardID]: {} [Range]: {}", @@ -997,9 +996,7 @@ struct StorageServer : public IStorageMetricsService { } std::string shardId; - UID dataMoveId; KeyRange range; - Reference shardInfo; }; std::map> pendingCheckpoints; // Pending checkpoint requests @@ -1014,7 +1011,7 @@ struct StorageServer : public IStorageMetricsService { bool shardAware; // True if the storage server is aware of the physical shards. - LocalityData locality; + LocalityData locality; // Storage server's locality information. // Histograms struct FetchKeysHistograms { @@ -8971,16 +8968,6 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .detail("DataMoveId", dataMoveId) .detail("ConductBulkLoad", conductBulkLoad); - if (SERVER_KNOBS->SS_GET_DATA_MOVE_ID && data->shardAware) { - // Empty team id indicates the data move will be cancelled soon. 
- if (shard->teamId == "" || shard->teamId == "InvalidTeam") { - TraceEvent(SevWarnAlways, "TeamIdUnavailable") - .detail("FKID", fetchKeysID) - .detail("DurableVersion", data->durableVersion.get()) - .detail("Version", data->version.get()); - wait(Never()); - } - } wait(data->fetchKeysParallelismLock.take(TaskPriority::DefaultYield)); state FlowLock::Releaser holdingFKPL(data->fetchKeysParallelismLock); @@ -11141,7 +11128,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, newEmptyRanges.push_back(range); // auto shardInfo = ShardInfo(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite); updatedShards.emplace_back(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite); - data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + data->pendingAddRanges[cVer].emplace_back(desiredId, range); continue; } @@ -11197,7 +11184,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, setAvailableStatus(data, range, true); // Note: The initial range is available, however, the shard won't be created in the storage engine // until version is committed. - data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + data->pendingAddRanges[cVer].emplace_back(desiredId, range); TraceEvent(sevDm, "SSInitialShard", data->thisServerID) .detail("Range", range) .detail("NowAssigned", nowAssigned) @@ -11218,7 +11205,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, } else { updatedShards.push_back( StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding)); - data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + data->pendingAddRanges[cVer].emplace_back(desiredId, range); } data->newestDirtyVersion.insert(range, cVer); TraceEvent(sevDm, "SSAssignShard", data->thisServerID) @@ -11256,7 +11243,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, // checkpoint. However, this case never happens the bulkload. So, the bulkload does not // support fall back. ASSERT(!conductBulkLoad); // TODO(BulkLoad): remove this assert - data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + data->pendingAddRanges[cVer].emplace_back(desiredId, range); data->newestDirtyVersion.insert(range, cVer); // TODO: removeDataRange if the moveInShard has written to the kvs. } @@ -11268,17 +11255,6 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, updateStorageShard(data, shard); } - // Link shard info to pending new ranges. - // TODO: consider refactoring to avoid extra shard look up. - if (data->pendingAddRanges.find(cVer) != data->pendingAddRanges.end()) { - for (auto& shard : data->pendingAddRanges[cVer]) { - auto it = data->shards.rangeContaining(shard.range.begin); - ASSERT(it->value()); - ASSERT(it.range().end == shard.range.end); - shard.shardInfo = it.value(); - } - } - auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); // Persist physcial shard move metadata. 
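Note for reviewers: the hunk below reverts the getDataMoveMetadata() helper that patch 1 added, which looked up DataMoveMetaData::dcTeamIds by the storage server's dcId and stamped the resulting team id onto the in-flight ShardInfo/AddingShard. For context, here is a minimal, self-contained sketch (not part of the patch) of the per-DC team-id scheme that helper consumed: patch 1's startMoveShards() sorts the destination servers' short ids within each DC and joins them with commas, so a given destination team always yields the same id. The function name makeDcTeamIds and the plain STL container types are illustrative assumptions; the actual patch builds this map inline into DataMoveMetaData::dcTeamIds.

    // Illustrative sketch only (not in the patch): per-DC team-id derivation,
    // mirroring the inline logic patch 1 adds to startMoveShards().
    #include <algorithm>
    #include <map>
    #include <string>
    #include <vector>

    std::map<std::string, std::string> makeDcTeamIds(
        std::map<std::string, std::vector<std::string>> dcServers) {
        std::map<std::string, std::string> dcTeamIds;
        for (auto& [dc, serverIds] : dcServers) {
            // Sort so the same set of destination servers always produces the same id.
            std::sort(serverIds.begin(), serverIds.end());
            std::string teamId;
            for (const auto& id : serverIds) {
                teamId += (teamId.empty() ? id : "," + id);
            }
            // The concatenated server ids serve as a conflict-free team id for this DC.
            dcTeamIds[dc] = teamId;
        }
        return dcTeamIds;
    }

In the patch itself the result is persisted in DataMoveMetaData::dcTeamIds and read back on the storage server, keyed by self->locality.describeDcId(); the hunk that follows removes that read-back path as part of the revert.
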
@@ -12781,51 +12757,6 @@ ACTOR Future createCheckpoint(StorageServer* data, CheckpointMetaData meta return Void(); } -ACTOR Future getDataMoveMetadata(Version version, - std::vector shards, - StorageServer* self) { - state Transaction tr(self->cx); - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - - loop { - try { - tr.reset(); - state std::vector>> fDataMoves; - for (auto& shard : shards) { - fDataMoves.push_back(tr.get(dataMoveKeyFor(shard.dataMoveId))); - } - wait(waitForAll(fDataMoves)); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - for (int i = 0; i < shards.size(); ++i) { - if (fDataMoves[i].get().present()) { - auto& shard = shards[i]; - DataMoveMetaData dataMove = decodeDataMoveValue(fDataMoves[i].get().get()); - - if (!dataMove.ranges.empty()) { - if (!dataMove.dcTeamIds.present()) { - TraceEvent("TeamIdNotSet").detail("DataMoveId", shard.dataMoveId).detail("Range", shard.range); - continue; - } - auto teamId = dataMove.dcTeamIds.get()[self->locality.describeDcId()]; - ASSERT(dataMove.ranges.front().contains(shard.range)); - shard.shardInfo->teamId = teamId; - auto& addingShard = shard.shardInfo->adding; - ASSERT(addingShard); - addingShard->teamId = teamId; - TraceEvent(SevDebug, "GotValidTeamId").detail("Range", shard.range).detail("TeamId", teamId); - } - - shard.shardInfo.clear(); - } - } - return Void(); -} - struct UpdateStorageCommitStats { double beforeStorageUpdates; double beforeStorageCommit; @@ -12969,10 +12900,6 @@ ACTOR Future updateStorage(StorageServer* data) { fAddRanges.push_back(data->storage.addRange(shard.range, shard.shardId)); } wait(waitForAll(fAddRanges)); - if (SERVER_KNOBS->SS_GET_DATA_MOVE_ID) { - wait(getDataMoveMetadata( - data->pendingAddRanges.begin()->first, data->pendingAddRanges.begin()->second, data)); - } TraceEvent(SevVerbose, "SSAddKVSRangeEnd", data->thisServerID) .detail("Version", data->pendingAddRanges.begin()->first) .detail("DurableVersion", data->durableVersion.get()); From ef1fe4ea8da6f369083e1118ee3727844ddb79bb Mon Sep 17 00:00:00 2001 From: Yao Xiao Date: Wed, 9 Jul 2025 14:56:41 -0700 Subject: [PATCH 3/3] refactor --- fdbserver/storageserver.actor.cpp | 185 ++++++++++++++++-------------- 1 file changed, 101 insertions(+), 84 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 522d7aacfd0..09905cf89fc 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -459,6 +459,7 @@ struct AddingShard : NonCopyable { }; class ShardInfo : public ReferenceCounted, NonCopyable { +private: ShardInfo(KeyRange keys, std::unique_ptr&& adding, StorageServer* readWrite) : adding(std::move(adding)), readWrite(readWrite), keys(keys), shardId(0LL), desiredShardId(0LL), version(0) {} ShardInfo(KeyRange keys, std::shared_ptr moveInShard) @@ -466,7 +467,6 @@ class ShardInfo : public ReferenceCounted, NonCopyable { shardId(moveInShard->meta->destShardId()), desiredShardId(moveInShard->meta->destShardId()), version(moveInShard->meta->createVersion) {} -public: // A shard has 4 mutual exclusive states: adding, moveInShard, readWrite and notAssigned. 
std::unique_ptr adding; struct StorageServer* readWrite; @@ -478,6 +478,7 @@ class ShardInfo : public ReferenceCounted, NonCopyable { std::string teamId = invalidTeamId; Version version; +public: static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, nullptr, nullptr); } static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, nullptr, data); } static ShardInfo* newAdding(StorageServer* data, @@ -497,7 +498,7 @@ class ShardInfo : public ReferenceCounted, NonCopyable { r->version == invalidVersion) { return false; } - if (l->shardId != r->shardId || l->desiredShardId != r->desiredShardId) { + if (l->getShardId() != r->getShardId() || l->getDesiredShardId() != r->getDesiredShardId()) { return false; } return (l->isReadable() && r->isReadable()) || (!l->assigned() && !r->assigned()); @@ -510,12 +511,12 @@ class ShardInfo : public ReferenceCounted, NonCopyable { st = StorageServerShard::ReadWrite; } else if (!this->assigned()) { st = StorageServerShard::NotAssigned; - } else if (this->adding) { - st = this->adding->phase == AddingShard::Waiting ? StorageServerShard::ReadWritePending - : StorageServerShard::Adding; + } else if (this->getAddingShard()) { + st = this->getAddingShard()->phase == AddingShard::Waiting ? StorageServerShard::ReadWritePending + : StorageServerShard::Adding; } else { - ASSERT(this->moveInShard); - const MoveInPhase phase = this->moveInShard->getPhase(); + ASSERT(this->getMoveInShard()); + const MoveInPhase phase = this->getMoveInShard()->getPhase(); if (phase < MoveInPhase::ReadWritePending) { st = StorageServerShard::MovingIn; } else if (phase == MoveInPhase::ReadWritePending) { @@ -527,7 +528,7 @@ class ShardInfo : public ReferenceCounted, NonCopyable { } // Clear moveInShardId if the data move is complete. 
             if (phase != MoveInPhase::ReadWritePending && phase != MoveInPhase::Complete) {
-                moveInShardId = this->moveInShard->id();
+                moveInShardId = this->getMoveInShard()->id();
             }
         }
         return StorageServerShard(this->keys, this->version, this->shardId, this->desiredShardId, st, moveInShardId);
@@ -545,8 +546,8 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
         if (!canMerge(this, other)) {
             return false;
         }
-        this->keys = KeyRangeRef(this->keys.begin, other->keys.end);
-        this->version = std::max(this->version, other->version);
+        this->keys = KeyRangeRef(this->keys.begin, other->range().end);
+        this->version = std::max(this->version, other->getVersion());
         return true;
     }
 
@@ -557,6 +558,20 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
     bool isReadable() const { return readWrite != nullptr; }
     bool notAssigned() const { return !readWrite && !adding && !moveInShard; }
     bool assigned() const { return readWrite || adding || moveInShard; }
+
+    KeyRange range() const { return keys; }
+    uint64_t getShardId() const { return shardId; }
+    uint64_t getDesiredShardId() const { return desiredShardId; }
+    uint64_t getChangeCounter() const { return changeCounter; }
+    AddingShard* getAddingShard() const { return adding.get(); }
+    std::shared_ptr<MoveInShard> getMoveInShard() const { return moveInShard; }
+    Version getVersion() const { return version; }
+    std::string getTeamId() const { return teamId; }
+
+    void setChangeCounter(uint64_t shardChangeCounter) { changeCounter = shardChangeCounter; }
+    void setShardId(uint64_t id) { shardId = id; }
+    void setDesiredShardId(uint64_t id) { desiredShardId = id; }
+
     bool isInVersionedData() const {
         return readWrite || (adding && adding->isDataTransferred()) ||
                (moveInShard && moveInShard->isDataTransferred());
@@ -1731,29 +1746,29 @@ struct StorageServer : public IStorageMetricsService {
     // for all ranges in shards.getAffectedRangesAfterInsertion(newShard->keys)), because these
     // shards are invalidated by the call.
     void addShard(ShardInfo* newShard) {
-        ASSERT(!newShard->keys.empty());
-        newShard->changeCounter = ++shardChangeCounter;
+        ASSERT(!newShard->range().empty());
+        newShard->setChangeCounter(++shardChangeCounter);
         // TraceEvent("AddShard", this->thisServerID).detail("KeyBegin", newShard->keys.begin).detail("KeyEnd", newShard->keys.end).detail("State",newShard->isReadable() ? "Readable" : newShard->notAssigned() ? "NotAssigned" : "Adding").detail("Version", this->version.get());
         /*auto affected = shards.getAffectedRangesAfterInsertion( newShard->keys, Reference<ShardInfo>() );
         for(auto i = affected.begin(); i != affected.end(); ++i)
             shards.insert( *i, Reference<ShardInfo>() );*/
 
         if (shardAware && newShard->notAssigned()) {
-            auto sh = shards.intersectingRanges(newShard->keys);
+            auto sh = shards.intersectingRanges(newShard->range());
             for (auto it = sh.begin(); it != sh.end(); ++it) {
                 if (it->value().isValid() && !it->value()->notAssigned()) {
                     TraceEvent(SevVerbose, "StorageServerAddShardClear")
-                        .detail("NewShardRange", newShard->keys)
-                        .detail("Range", it->value()->keys)
-                        .detail("ShardID", format("%016llx", it->value()->desiredShardId))
-                        .detail("NewShardID", format("%016llx", newShard->desiredShardId))
-                        .detail("NewShardActualID", format("%016llx", newShard->shardId));
+                        .detail("NewShardRange", newShard->range())
+                        .detail("Range", it->value()->range())
+                        .detail("ShardID", format("%016llx", it->value()->getShardId()))
+                        .detail("NewShardID", format("%016llx", newShard->getDesiredShardId()))
+                        .detail("NewShardActualID", format("%016llx", newShard->getShardId()));
                 }
             }
         }
         Reference<ShardInfo> rShard(newShard);
-        shards.insert(newShard->keys, rShard);
+        shards.insert(newShard->range(), rShard);
     }
 
     void addMutation(Version version,
                      bool fromFetch,
@@ -1794,7 +1809,7 @@ struct StorageServer : public IStorageMetricsService {
     }
 
     void checkChangeCounter(uint64_t oldShardChangeCounter, KeyRef const& key) {
-        if (oldShardChangeCounter != shardChangeCounter && shards[key]->changeCounter > oldShardChangeCounter) {
+        if (oldShardChangeCounter != shardChangeCounter && shards[key]->getChangeCounter() > oldShardChangeCounter) {
             CODE_PROBE(true, "shard change during getValueQ");
             throw wrong_shard_server();
         }
@@ -1804,7 +1819,7 @@ struct StorageServer : public IStorageMetricsService {
         if (oldShardChangeCounter != shardChangeCounter) {
             auto sh = shards.intersectingRanges(keys);
             for (auto i = sh.begin(); i != sh.end(); ++i)
-                if (i->value()->changeCounter > oldShardChangeCounter) {
+                if (i->value()->getChangeCounter() > oldShardChangeCounter) {
                     CODE_PROBE(true, "shard change during range operation");
                     throw wrong_shard_server();
                 }
@@ -2104,13 +2119,13 @@ void validate(StorageServer* data, bool force = false) {
             for (auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
                 TraceEvent(SevVerbose, "ValidateShard", data->thisServerID)
                     .detail("Range", s->range())
-                    .detail("ShardID", format("%016llx", s->value()->shardId))
-                    .detail("DesiredShardID", format("%016llx", s->value()->desiredShardId))
-                    .detail("ShardRange", s->value()->keys)
+                    .detail("ShardID", format("%016llx", s->value()->getShardId()))
+                    .detail("DesiredShardID", format("%016llx", s->value()->getDesiredShardId()))
+                    .detail("ShardRange", s->value()->range())
                     .detail("ShardState", s->value()->debugDescribeState())
                     .log();
-                ASSERT(s->value()->keys == s->range());
-                ASSERT(!s->value()->keys.empty());
+                ASSERT(s->value()->range() == s->range());
+                ASSERT(!s->value()->range().empty());
                 if (data->shardAware) {
                     s->value()->validate();
                 }
@@ -2122,7 +2137,7 @@ void validate(StorageServer* data, bool force = false) {
                 for (auto a = ar.begin(); a != ar.end(); ++a) {
                     TraceEvent(SevVerbose, "ValidateShardReadable", data->thisServerID)
                         .detail("Range", s->range())
-                        .detail("ShardRange", s->value()->keys)
+                        .detail("ShardRange", s->value()->range())
                         .detail("ShardState", s->value()->debugDescribeState())
                         .detail("AvailableRange", a->range())
                         .detail("AvailableVersion", a->value())
@@ -2165,11 +2180,11 @@ void validate(StorageServer* data, bool force = false) {
 
         if (shard->assigned() && data->shardAware) {
             TraceEvent(SevVerbose, "ValidateAssignedShard", data->thisServerID)
-                .detail("Range", shard->keys)
-                .detailf("ShardID", "%016llx", shard->shardId)
-                .detailf("DesiredShardID", "%016llx", shard->desiredShardId)
+                .detail("Range", shard->range())
+                .detailf("ShardID", "%016llx", shard->getShardId())
+                .detailf("DesiredShardID", "%016llx", shard->getDesiredShardId())
                 .detail("State", shard->debugDescribeState());
-            ASSERT(shard->shardId != 0UL && shard->desiredShardId != 0UL);
+            ASSERT(shard->getShardId() != 0UL && shard->getDesiredShardId() != 0UL);
         }
     }
@@ -3985,20 +4000,20 @@ ACTOR Future<Void> getShardState_impl(StorageServer* data, GetShardStateRequest
             }
 
             if (req.mode == GetShardStateRequest::READABLE && !t.value()->isReadable()) {
-                if (t.value()->adding) {
-                    onChange.push_back(t.value()->adding->readWrite.getFuture());
+                if (t.value()->getAddingShard()) {
+                    onChange.push_back(t.value()->getAddingShard()->readWrite.getFuture());
                 } else {
-                    ASSERT(t.value()->moveInShard);
-                    onChange.push_back(t.value()->moveInShard->readWrite.getFuture());
+                    ASSERT(t.value()->getMoveInShard());
+                    onChange.push_back(t.value()->getMoveInShard()->readWrite.getFuture());
                 }
             }
 
             if (req.mode == GetShardStateRequest::FETCHING && !t.value()->isFetched()) {
-                if (t.value()->adding) {
-                    onChange.push_back(t.value()->adding->fetchComplete.getFuture());
+                if (t.value()->getAddingShard()) {
+                    onChange.push_back(t.value()->getAddingShard()->fetchComplete.getFuture());
                 } else {
-                    ASSERT(t.value()->moveInShard);
-                    onChange.push_back(t.value()->moveInShard->fetchComplete.getFuture());
+                    ASSERT(t.value()->getMoveInShard());
+                    onChange.push_back(t.value()->getMoveInShard()->fetchComplete.getFuture());
                 }
             }
         }
@@ -5091,7 +5106,7 @@ struct AuditGetShardInfoRes {
 AuditGetShardInfoRes getThisServerShardInfo(StorageServer* data, KeyRange range) {
     std::vector<KeyRange> ownRange;
     for (auto& t : data->shards.intersectingRanges(range)) {
-        KeyRange alignedRange = t.value()->keys & range;
+        KeyRange alignedRange = t.value()->range() & range;
         if (alignedRange.empty()) {
             TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
                        "SSAuditStorageReadShardInfoEmptyAlignedRange",
@@ -5101,11 +5116,11 @@ AuditGetShardInfoRes getThisServerShardInfo(StorageServer* data, KeyRange range)
         }
         TraceEvent(SevVerbose, "SSAuditStorageGetThisServerShardInfo", data->thisServerID)
             .detail("AlignedRange", alignedRange)
-            .detail("Range", t.value()->keys)
+            .detail("Range", t.value()->range())
             .detail("AtVersion", data->version.get())
             .detail("AuditServer", data->thisServerID)
-            .detail("ReadWrite", t.value()->readWrite ? "True" : "False")
-            .detail("Adding", t.value()->adding ? "True" : "False");
+            .detail("ReadWrite", t.value()->isReadable() ? "True" : "False")
+            .detail("Adding", t.value()->isFetching() ? "True" : "False");
         if (t.value()->assigned()) {
             ownRange.push_back(alignedRange);
         }
     }
@@ -9315,9 +9330,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
                     .detail("Phase", "Split range");
             }
         }
-        shard = data->shards.rangeContaining(keys.begin).value()->adding.get();
+        shard = data->shards.rangeContaining(keys.begin).value()->getAddingShard();
         warningLogger = logFetchKeysWarning(shard);
-        AddingShard* otherShard = data->shards.rangeContaining(blockBegin).value()->adding.get();
+        AddingShard* otherShard = data->shards.rangeContaining(blockBegin).value()->getAddingShard();
         keys = shard->keys;
 
         // Split our prior updates. The ones that apply to our new, restricted key range will go back
@@ -9534,7 +9549,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
         wait(data->durableVersion.whenAtLeast(feedTransferredVersion));
 
         ASSERT(data->shards[shard->keys.begin]->assigned() &&
-               data->shards[shard->keys.begin]->keys ==
+               data->shards[shard->keys.begin]->range() ==
                    shard->keys); // We aren't changing whether the shard is assigned
         data->newestAvailableVersion.insert(shard->keys, latestVersion);
         shard->readWrite.send(Void());
@@ -9681,8 +9696,8 @@ ACTOR Future<Void> fallBackToAddingShard(StorageServer* data, MoveInShard* moveI
     moveInShard->cancel();
     for (const auto& range : moveInShard->meta->ranges) {
         const Reference<ShardInfo>& currentShard = data->shards[range.begin];
-        if (currentShard->moveInShard && currentShard->moveInShard->id() == moveInShard->id()) {
-            ASSERT(range == currentShard->keys);
+        if (currentShard->getMoveInShard() && currentShard->getMoveInShard()->id() == moveInShard->id()) {
+            ASSERT(range == currentShard->range());
             changeServerKeysWithPhysicalShards(data,
                                                range,
                                                moveInShard->dataMoveId(),
@@ -9693,7 +9708,7 @@ ACTOR Future<Void> fallBackToAddingShard(StorageServer* data, MoveInShard* moveI
                                                ConductBulkLoad::False);
         } else {
            TraceEvent(SevWarn, "ShardAlreadyChanged", data->thisServerID)
-                .detail("ShardRange", currentShard->keys)
+                .detail("ShardRange", currentShard->range())
                 .detail("ShardState", currentShard->debugDescribeState());
         }
     }
@@ -10062,7 +10077,7 @@ ACTOR Future<Void> fetchShardApplyUpdates(StorageServer* data,
     std::sort(ranges.begin(), ranges.end(), KeyRangeRef::ArbitraryOrder());
     for (const auto& range : ranges) {
         TraceEvent(moveInShard->logSev, "PersistShardReadWriteStatus").detail("Range", range);
-        ASSERT(data->shards[range.begin]->keys == range);
+        ASSERT(data->shards[range.begin]->range() == range);
         StorageServerShard newShard = data->shards[range.begin]->toStorageServerShard();
         ASSERT(newShard.range == range);
         ASSERT(newShard.getShardState() == StorageServerShard::ReadWritePending);
@@ -10108,7 +10123,7 @@ ACTOR Future<Void> fetchShardApplyUpdates(StorageServer* data,
 
     for (const auto& range : moveInShard->ranges()) {
         const Reference<ShardInfo>& currentShard = data->shards[range.begin];
-        if (!currentShard->moveInShard || currentShard->moveInShard->id() != moveInShard->id()) {
+        if (!currentShard->getMoveInShard() || currentShard->getMoveInShard()->id() != moveInShard->id()) {
             TraceEvent(SevWarn, "MoveInShardChanged", data->thisServerID)
                 .detail("CurrentShard", currentShard->debugDescribeState())
                 .detail("MoveInShard", moveInShard->toString());
@@ -10154,7 +10169,7 @@ ACTOR Future<Void> cleanUpMoveInShard(StorageServer* data, Version version, Move
     for (const auto& mir : moveInShard->ranges()) {
         auto existingShards = data->shards.intersectingRanges(mir);
         for (auto it = existingShards.begin(); it != existingShards.end(); ++it) {
-            if (it->value()->moveInShard && it->value()->moveInShard->id() == moveInShard->id()) {
+            if (it->value()->getMoveInShard() && it->value()->getMoveInShard()->id() == moveInShard->id()) {
                 clearRecord = false;
                 break;
             }
@@ -10588,7 +10603,7 @@ ACTOR Future<Void> restoreShards(StorageServer* data,
             for (auto it = existingShards.begin(); it != existingShards.end(); ++it) {
                 TraceEvent(SevVerbose, "RestoreShardsIntersectingRange", data->thisServerID)
                     .detail("StorageShard", shard.toString())
-                    .detail("IntersectingShardRange", it->value()->keys)
+                    .detail("IntersectingShardRange", it->value()->range())
                     .detail("IntersectingShardState", it->value()->debugDescribeState())
                     .log();
                 ASSERT(it->value()->notAssigned());
@@ -10648,7 +10663,7 @@ ACTOR Future<Void> restoreShards(StorageServer* data,
             TraceEvent(SevVerbose, "RestoreShardsValidateAvailable", data->thisServerID)
                 .detail("Range", shardRange)
                 .detail("Available", nowAvailable)
-                .detail("IntersectingShardRange", it->value()->keys)
+                .detail("IntersectingShardRange", it->value()->range())
                 .detail("IntersectingShardState", it->value()->debugDescribeState())
                 .log();
             if (nowAvailable) {
@@ -10674,7 +10689,7 @@ ACTOR Future<Void> restoreShards(StorageServer* data,
             TraceEvent(SevVerbose, "RestoreShardsValidateAssigned", data->thisServerID)
                 .detail("Range", shardRange)
                 .detail("Assigned", nowAssigned)
-                .detail("IntersectingShardRange", it->value()->keys)
+                .detail("IntersectingShardRange", it->value()->range())
                 .detail("IntersectingShardState", it->value()->debugDescribeState())
                 .log();
 
@@ -10817,9 +10832,9 @@ void changeServerKeys(StorageServer* data,
         else if (ranges[i].value->isReadable())
             data->addShard(ShardInfo::newReadWrite(ranges[i], data));
         else {
-            ASSERT(ranges[i].value->adding);
-            data->addShard(ShardInfo::newAdding(
-                data, ranges[i], ranges[i].value->adding->reason, ranges[i].value->adding->getSSBulkLoadMetadata()));
+            ASSERT(ranges[i].value->getAddingShard());
+            data->addShard(ShardInfo::newAdding(data,
+                                                ranges[i],
+                                                ranges[i].value->getAddingShard()->reason,
+                                                ranges[i].value->getAddingShard()->getSSBulkLoadMetadata()));
             CODE_PROBE(true, "ChangeServerKeys reFetchKeys");
         }
     }
@@ -10873,7 +10890,7 @@ void changeServerKeys(StorageServer* data,
                 setAvailableStatus(data, range, true);
             } else {
                 auto& shard = data->shards[range.begin];
-                if (!shard->assigned() || shard->keys != range)
+                if (!shard->assigned() || shard->range() != range)
                     data->addShard(ShardInfo::newAdding(data, range, dataMoveReason, bulkLoadInfoForAddingShard));
             }
         } else {
@@ -10997,7 +11014,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
                    .detail("Assigned", nowAssigned)
                    .detail("DataMoveId", dataMoveId)
                    .detail("Version", version)
-                    .detail("InitialVersion", currentShard->version);
+                    .detail("InitialVersion", currentShard->getVersion());
                throw dataMoveConflictError(data->isTss());
            }
            continue;
@@ -11014,9 +11031,9 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
                    .detail("ModifiedRange", keys)
                    .detail("DataMoveId", dataMoveId)
                    .detail("Version", version)
-                    .detail("ConflictingShard", currentShard->shardId)
-                    .detail("DesiredShardId", currentShard->desiredShardId)
-                    .detail("InitialVersion", currentShard->version);
+                    .detail("ConflictingShard", currentShard->getShardId())
+                    .detail("DesiredShardId", currentShard->getDesiredShardId())
+                    .detail("InitialVersion", currentShard->getVersion());
                throw dataMoveConflictError(data->isTss());
            }
            StorageServerShard newShard = currentShard->toStorageServerShard();
@@ -11043,7 +11060,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
        }
 
        // Shard is being moved.
-        if (currentShard->adding) {
+        if (currentShard->getAddingShard()) {
            if (nowAssigned) {
                TraceEvent(sev, "PhysicalShardStateError")
                    .detail("SubError", "UpdateAddingShard")
                    .detail("ModifiedRange", keys)
                    .detail("DataMoveId", dataMoveId)
                    .detail("Version", version)
-                    .detail("ConflictingShard", currentShard->shardId)
-                    .detail("DesiredShardId", currentShard->desiredShardId)
-                    .detail("InitialVersion", currentShard->version);
+                    .detail("ConflictingShard", currentShard->getShardId())
+                    .detail("DesiredShardId", currentShard->getDesiredShardId())
+                    .detail("InitialVersion", currentShard->getVersion());
                throw dataMoveConflictError(data->isTss());
            }
@@ -11069,7 +11086,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
            continue;
        }
 
-        if (currentShard->moveInShard) {
+        if (currentShard->getMoveInShard()) {
            if (nowAssigned) {
                TraceEvent(sev, "PhysicalShardStateError")
                    .detail("SubError", "UpdateMoveInShard")
                    .detail("ModifiedRange", keys)
                    .detail("DataMoveId", dataMoveId)
                    .detail("Version", version)
-                    .detail("ConflictingShard", currentShard->shardId)
-                    .detail("DesiredShardId", currentShard->desiredShardId)
-                    .detail("InitialVersion", currentShard->version);
+                    .detail("ConflictingShard", currentShard->getShardId())
+                    .detail("DesiredShardId", currentShard->getDesiredShardId())
+                    .detail("InitialVersion", currentShard->getVersion());
                throw dataMoveConflictError(data->isTss());
            }
 
            // FetchShard will be cancelled.
-            currentShard->moveInShard->cancel();
-            updatedMoveInShards.emplace(currentShard->moveInShard->id(), currentShard->moveInShard);
+            currentShard->getMoveInShard()->cancel();
+            updatedMoveInShards.emplace(currentShard->getMoveInShard()->id(), currentShard->getMoveInShard());
            StorageServerShard newShard = currentShard->toStorageServerShard();
            newShard.range = currentRange;
            data->addShard(ShardInfo::newShard(data, newShard));
@@ -11141,8 +11158,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
                changeNewestAvailable.emplace_back(range, version);
                removeRanges.push_back(range);
            }
-            if (r->value()->moveInShard) {
-                r->value()->moveInShard->cancel();
+            if (r->value()->getMoveInShard()) {
+                r->value()->getMoveInShard()->cancel();
                // This is an overkill, and is necessary only when psm has written data to `range`; Also we don't need
                // to clean up the PTree.
                removeRanges.push_back(range);
@@ -11161,7 +11178,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
            // Shard already available in SS. Update desired shard id.
            if (dataAvailable) {
                updatedShards.push_back(StorageServerShard(
-                    range, cVer, data->shards[range.begin]->shardId, desiredId, StorageServerShard::ReadWrite));
+                    range, cVer, data->shards[range.begin]->getShardId(), desiredId, StorageServerShard::ReadWrite));
                changeNewestAvailable.emplace_back(range, latestVersion);
                TraceEvent(sevDm, "SSAssignShardAlreadyAvailable", data->thisServerID)
                    .detail("Range", range)
@@ -11217,13 +11234,13 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
                    .detail("NewShard", updatedShards.back().toString());
            } else { // Shard is being moved.
-                ASSERT(shard->adding != nullptr || shard->moveInShard != nullptr);
-                if (shard->desiredShardId != desiredId) {
+                ASSERT(shard->getAddingShard() != nullptr || shard->getMoveInShard() != nullptr);
+                if (shard->getDesiredShardId() != desiredId) {
                    TraceEvent(SevWarnAlways, "CSKConflictingMoveInShards", data->thisServerID)
                        .detail("DataMoveID", dataMoveId)
                        .detail("Range", range)
                        .detailf("TargetShard", "%016llx", desiredId)
-                        .detailf("CurrentShard", "%016llx", shard->desiredShardId)
+                        .detailf("CurrentShard", "%016llx", shard->getDesiredShardId())
                        .detail("IsTSS", data->isTss())
                        .detail("Version", cVer);
                    throw dataMoveConflictError(data->isTss());
                }
@@ -11233,7 +11250,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
                    .detailf("TargetShard", "%016llx", desiredId)
                    .detail("MoveRange", keys)
                    .detail("Range", range)
-                    .detail("ExistingShardRange", shard->keys)
+                    .detail("ExistingShardRange", shard->range())
                    .detail("ShardDebugString", shard->debugDescribeState())
                    .detail("Version", cVer);
                if (context == CSK_FALL_BACK) {
@@ -11834,7 +11851,7 @@ class StorageUpdater {
            }
            *keyPos = keyNum % 0xff;
 
            auto r = data->shards.rangeContaining(StringRef(keyBuf, keyPos - keyBuf + 1)).value();
-            if (!r || !(r->adding || r->moveInShard || r->readWrite)) {
+            if (!r || !(r->getAddingShard() || r->getMoveInShard() || r->isReadable())) {
                break;
            }
@@ -12411,7 +12428,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
                    std::min(static_cast<int>(data->constructedData.size()), SERVER_KNOBS->GENERATE_DATA_PER_VERSION_MAX);
                for (int m = 0; m < mutationCount; m++) {
                    auto r = data->shards.rangeContaining(data->constructedData.front().first).value();
-                    if (r && (r->adding || r->moveInShard || r->readWrite)) {
+                    if (r && (r->getAddingShard() || r->getMoveInShard() || r->isReadable())) {
                        MutationRef constructedMutation(MutationRef::SetValue,
                                                        data->constructedData.front().first,
                                                        data->constructedData.front().second);
@@ -14644,8 +14661,8 @@ ACTOR Future<Void> storageEngineConsistencyCheck(StorageServer* self) {
            continue;
        }
        if (it.value()->assigned()) {
-            currentShards.insert(it.range(), format("%016llx", it.value()->shardId));
-            teamShardCount[it.value()->teamId]++;
+            currentShards.insert(it.range(), format("%016llx", it.value()->getShardId()));
+            teamShardCount[it.value()->getTeamId()]++;
        }
    }