1 change: 1 addition & 0 deletions fdbclient/ServerKnobs.cpp
@@ -842,6 +842,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CC_INVALIDATE_EXCLUDED_PROCESSES, false); if (isSimulated) CC_INVALIDATE_EXCLUDED_PROCESSES = deterministicRandom()->coinflip();
init( CC_GRAY_FAILURE_STATUS_JSON, false); if (isSimulated) CC_GRAY_FAILURE_STATUS_JSON = true;
init( CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL, 0.5 );
init( CC_RECOVERY_INIT_REQ_TIMEOUT, 60.0 );

init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
2 changes: 2 additions & 0 deletions fdbclient/include/fdbclient/ServerKnobs.h
@@ -850,6 +850,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
bool CC_GRAY_FAILURE_STATUS_JSON; // When enabled, returns gray failure information in machine readable status json.
double CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL; // The interval to prevent re-recruiting the same singleton if a
// recruiting fight between two cluster controllers occurs.
double CC_RECOVERY_INIT_REQ_TIMEOUT; // Timeout after which the CC stops waiting for a response to a
// recruitment initialization request

// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)
26 changes: 19 additions & 7 deletions fdbserver/TagPartitionedLogSystem.actor.cpp
@@ -2788,6 +2788,9 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
// Recruit log routers for old generations of the primary locality
if (tLogs->locality == locality) {
logRouterInitializationReplies.emplace_back();
TraceEvent("LogRouterInitReqSent1")
.detail("Locality", locality)
.detail("LogRouterTags", self->logRouterTags);
for (int i = 0; i < self->logRouterTags; i++) {
InitializeLogRouterRequest req;
req.recoveryCount = recoveryCount;
@@ -2799,8 +2802,10 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
req.recoverAt = self->recoverAt.get();
req.knownLockedTLogIds = self->knownLockedTLogIds;
auto reply = transformErrors(
throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
throwErrorOr(timeoutError(
@dlambrig (Contributor) commented on Sep 26, 2025:

Should throw a log message when we time out:

        TraceEvent("LogRouterInitTimeout")
            .detail("RequestIndex", i)
            .detail("Worker", workers[nextRouter].id())
            .detail("RecoveryCount", recoveryCount);

If this is added in more places in a future PR, maybe it's worth modifying timeoutError to accept a lambda, something like:

timeoutError(
    getReplyUnlessFailedFor(...),
    SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT,
    [=]() {
        TraceEvent("LogRouterInitTimeout")
            .detail("RequestIndex", i)
            .detail("Worker", workers[nextRouter].id())
            .detail("RecoveryCount", recoveryCount);
        return cluster_recovery_failed();
    }
)

Author (Collaborator) replied:

Yes, I was also thinking something like that, e.g. adding optional context to timeoutError. I was thinking to do this in a follow-up PR. But maybe this can be implemented in a different way if we go with Jingyu's idea here: #12396 (review).
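
For reference, a minimal sketch of such an overload (illustrative only, not part of this PR). It assumes the existing choose/when shape of timeoutError in flow; the onTimeout parameter is an invented name for a callback that logs caller-specific context and returns the Error to throw.

// Sketch only: a timeoutError variant that lets the caller attach context on timeout.
ACTOR template <class T>
Future<T> timeoutError(Future<T> what, double time, std::function<Error()> onTimeout) {
    choose {
        when(T t = wait(what)) {
            return t;
        }
        when(wait(delay(time))) {
            // Let the caller emit its own TraceEvent (e.g. LogRouterInitTimeout with the
            // request index and worker id), then throw the Error it returns.
            throw onTimeout();
        }
    }
}

A call site could then pass the lambda from the suggestion above, returning cluster_recovery_failed().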

workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY),
SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)),
cluster_recovery_failed());
logRouterInitializationReplies.back().push_back(reply);
allReplies.push_back(reply);
@@ -2839,6 +2844,9 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
// Recruit log routers for old generations of the primary locality
if (tLogs->locality == locality) {
logRouterInitializationReplies.emplace_back();
TraceEvent("LogRouterInitReqSent2")
.detail("Locality", locality)
.detail("LogRouterTags", old.logRouterTags);
for (int i = 0; i < old.logRouterTags; i++) {
InitializeLogRouterRequest req;
req.recoveryCount = recoveryCount;
@@ -2849,8 +2857,10 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
req.locality = locality;
req.recoverAt = old.recoverAt;
auto reply = transformErrors(
throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
throwErrorOr(timeoutError(
workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY),
SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)),
cluster_recovery_failed());
logRouterInitializationReplies.back().push_back(reply);
allReplies.push_back(reply);
@@ -2860,7 +2870,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
}
}

wait(waitForAll(allReplies));
wait(traceAfter(waitForAll(allReplies), "AllLogRouterRepliesReceived"));

int nextReplies = 0;
lastStart = std::numeric_limits<Version>::max();
@@ -3004,6 +3014,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
const Version startVersion = oldLogSystem->logRouterTags == 0
? oldLogSystem->recoverAt.get() + 1
: std::max(self->tLogs[0]->startVersion, logSet->startVersion);
TraceEvent("LogRouterInitReqSent3").detail("Locality", remoteLocality).detail("LogRouterTags", self->logRouterTags);
for (int i = 0; i < self->logRouterTags; i++) {
InitializeLogRouterRequest req;
req.recoveryCount = recoveryCount;
@@ -3015,9 +3026,10 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
TraceEvent("RemoteTLogRouterReplies", self->dbgid)
.detail("WorkerID", remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].id());
logRouterInitializationReplies.push_back(transformErrors(
throwErrorOr(
throwErrorOr(timeoutError(
remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY),
SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)),
cluster_recovery_failed()));
}

14 changes: 13 additions & 1 deletion fdbserver/worker.actor.cpp
@@ -26,6 +26,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "flow/ApiVersion.h"
#include "flow/Buggify.h"
#include "flow/CodeProbe.h"
#include "flow/IAsyncFile.h"
#include "fdbrpc/Locality.h"
@@ -2194,6 +2195,14 @@ void cleanupStorageDisks(Reference<AsyncVar<ServerDBInfo>> dbInfo,
}
}

bool skipInitRspInSim() {
const bool skip = g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01 /* 1% */);
if (skip) {
TraceEvent("SkipInitRspInSimTrue");
A reviewer (Contributor) commented:

include interf.id() in log?
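
A minimal sketch of that suggestion (illustrative only, not part of this PR), assuming the caller passes the worker interface's UID:

// Sketch: thread the worker's UID through so the skip event is attributable.
bool skipInitRspInSim(UID id) {
    const bool skip = g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01 /* 1% */);
    if (skip) {
        TraceEvent("SkipInitRspInSimTrue", id);
    }
    return skip;
}

// The call site below would then read: if (!skipInitRspInSim(interf.id())) { req.reply.send(recruited); }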

}
return skip;
}

ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
LocalityData locality,
@@ -3367,7 +3376,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
errorForwarders.add(
zombie(recruited,
forwardError(errors, Role::LOG_ROUTER, recruited.id(), logRouter(recruited, req, dbInfo))));
req.reply.send(recruited);

if (!skipInitRspInSim()) {
req.reply.send(recruited);
}
}
when(CoordinationPingMessage m = waitNext(interf.coordinationPing.getFuture())) {
TraceEvent("CoordinationPing", interf.id())