diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 620eef2d842..993370ae278 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -842,6 +842,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( CC_INVALIDATE_EXCLUDED_PROCESSES, false); if (isSimulated) CC_INVALIDATE_EXCLUDED_PROCESSES = deterministicRandom()->coinflip();
 	init( CC_GRAY_FAILURE_STATUS_JSON, false); if (isSimulated) CC_GRAY_FAILURE_STATUS_JSON = true;
 	init( CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL, 0.5 );
+	init( CC_RECOVERY_INIT_REQ_TIMEOUT, 30.0 );
+	init( CC_RECOVERY_INIT_REQ_GROWTH_FACTOR, 2.0 );
+	init( CC_RECOVERY_INIT_REQ_MAX_TIMEOUT, 300.0 );
+	init( CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES, 100 );
 	init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
 	init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index a54d9d73b9b..09ec893846b 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -850,6 +850,15 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKnobs> {
+	double CC_RECOVERY_INIT_REQ_TIMEOUT; // Base timeout (seconds) for transaction system initialization. Only
+	                                     // applies to the initializing_transaction_servers phase.
+	double CC_RECOVERY_INIT_REQ_GROWTH_FACTOR; // Growth factor for exponential backoff of the timeout above; must be
+	                                           // > 1 and <= 10 to prevent overflow.
+	double CC_RECOVERY_INIT_REQ_MAX_TIMEOUT; // Maximum timeout (seconds) for transaction system initialization. Only
+	                                         // applies to initializing_transaction_servers phase.
+	int CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES; // Maximum unfinished recoveries after which transaction system
+	                                                    // initialization timeouts above do not apply.
 
 	// Knobs used to select the best policy (via monte carlo)
 	int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)
diff --git a/fdbserver/ClusterRecovery.actor.cpp b/fdbserver/ClusterRecovery.actor.cpp
index bddb082ed31..bd814327362 100644
--- a/fdbserver/ClusterRecovery.actor.cpp
+++ b/fdbserver/ClusterRecovery.actor.cpp
@@ -18,6 +18,7 @@
  * limitations under the License.
  */
 
+#include <cmath>
 #include
 
 #include "fdbclient/FDBTypes.h"
@@ -29,7 +30,9 @@
 #include "fdbserver/Knobs.h"
 #include "fdbserver/MasterInterface.h"
 #include "fdbserver/WaitFailure.h"
+#include "flow/Error.h"
 #include "flow/ProtocolVersion.h"
+#include "flow/Trace.h"
 
 #include "flow/actorcompiler.h" // This must be the last #include.
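For intuition, the four CC_RECOVERY_INIT_REQ_* knobs added above drive a clamped exponential backoff on the new recovery watchdog. A minimal standalone sketch of that calculation, assuming only the defaults shown in ServerKnobs.cpp; scaledInitTimeout is an illustrative name and is not part of this patch (the authoritative logic is monitorInitializingTxnSystem() in ClusterRecovery.actor.cpp below):

#include <algorithm>
#include <cmath>

// Sketch only: the clamped exponential backoff driven by the CC_RECOVERY_INIT_REQ_* knobs.
double scaledInitTimeout(double baseTimeout,        // CC_RECOVERY_INIT_REQ_TIMEOUT (default 30.0 s)
                         double growthFactor,       // CC_RECOVERY_INIT_REQ_GROWTH_FACTOR (default 2.0; must be > 1 and <= 10)
                         double maxTimeout,         // CC_RECOVERY_INIT_REQ_MAX_TIMEOUT (default 300.0 s)
                         int unfinishedRecoveries)  // >= 1; the monitor disables itself at >= 100 by default
{
    // With the defaults this yields 60, 120, 240, 300, 300, ... seconds for 1, 2, 3, 4, 5, ... unfinished recoveries.
    const double scalingFactor = std::pow(growthFactor, unfinishedRecoveries);
    return std::min(baseTimeout * scalingFactor, maxTimeout);
}

With the defaults, the watchdog therefore waits 60 seconds on the first unfinished recovery and reaches the 300 second cap by the fourth.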
@@ -963,6 +966,66 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> provisionalMaster(Reference<ClusterRecoveryData> parent,
+ACTOR Future<Void> monitorInitializingTxnSystem(int unfinishedRecoveries) {
+    // Validate parameters to prevent overflow and ensure exponential backoff works correctly.
+    // With growth factor <= 10 and unfinishedRecoveries <= 100, max scaling factor is 10^100.
+    const bool validParameters = unfinishedRecoveries >= 1 && SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT > 0 &&
+                                 SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT > 0 &&
+                                 SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR > 1.0 &&
+                                 SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR <= 10.0;
+
+    if (!validParameters) {
+        TraceEvent(SevWarnAlways, "InitializingTxnSystemTimeoutInvalid")
+            .detail("BaseTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)
+            .detail("GrowthFactor", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR)
+            .detail("MaxTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT)
+            .detail("UnfinishedRecoveries", unfinishedRecoveries)
+            .detail("MaxUnfinishedRecoveries", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES);
+        ASSERT_WE_THINK(false); // it is expected for these parameters to always be valid, so we assert/crash in
+                                // simulation if that's not the case
+        return Never();
+    }
+
+    const bool tooManyUnfinishedRecoveries =
+        unfinishedRecoveries >= SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES;
+    if (tooManyUnfinishedRecoveries) {
+        TraceEvent(SevWarnAlways, "InitializingTxnSystemTimeoutTooMany")
+            .detail("BaseTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)
+            .detail("GrowthFactor", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR)
+            .detail("MaxTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT)
+            .detail("UnfinishedRecoveries", unfinishedRecoveries)
+            .detail("MaxUnfinishedRecoveries", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES);
+        return Never(); // if there have been too many recoveries, clearly something is wrong. At this point, an
+                        // operator needs to look into the issue rather than us relying on this timeout monitor.
+                        // Triggering more timeouts can make the situation worse.
+    }
+
+    // Calculate timeout with exponential backoff
+    const double scalingFactor = std::pow(SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR, unfinishedRecoveries);
+    const double scaledTimeout = std::min(SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT * scalingFactor,
+                                          SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT);
+
+    TraceEvent("InitializingTxnSystemTimeout")
+        .detail("BaseTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)
+        .detail("GrowthFactor", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR)
+        .detail("MaxTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT)
+        .detail("UnfinishedRecoveries", unfinishedRecoveries)
+        .detail("ScalingFactor", scalingFactor)
+        .detail("ScaledTimeout", scaledTimeout);
+
+    wait(delay(scaledTimeout));
+
+    TraceEvent("InitializingTxnSystemTimeoutTriggered");
+    throw cluster_recovery_failed();
+}
+
 ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
     Reference<ClusterRecoveryData> self,
     std::vector<StorageServerInterface>* seedServers,
@@ -1061,13 +1124,19 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
         .detail("RemoteDcIds", remoteDcIds)
         .trackLatest(self->clusterRecoveryStateEventHolder->trackingKey);
 
-    // Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand
-    // new database we are sort of lying that we are past the recruitment phase. In a perfect world we would split that
-    // up so that the recruitment part happens above (in parallel with recruiting the transaction servers?).
+    // Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a
+    // brand new database we are sort of lying that we are past the recruitment phase. In a perfect world we would
+    // split that up so that the recruitment part happens above (in parallel with recruiting the transaction
+    // servers?).
     wait(newSeedServers(self, recruits, seedServers));
+
     state std::vector<Standalone<CommitTransactionRef>> confChanges;
-    wait(newCommitProxies(self, recruits) && newGrvProxies(self, recruits) && newResolvers(self, recruits) &&
-         newTLogServers(self, recruits, oldLogSystem, &confChanges));
+    Future<Void> txnSystemInitialized =
+        traceAfter(newCommitProxies(self, recruits), "CommitProxiesInitialized") &&
+        traceAfter(newGrvProxies(self, recruits), "GRVProxiesInitialized") &&
+        traceAfter(newResolvers(self, recruits), "ResolversInitialized") &&
+        traceAfter(newTLogServers(self, recruits, oldLogSystem, &confChanges), "TLogServersInitialized");
+    wait(txnSystemInitialized || monitorInitializingTxnSystem(self->controllerData->db.unfinishedRecoveries));
 
     // Update recovery related information to the newly elected sequencer (master) process.
     wait(brokenPromiseToNever(
diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp
index 7e87e3e2012..57ce5881c06 100644
--- a/fdbserver/TagPartitionedLogSystem.actor.cpp
+++ b/fdbserver/TagPartitionedLogSystem.actor.cpp
@@ -2788,6 +2788,9 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
     // Recruit log routers for old generations of the primary locality
     if (tLogs->locality == locality) {
         logRouterInitializationReplies.emplace_back();
+        TraceEvent("LogRouterInitReqSent1")
+            .detail("Locality", locality)
+            .detail("LogRouterTags", self->logRouterTags);
         for (int i = 0; i < self->logRouterTags; i++) {
             InitializeLogRouterRequest req;
             req.recoveryCount = recoveryCount;
@@ -2798,6 +2801,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
             req.locality = locality;
             req.recoverAt = self->recoverAt.get();
             req.knownLockedTLogIds = self->knownLockedTLogIds;
+            req.allowDropInSim = !forRemote;
             auto reply = transformErrors(
                 throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
                     req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -2839,6 +2843,9 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
     // Recruit log routers for old generations of the primary locality
     if (tLogs->locality == locality) {
         logRouterInitializationReplies.emplace_back();
+        TraceEvent("LogRouterInitReqSent2")
+            .detail("Locality", locality)
+            .detail("LogRouterTags", old.logRouterTags);
         for (int i = 0; i < old.logRouterTags; i++) {
             InitializeLogRouterRequest req;
             req.recoveryCount = recoveryCount;
@@ -2848,6 +2855,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
             req.tLogPolicy = tLogPolicy;
             req.locality = locality;
             req.recoverAt = old.recoverAt;
+            req.allowDropInSim = !forRemote;
             auto reply = transformErrors(
                 throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
                     req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -2860,7 +2868,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
         }
     }
 
-    wait(waitForAll(allReplies));
+    wait(traceAfter(waitForAll(allReplies), "AllLogRouterRepliesReceived"));
 
     int nextReplies = 0;
     lastStart = std::numeric_limits<Version>::max();
@@ -2997,13 +3005,14 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
                                      logSet->startVersion,
                                      localities,
                                      logSet->tLogPolicy,
-                                     true);
+                                     /* forRemote */ true);
     }
 
     state std::vector<Future<TLogInterface>> logRouterInitializationReplies;
     const Version startVersion = oldLogSystem->logRouterTags == 0
                                      ? oldLogSystem->recoverAt.get() + 1
                                      : std::max(self->tLogs[0]->startVersion, logSet->startVersion);
+    TraceEvent("LogRouterInitReqSent3").detail("Locality", remoteLocality).detail("LogRouterTags", self->logRouterTags);
     for (int i = 0; i < self->logRouterTags; i++) {
         InitializeLogRouterRequest req;
         req.recoveryCount = recoveryCount;
@@ -3012,6 +3021,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
         req.tLogLocalities = localities;
         req.tLogPolicy = logSet->tLogPolicy;
         req.locality = remoteLocality;
+        req.allowDropInSim = false;
         TraceEvent("RemoteTLogRouterReplies", self->dbgid)
             .detail("WorkerID", remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].id());
         logRouterInitializationReplies.push_back(transformErrors(
@@ -3088,7 +3098,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
     remoteTLogInitializationReplies.reserve(remoteWorkers.remoteTLogs.size());
     for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) {
-        TraceEvent("RemoteTLogReplies", self->dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id());
+        TraceEvent("RemoteTLogInitReqSent", self->dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id());
         remoteTLogInitializationReplies.push_back(transformErrors(
             throwErrorOr(remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor(
                 remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -3287,7 +3297,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
                               logSystem->tLogs[0]->startVersion,
                               localities,
                               logSystem->tLogs[0]->tLogPolicy,
-                              false);
+                              /* forRemote */ false);
     if (oldLogSystem->knownCommittedVersion - logSystem->tLogs[0]->startVersion >
         SERVER_KNOBS->MAX_RECOVERY_VERSIONS) {
         // make sure we can recover in the other DC.
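The recruitEverything() change in ClusterRecovery.actor.cpp above races the combined transaction-system initialization future against monitorInitializingTxnSystem(). A minimal Flow-style sketch of that race pattern, using hypothetical names (exampleInitWatchdog, exampleRunWithWatchdog) and assuming flow's operator|| on Future<Void> as used in the patch; illustrative only, not part of the patch:

// Sketch: a watchdog that throws after a delay, raced against the real work via operator||.
ACTOR Future<Void> exampleInitWatchdog(double timeoutSeconds) {
    wait(delay(timeoutSeconds));
    TraceEvent(SevWarnAlways, "ExampleInitWatchdogFired").detail("TimeoutSeconds", timeoutSeconds);
    throw cluster_recovery_failed();
}

ACTOR Future<Void> exampleRunWithWatchdog(Future<Void> initWork, double timeoutSeconds) {
    // operator|| completes when either side becomes ready; if the watchdog fires first, its error
    // propagates out of this wait() and fails the enclosing recovery attempt, as in recruitEverything().
    wait(initWork || exampleInitWatchdog(timeoutSeconds));
    return Void();
}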
@@ -3376,7 +3386,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
     primaryTLogReplies.reserve(recr.tLogs.size());
     for (int i = 0; i < recr.tLogs.size(); i++) {
-        TraceEvent("PrimaryTLogReqSent", logSystem->getDebugID()).detail("WorkerID", recr.tLogs[i].id());
+        TraceEvent("PrimaryTLogInitReqSent", logSystem->getDebugID()).detail("WorkerID", recr.tLogs[i].id());
         primaryTLogReplies.push_back(transformErrors(
             throwErrorOr(recr.tLogs[i].tLog.getReplyUnlessFailedFor(
                 reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -3443,7 +3453,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
     satelliteInitializationReplies.reserve(recr.satelliteTLogs.size());
     for (int i = 0; i < recr.satelliteTLogs.size(); i++) {
-        TraceEvent("PrimarySatelliteTLogReplies", logSystem->getDebugID())
+        TraceEvent("PrimarySatelliteTLogInitReqSent", logSystem->getDebugID())
             .detail("WorkerID", recr.satelliteTLogs[i].id());
         satelliteInitializationReplies.push_back(transformErrors(
             throwErrorOr(recr.satelliteTLogs[i].tLog.getReplyUnlessFailedFor(
diff --git a/fdbserver/include/fdbserver/ClusterController.actor.h b/fdbserver/include/fdbserver/ClusterController.actor.h
index 79cbb3ee51e..f4f8c5d4678 100644
--- a/fdbserver/include/fdbserver/ClusterController.actor.h
+++ b/fdbserver/include/fdbserver/ClusterController.actor.h
@@ -142,7 +142,9 @@ class ClusterControllerData {
         DatabaseConfiguration config; // Asynchronously updated via master registration
         DatabaseConfiguration fullyRecoveredConfig;
         Database db;
-        int unfinishedRecoveries;
+        int unfinishedRecoveries; // Counter tracking incomplete recovery attempts. Incremented when a new
+                                  // sequencer/master is recruited, reset to 0 when recovery reaches fully_recovered.
+                                  // A high value indicates multiple recovery attempts that failed to complete.
         bool cachePopulated;
         std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus;
         Future<Void> clientCounter;
diff --git a/fdbserver/include/fdbserver/WorkerInterface.actor.h b/fdbserver/include/fdbserver/WorkerInterface.actor.h
index 440468f09e5..fc82b2a5c7f 100644
--- a/fdbserver/include/fdbserver/WorkerInterface.actor.h
+++ b/fdbserver/include/fdbserver/WorkerInterface.actor.h
@@ -674,6 +674,9 @@ struct InitializeLogRouterRequest {
     // information from the logSystem).
     Optional>> knownLockedTLogIds = Optional>>();
+    bool allowDropInSim; // Simulation-only field for fault injection testing. When true, allows the worker to
+                         // selectively drop responses to initialization messages to test recovery behavior under
+                         // partial failures. Must only be true in simulation.
 
     template <class Ar>
     void serialize(Ar& ar) {
@@ -686,7 +689,8 @@ struct InitializeLogRouterRequest {
                    locality,
                    reply,
                    recoverAt,
-                   knownLockedTLogIds);
+                   knownLockedTLogIds,
+                   allowDropInSim);
     }
 };
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index b9e052096be..7f4eca77bee 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -26,6 +26,7 @@
 #include "fdbclient/FDBTypes.h"
 #include "fdbserver/BlobMigratorInterface.h"
 #include "flow/ApiVersion.h"
+#include "flow/Buggify.h"
 #include "flow/CodeProbe.h"
 #include "flow/IAsyncFile.h"
 #include "fdbrpc/Locality.h"
@@ -2194,6 +2195,14 @@ void cleanupStorageDisks(Reference<AsyncVar<ServerDBInfo>> dbInfo,
     }
 }
 
+bool skipInitRspInSim(const UID workerInterfID, const bool allowDropInSim) {
+    const bool skip = allowDropInSim && g_network->isSimulated() && BUGGIFY_WITH_PROB(/* 1% */ 0.01);
+    if (skip) {
+        TraceEvent("SkipInitRspInSimTrue").detail("WorkerInterfID", workerInterfID);
+    }
+    return skip;
+}
+
 ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                                 Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
                                 LocalityData locality,
@@ -3367,7 +3376,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                 errorForwarders.add(
                     zombie(recruited,
                            forwardError(errors, Role::LOG_ROUTER, recruited.id(), logRouter(recruited, req, dbInfo))));
-                req.reply.send(recruited);
+
+                if (!skipInitRspInSim(interf.id(), req.allowDropInSim)) {
+                    req.reply.send(recruited);
+                }
             }
             when(CoordinationPingMessage m = waitNext(interf.coordinationPing.getFuture())) {
                 TraceEvent("CoordinationPing", interf.id())
diff --git a/tests/slow/GcGenerations.toml b/tests/slow/GcGenerations.toml
index 16deb5635ee..61d6dbe5693 100644
--- a/tests/slow/GcGenerations.toml
+++ b/tests/slow/GcGenerations.toml
@@ -10,6 +10,11 @@ remoteConfig = 'remote_double'
 max_write_transaction_life_versions = 5000000
 record_recover_at_in_cstate = true
 track_tlog_recovery = true
+# The GcGenerations workload intentionally builds up generations by injecting network degradation and timed
+# process restarts. How long to inject network degradation is a sensitive value. A high
+# cc_recovery_init_req_growth_factor could mean that network degradation is healed too early, which
+# we do not want in this test. Therefore, a low value like 1.01 is used.
+cc_recovery_init_req_growth_factor = 1.01
 
 [[test]]
 testTitle = 'GcGenerations'
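For scale, assuming the knob defaults added in ServerKnobs.cpp above: with cc_recovery_init_req_growth_factor = 1.01 the scaled timeout after k unfinished recoveries is 30 * 1.01^k seconds, roughly 33 s at k = 10 and about 80 s at k = 99, so it stays close to the 30 s base throughout the test; the default factor of 2.0 would instead reach the 300 s cap (cc_recovery_init_req_max_timeout) by the fourth unfinished recovery.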