diff --git a/src/server/conn_context.h b/src/server/conn_context.h index ec62e2dd99d3..be6f795ceb31 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -329,6 +329,13 @@ class ConnectionContext : public facade::ConnectionContext { // of it as a state for the connection bool journal_emulated = false; // whether it is used to dispatch journal commands + // Replid to tag journal records produced by commands dispatched on this connection. + // Empty for ordinary client connections (records are local and shipped to all peers). + // Set by JournalExecutor to the upstream master's replid during replication apply, so that + // a downstream JournalStreamer targeting that same upstream skips them (loop suppression + // for active replication / --replica_mode=mutable). + std::string journal_source_replid; + // Reference to a master-side FlowInfo for this connection if it is a replication connection. FlowInfo* master_repl_flow = nullptr; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index c8322c805889..e0caf753f17f 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1282,10 +1282,12 @@ PrimeIterator DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) con int64_t expire_time = it->first.GetExpireTime(); - // Never do expiration if expiration is disabled, or on replicas unless replica_delete_expired - // is enabled (which allows replicas to proactively delete expired keys on the read path). + // Never do expiration if expiration is disabled, or on replicas unless either + // replica_delete_expired is enabled (proactive read-path deletion) or the replica is in + // --replica_mode=mutable (acts as its own master for expiration purposes). if (int64_t(cntx.time_now_ms) < expire_time || !expire_allowed_ || - (owner_->IsReplica() && !absl::GetFlag(FLAGS_replica_delete_expired))) { + (owner_->IsReplica() && !absl::GetFlag(FLAGS_replica_delete_expired) && + !IsReplicaMutable())) { return it; } diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index c4836476b36c..d913fa25f74b 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -69,6 +69,7 @@ const char kBadMasterId[] = "bad master id"; const char kIdNotFound[] = "syncid not found"; const char kInvalidSyncId[] = "bad sync id"; const char kInvalidState[] = "invalid state"; +const char kTakeoverOnReplica[] = "TAKEOVER not supported on a chained replica"; bool ToSyncId(string_view str, uint32_t* num) { if (!absl::StartsWith(str, "SYNC")) @@ -247,21 +248,31 @@ void DflyCmd::Flow(CmdArgList args, CommandContext* cmd_cntx) { string_view sync_id_str = ArgS(args, 2); string_view flow_id_str = ArgS(args, 3); + // Optional trailing STABLE keyword: skip full sync and start streaming from current LSN. + // Used by replicas in --replica_mode=mutable / REPLICAOF NO_FULL_SYNC. 
+ size_t pos_args_end = args.size(); + bool stable_only = false; + if (pos_args_end > 4 && absl::EqualsIgnoreCase(ArgS(args, pos_args_end - 1), "STABLE")) { + stable_only = true; + --pos_args_end; + } + std::optional seqid; std::optional last_master_id; std::optional last_master_lsn; - if (args.size() == 5) { + if (pos_args_end == 5) { seqid.emplace(); if (!absl::SimpleAtoi(ArgS(args, 4), &seqid.value())) { return cmd_cntx->SendError(facade::kInvalidIntErr); } - } else if (args.size() == 6) { + } else if (pos_args_end == 6) { last_master_id = ArgS(args, 4); last_master_lsn = ArgS(args, 5); } VLOG(1) << "Got DFLY FLOW master_id: " << master_id << " sync_id: " << sync_id_str - << " flow: " << flow_id_str << " seq: " << seqid.value_or(-1); + << " flow: " << flow_id_str << " seq: " << seqid.value_or(-1) + << " stable_only: " << stable_only; if (master_id != sf_->master_replid()) { return cmd_cntx->SendError(kBadMasterId); @@ -300,6 +311,7 @@ void DflyCmd::Flow(CmdArgList args, CommandContext* cmd_cntx) { flow.conn = cmd_cntx->conn(); flow.eof_token = eof_token; flow.version = replica_ptr->version; + flow.target_replid = replica_ptr->id; if (!conn_cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) { // Listener::PreShutdown() triggered @@ -313,8 +325,12 @@ void DflyCmd::Flow(CmdArgList args, CommandContext* cmd_cntx) { std::optional data = sf_->GetLastMasterData(); std::optional lsn_to_start_partial; - // In this flow the master and the registered replica where synced from the same master. - if (last_master_id && data && data->id == *last_master_id) { + if (stable_only) { + // Active replication: start partial-sync from this shard's current LSN. + // Replica does not get history; existing local data is preserved. + lsn_to_start_partial.emplace(journal::GetLsn()); + } else if (last_master_id && data && data->id == *last_master_id) { + // In this flow the master and the registered replica where synced from the same master. ++ServerState::tlocal()->stats.psync_requests_total; auto flow_lsn = ParseLsnVec(*last_master_lsn, data->last_journal_LSNs.size(), flow_id, cmd_cntx); @@ -487,6 +503,10 @@ std::optional DflyCmd::ParseLsnVec(std::string_view last_master_lsn, // timeout_sec - number of seconds to wait for TAKEOVER to converge. // SAVE option is used only by tests. void DflyCmd::TakeOver(CmdArgList args, CommandContext* cmd_cntx) { + if (!ServerState::tlocal()->is_master) { + return cmd_cntx->SendError(kTakeoverOnReplica); + } + CmdArgParser parser{args}; parser.Next(); float timeout = std::ceil(parser.Next()); @@ -747,9 +767,14 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E DCHECK(flow->conn); LSN partial_lsn = flow->start_partial_sync_at.value_or(0); - JournalStreamer::Config config{ - .should_sent_lsn = true, .init_from_stable_sync = true, .start_partial_sync_at = partial_lsn}; - flow->streamer.reset(new JournalStreamer(exec_st, config)); + // For active replication / cascading: streamer skips records whose source_replid + // matches the connecting replica's own replid (sent via REPLCONF CLIENT-ID and + // stored in replica_ptr->id). Empty disables filtering. + JournalStreamer::Config config{.should_sent_lsn = true, + .init_from_stable_sync = true, + .start_partial_sync_at = partial_lsn, + .target_replid = flow->target_replid}; + flow->streamer.reset(new JournalStreamer(exec_st, std::move(config))); flow->streamer->Start(flow->conn->socket()); // Register cleanup. 
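For reference, the full wire form the parsing above accepts is DFLY FLOW <master-id> <sync-id> <flow-id> [lsn | last-master-id lsn-vec] [STABLE]. A minimal sketch of how a replica assembles that line, using hypothetical helper and parameter names (the real construction lives in DflyShardReplica::StartSyncFlow further down in this patch):

# Sketch only: mirrors the argument layout DflyCmd::Flow accepts after this change.
# build_flow_cmd and its parameters are illustrative names, not identifiers from the patch.
def build_flow_cmd(master_id, sync_id, flow_id, lsn=None, last_master=None, stable=False):
    parts = ["DFLY", "FLOW", master_id, sync_id, str(flow_id)]
    if lsn is not None:
        parts.append(str(lsn))            # resume a partial sync from a known LSN
    elif last_master is not None:
        replid, lsn_vec = last_master
        parts += [replid, lsn_vec]        # psync negotiation against the previous master
    if stable:
        parts.append("STABLE")            # NO_FULL_SYNC: stream from the master's current LSN
    return " ".join(parts)

# build_flow_cmd(my_replid, session_id, 0, stable=True) -> "DFLY FLOW <id> <sid> 0 STABLE"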
diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 6e2bc207807f..48548aaf8c89 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -49,6 +49,11 @@ struct FlowInfo { std::optional start_partial_sync_at; uint64_t last_acked_lsn = 0; + // Replid of the connected replica (its own master_replid), copied from + // ReplicaInfo::id. Used by JournalStreamer to skip records that originated + // from this replica — loop suppression for active replication. + std::string target_replid; + std::function cleanup; // Optional cleanup for cancellation. }; diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 05b50ba35a85..f3f4423c753b 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -807,7 +807,8 @@ void EngineShard::Heartbeat() { } } - if (!IsReplica()) { // Never run expiry/evictions on replica. + // Replicas skip the periodic sweep by default; --replica_mode=mutable opts in. + if (!IsReplica() || IsReplicaMutable()) { RetireExpiredAndEvict(); } @@ -847,7 +848,10 @@ void EngineShard::RetireExpiredAndEvict() { db_cntx.time_now_ms = GetCurrentTimeMs(); size_t deleted_bytes = 0; - size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0; + // Cache-mode eviction is master-only: FreeMemWithEvictionStepAtomic asserts !IsReplica(). + // A mutable replica still runs the TTL sweep above but stays out of memory-pressure eviction. + size_t eviction_goal = + (!IsReplica() && GetFlag(FLAGS_enable_heartbeat_eviction)) ? CalculateEvictionBytes() : 0; for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { if (!db_slice.IsDbValid(i)) diff --git a/src/server/journal/executor.h b/src/server/journal/executor.h index af4843beee9d..bb0d39088b8a 100644 --- a/src/server/journal/executor.h +++ b/src/server/journal/executor.h @@ -37,6 +37,12 @@ class JournalExecutor { return &conn_context_; } + // Tag every journal record produced by commands dispatched through this executor with + // the given replid (typically the upstream master's replid). Empty disables tagging. 
+ void SetSourceReplid(std::string replid) { + conn_context_.journal_source_replid = std::move(replid); + } + private: facade::DispatchResult Execute(CommandContext* cmd_cntx); diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 865796e87843..bb4b7942dd19 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -81,8 +81,8 @@ LSN GetLsn() { } void RecordEntry(TxId txid, Op opcode, DbIndex dbid, std::optional slot, - Entry::Payload payload) { - journal_slice.AddLogRecord(Entry{txid, opcode, dbid, slot, std::move(payload)}); + Entry::Payload payload, std::string_view source_replid) { + journal_slice.AddLogRecord(Entry{txid, opcode, dbid, slot, std::move(payload)}, source_replid); } void SetFlushMode(bool allow_flush) { diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index c2d7c5b73315..56653abecda5 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -35,7 +35,7 @@ uint32_t RegisterConsumer(JournalConsumerInterface* consumer); void UnregisterConsumer(uint32_t id); void RecordEntry(TxId txid, Op opcode, DbIndex dbid, std::optional slot, - Entry::Payload payload); + Entry::Payload payload, std::string_view source_replid = {}); size_t LsnBufferSize(); size_t LsnBufferBytes(); diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 636170db234c..87ad022f0a65 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -76,7 +76,7 @@ void JournalSlice::SetFlushMode(bool allow_flush) { } } -void JournalSlice::AddLogRecord(const Entry& entry) { +void JournalSlice::AddLogRecord(const Entry& entry, std::string_view source_replid) { DCHECK(ring_buffer_.capacity() > 0); JournalChangeItem item; @@ -88,6 +88,7 @@ void JournalSlice::AddLogRecord(const Entry& entry) { // only used by RestoreStreamer item.cmd = entry.payload.cmd; item.slot = entry.slot; + item.source_replid = source_replid; io::StringSink sink; JournalWriter writer{&sink}; diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 0a849f422aac..24146e3573e9 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -32,7 +32,10 @@ class JournalSlice { return status_ec_; } - void AddLogRecord(const Entry& entry); + // source_replid: replid of the upstream peer this entry was applied from. + // Empty for locally originated writes. Forwarded to JournalChangeItem so + // streamers can skip records that originated from their target peer. + void AddLogRecord(const Entry& entry, std::string_view source_replid = {}); // Register a callback that will be called every time a new entry is // added to the journal. diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 9297159c15dc..cf7f3719aeaa 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -129,6 +129,19 @@ void JournalStreamer::ConsumeJournalChange(const JournalChangeItem& item) { return; } + if (IsLoopSuppressed(item)) { + // Active-replication loop suppression: don't ship the record (it would loop + // back to the peer it came from), but still ship a PING so the receiver's + // record counter advances past this LSN. Otherwise the LSN integrity check + // (tx_executor.cc) would mismatch and partial-sync resume would be off. 
+ io::StringSink sink; + JournalWriter writer{&sink}; + writer.Write(journal::Entry{journal::Op::PING, /*dbid=*/0, /*slot_id=*/std::nullopt}); + Write(std::move(sink).str()); + last_lsn_writen_ = item.journal_item.lsn; + return; + } + DCHECK_GT(item.journal_item.lsn, last_lsn_writen_); Write(item.journal_item.data); time_t now = time(nullptr); diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index a5476adfc1c1..bffb0207fe02 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -24,6 +24,11 @@ class JournalStreamer : public journal::JournalConsumerInterface { bool should_sent_lsn = false; bool init_from_stable_sync = false; LSN start_partial_sync_at = 0; + // For active replication: skip records whose source_replid matches this + // (i.e., records that originated from the peer this streamer is sending to). + // Empty disables filtering. Typically set to the connecting replica's own + // master_replid as conveyed via REPLCONF CLIENT-ID. + std::string target_replid; }; JournalStreamer(ExecutionState* cntx, Config config); @@ -63,6 +68,13 @@ class JournalStreamer : public journal::JournalConsumerInterface { return cntx_->IsRunning(); } + // Active replication: a record is loop-suppressed if its source is the very peer + // we're sending to. The streamer ships a small PING placeholder for these records + // so the replica's LSN counter stays aligned with our journal LSN axis. + bool IsLoopSuppressed(const journal::JournalChangeItem& item) const { + return !config_.target_replid.empty() && item.source_replid == config_.target_replid; + } + void WaitForInflightToComplete(bool with_timeout); size_t inflight_bytes() const { diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 721d551b2f01..81d7625eb0b0 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -86,6 +86,12 @@ struct JournalChangeItem { std::string_view cmd; std::optional slot; + // Replid of the upstream source this record was applied from. Empty for locally + // originated writes. A JournalStreamer skips records whose source_replid equals + // its target replica's replid (active-replication loop suppression). + // Stored as std::string (not string_view) because the originating Transaction may + // be destroyed before async streamer fibers iterate over this item. + std::string source_replid; }; struct JournalConsumerInterface { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 5da081168e1d..15c8a5a272ea 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -811,6 +811,15 @@ pair, OpStatus> PrepareTransaction(const CommandId* c cmd_ctx->SetupTx(cid, dfly_cntx->transaction); + // Carry the source replid from the dispatching connection so that journal records + // emitted by this transaction can be filtered out on streamers targeting the source + // (active replication / --replica_mode=mutable). Always set — including the empty + // case — to clear any value left over from a previous command on a reused + // (MULTI/EXEC) transaction. 
+ if (dfly_cntx->transaction != nullptr) { + dfly_cntx->transaction->SetJournalSourceReplid(dfly_cntx->journal_source_replid); + } + if (init) { DCHECK(cmd_ctx->tx()); if (auto st = @@ -966,6 +975,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("timeout"); config_registry.RegisterMutable("send_timeout"); config_registry.RegisterMutable("managed_service_info"); + config_registry.RegisterMutable("replica_mode"); #ifdef WITH_SEARCH config_registry.RegisterMutable("MAXSEARCHRESULTS"); config_registry.RegisterMutable("search_query_string_bytes"); @@ -1315,7 +1325,7 @@ std::optional Service::VerifyCommandState(const CommandId& cid, CmdA bool under_script = dfly_cntx.conn_state.script_info != nullptr; bool multi_active = dfly_cntx.conn_state.exec_info.IsCollecting() && !is_trans_cmd; - if (!etl.is_master && is_write_cmd && !dfly_cntx.is_replicating) + if (!etl.is_master && is_write_cmd && !dfly_cntx.is_replicating && !IsReplicaMutable()) return ErrorReply{"-READONLY You can't write against a read only replica."}; if (multi_active) { diff --git a/src/server/replica.cc b/src/server/replica.cc index de9431d28688..daa781940c62 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -36,6 +36,7 @@ extern "C" { #include "server/main_service.h" #include "server/namespaces.h" #include "server/rdb_load.h" +#include "server/server_state.h" #include "strings/human_readable.h" #define LOG_REPL_ERROR(msg) \ @@ -94,8 +95,11 @@ vector> Partition(unsigned num_flows) { } // namespace Replica::Replica(string host, uint16_t port, Service* se, std::string_view id, - std::optional slot_range) + std::optional slot_range, bool no_full_sync) : ProtocolClient(std::move(host), port), service_(*se), id_{id}, slot_range_(slot_range) { + // no_full_sync is per-replica config; carry it on master_context_ so it survives + // the per-reconnect re-init in HandleCapaDflyResp. + master_context_.no_full_sync = no_full_sync; proactor_ = ProactorBase::me(); } @@ -410,7 +414,8 @@ std::error_code Replica::HandleCapaDflyResp() { VLOG(1) << "Master id: " << master_context_.master_repl_id << ", sync id: " << master_context_.dfly_session_id << ", num journals: " << param_num_flows - << ", version: " << unsigned(master_context_.version); + << ", version: " << unsigned(master_context_.version) + << ", no_full_sync: " << master_context_.no_full_sync; return error_code{}; } @@ -876,7 +881,7 @@ io::Result DflyShardReplica::StartSyncFlow( VLOG(1) << "Sending on flow " << master_context_.master_repl_id << " " << master_context_.dfly_session_id << " " << flow_id_ << " lsn: " << lsn.value_or(-1); - // DFLY FLOW [lsn] [last_master_id lsn-vec] + // DFLY FLOW [lsn] [last_master_id lsn-vec] [STABLE] std::string cmd = StrCat("DFLY FLOW ", master_context_.master_repl_id, " ", master_context_.dfly_session_id, " ", flow_id_); // Try to negotiate a partial sync if possible. @@ -890,6 +895,11 @@ io::Result DflyShardReplica::StartSyncFlow( absl::StrAppend(&cmd, " ", last_master_data.value().id, " ", lsn_str); VLOG(1) << "Sending last master sync flow " << last_master_data.value().id << " " << lsn_str; } + // Active replication: ask master to skip historical full-sync and start streaming + // from its current LSN. Existing local data on the replica is preserved. 
+ if (master_context_.no_full_sync && master_context_.version >= DflyVersion::VER7) { + absl::StrAppend(&cmd, " STABLE"); + } ResetParser(RedisParser::Mode::CLIENT); leftover_buf_.emplace(128); @@ -1027,7 +1037,11 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) { } else if (tx_data.opcode == journal::Op::PING) { force_ping_ = true; journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); - if (EngineShard::tlocal() && EngineShard::tlocal()->journal()) { + // Don't re-journal PINGs received from upstream when this node accepts client + // writes (active replication). Doing so would relay the master's loop-suppression + // PING placeholders back across the chain, creating an infinite ping-pong. + const bool active_mode = IsReplicaMutable(); + if (!active_mode && EngineShard::tlocal() && EngineShard::tlocal()->journal()) { // We must register this entry to the journal to allow partial sync // if journal is active. journal::RecordEntry(0, journal::Op::PING, 0, nullopt, {}); @@ -1124,6 +1138,9 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m multi_shard_exe_(multi_shard_exe), flow_id_(flow_id) { executor_ = std::make_unique(service); + // Tag records dispatched through this executor with the upstream's replid so that + // a downstream JournalStreamer pointing back at the same upstream skips them. + executor_->SetSourceReplid(master_context.master_repl_id); rdb_loader_ = std::make_unique(&service_, load_context); rdb_loader_->SetLoadUnownedSlots(true); rdb_loader_->SetShardCount(master_context.num_flows); diff --git a/src/server/replica.h b/src/server/replica.h index e2df9079b40b..480bc2f5d28c 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -36,6 +36,8 @@ struct MasterContext { std::string dfly_session_id; // Sync session id for dfly sync. unsigned num_flows = 0; DflyVersion version = DflyVersion::VER1; + // Skip full sync; start streaming from master's current LSN. See --replica_mode=mutable. + bool no_full_sync = false; }; // This class manages replication from both Dragonfly and Redis masters. @@ -54,7 +56,7 @@ class Replica : ProtocolClient { public: Replica(std::string master_host, uint16_t port, Service* se, std::string_view id, - std::optional slot_range); + std::optional slot_range, bool no_full_sync = false); ~Replica(); // Spawns a fiber that runs until link with master is broken or the replication is stopped. diff --git a/src/server/server_family.cc b/src/server/server_family.cc index fb3e16fcabd8..d7d1c08ac65d 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -716,6 +716,10 @@ struct ReplicaOfArgs { string host; uint16_t port; std::optional slot_range; + // Skip the initial full sync / RDB load when becoming a replica. Existing data is + // preserved; replication starts from the master's current LSN. Used for active + // replication: see --replica_mode=mutable.
+ bool no_full_sync = false; static nonstd::expected FromCmdArgs(CmdArgList args); bool IsReplicaOfNoOne() const { return port == 0; @@ -729,6 +733,9 @@ struct ReplicaOfArgs { os << " SLOTS [" << args.slot_range.value().start << "-" << args.slot_range.value().end << "]"; } + if (args.no_full_sync) { + os << " NO_FULL_SYNC"; + } return os; } }; @@ -746,12 +753,17 @@ nonstd::expected ReplicaOfArgs::FromCmdArgs(CmdArgLis if (auto err = parser.TakeError(); err || replicaof_args.port < 1) { return nonstd::make_unexpected(ErrorReply("port is out of range")); } - if (parser.HasNext()) { + if (parser.Check("NO_FULL_SYNC")) { + replicaof_args.no_full_sync = true; + } else if (parser.HasNext()) { auto [slot_start, slot_end] = parser.Next(); replicaof_args.slot_range = cluster::SlotRange{slot_start, slot_end}; if (auto err = parser.TakeError(); err || !replicaof_args.slot_range->IsValid()) { return nonstd::make_unexpected(ErrorReply("Invalid slot range")); } + if (parser.Check("NO_FULL_SYNC")) { + replicaof_args.no_full_sync = true; + } } } @@ -3467,47 +3479,54 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio }; auto add_repl_info = [&] { - if (!m.replica_side_info) { - vector replicas_info = dfly_cmd_->GetReplicasRoleInfo(); - append("role", "master"); - append("connected_slaves", replicas_info.size()); - - if (show_managed_info) { - for (size_t i = 0; i < replicas_info.size(); i++) { - auto& r = replicas_info[i]; - // e.g. slave0:ip=172.19.0.3,port=6379,state=full_sync - append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port, - ",state=", r.state, ",lag=", r.lsn_lag)); - } + const bool is_master = !m.replica_side_info; + append("role", is_master + ? "master" + : (GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica")); + + // Downstream replicas: emit on master AND on chained replicas, since either + // can be the source for further replicas in a cascading topology. + vector replicas_info = dfly_cmd_->GetReplicasRoleInfo(); + append("connected_slaves", replicas_info.size()); + if (show_managed_info) { + for (size_t i = 0; i < replicas_info.size(); i++) { + auto& r = replicas_info[i]; + // e.g. slave0:ip=172.19.0.3,port=6379,state=full_sync + append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port, + ",state=", r.state, ",lag=", r.lsn_lag)); } + } + + if (is_master) { append("master_replid", master_replid_); - } else { - append("role", GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica"); - - auto replication_info_cb = [&](const Replica::Summary& rinfo) { - append("master_host", rinfo.host); - append("master_port", rinfo.port); - - const char* link = rinfo.master_link_established ? 
"up" : "down"; - append("master_link_status", link); - append("master_last_io_seconds_ago", rinfo.master_last_io_sec); - append("master_sync_in_progress", rinfo.full_sync_in_progress); - append("master_replid", rinfo.master_id); - if (rinfo.full_sync_done || (rinfo.passed_full_sync && !rinfo.master_link_established)) - append("slave_repl_offset", rinfo.repl_offset_sum); - append("slave_priority", GetFlag(FLAGS_replica_priority)); - append("slave_read_only", 1); - append("psync_attempts", rinfo.psync_attempts); - append("psync_successes", rinfo.psync_successes); - }; + return; + } - const auto& info = *m.replica_side_info; + auto replication_info_cb = [&](const Replica::Summary& rinfo) { + append("master_host", rinfo.host); + append("master_port", rinfo.port); + + const char* link = rinfo.master_link_established ? "up" : "down"; + append("master_link_status", link); + append("master_last_io_seconds_ago", rinfo.master_last_io_sec); + append("master_sync_in_progress", rinfo.full_sync_in_progress); + append("master_replid", rinfo.master_id); + if (rinfo.full_sync_done || (rinfo.passed_full_sync && !rinfo.master_link_established)) + append("slave_repl_offset", rinfo.repl_offset_sum); + append("slave_priority", GetFlag(FLAGS_replica_priority)); + const bool active = IsReplicaMutable(); + append("slave_read_only", active ? 0 : 1); + append("replica_mode", active ? "mutable" : "readonly"); + append("psync_attempts", rinfo.psync_attempts); + append("psync_successes", rinfo.psync_successes); + }; - replication_info_cb(info.summary); - // Special case, when multiple masters replicate to a single replica. - for (const auto& summary : info.cl_repl_summary) { - replication_info_cb(summary); - } + const auto& info = *m.replica_side_info; + + replication_info_cb(info.summary); + // Special case, when multiple masters replicate to a single replica. 
+ for (const auto& summary : info.cl_repl_summary) { + replication_info_cb(summary); } }; @@ -3900,8 +3919,9 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, CommandContext* cmd_cntx, } // Create a new replica and assign it - new_replica = make_shared(replicaof_args->host, replicaof_args->port, &service_, - master_replid(), replicaof_args->slot_range); + new_replica = + make_shared(replicaof_args->host, replicaof_args->port, &service_, master_replid(), + replicaof_args->slot_range, replicaof_args->no_full_sync); replica_ = new_replica; @@ -4052,8 +4072,9 @@ void ServerFamily::ReplicaOfInternalV2(CmdArgList args, CommandContext* cmd_cntx return ReplicaOfNoOne(cmd_cntx->rb()); } - auto new_replica = make_shared(replicaof_args->host, replicaof_args->port, &service_, - master_replid(), replicaof_args->slot_range); + auto new_replica = + make_shared(replicaof_args->host, replicaof_args->port, &service_, master_replid(), + replicaof_args->slot_range, replicaof_args->no_full_sync); GenericError ec; switch (on_error) { case ActionOnConnectionFail::kReturnOnError: @@ -4153,12 +4174,6 @@ void ServerFamily::ReplTakeOver(CmdArgList args, CommandContext* cmd_cntx) { void ServerFamily::ReplConf(CmdArgList args, CommandContext* cmd_cntx) { auto* builder = cmd_cntx->rb(); - { - util::fb2::LockGuard lk(replicaof_mu_); - if (!IsMaster()) { - return cmd_cntx->SendError("Replicating a replica is unsupported"); - } - } auto err_cb = [&]() mutable { LOG(ERROR) << "Error in receiving command: " << args; diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 5d5714847fe4..f065c5924785 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -42,11 +42,42 @@ ABSL_FLAG(size_t, serialization_max_chunk_size, 64_KB, ABSL_FLAG(uint32_t, max_squashed_cmd_num, 100, "Max number of commands squashed in a single shard during squash optimizaiton"); +ABSL_FLAG(dfly::ReplicaMode, replica_mode, dfly::ReplicaMode::kReadonly, + "Replica writability mode. 'readonly' (default) rejects client writes on a replica with " + "READONLY. 'mutable' accepts client writes — used together with REPLICAOF NO_FULL_SYNC " + "for active replication topologies (last-writer-wins, eventual consistency)."); + namespace dfly { using namespace std; using namespace std::chrono_literals; +bool AbslParseFlag(std::string_view in, ReplicaMode* mode, std::string* err) { + if (in == "readonly") { + *mode = ReplicaMode::kReadonly; + return true; + } + if (in == "mutable") { + *mode = ReplicaMode::kMutable; + return true; + } + *err = absl::StrCat("Unknown value '", in, + "' for replica_mode flag (expected 'readonly' or " + "'mutable')"); + return false; +} + +std::string AbslUnparseFlag(ReplicaMode mode) { + switch (mode) { + case ReplicaMode::kReadonly: + return "readonly"; + case ReplicaMode::kMutable: + return "mutable"; + } + DCHECK(false) << "Unknown replica_mode flag value " << int(mode); + return "readonly"; +} + __thread ServerState* ServerState::state_ = nullptr; facade::ConnectionStats* ServerState::tl_connection_stats() { @@ -354,4 +385,9 @@ void ServerState::RecordCmd(bool is_main_conn) { } qps_.Inc(); } + +bool IsReplicaMutable() { + return absl::GetFlag(FLAGS_replica_mode) == ReplicaMode::kMutable; +} + } // end of namespace dfly diff --git a/src/server/server_state.h b/src/server/server_state.h index 419c4b15d099..c7de94e3635b 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -332,4 +332,11 @@ class ServerState { // public struct - to allow initialization. 
static __thread ServerState* state_; }; +// Selects writability of a replica. `kMutable` lifts the -READONLY guard on +// client writes and re-enables the periodic expiration sweep on this replica. +enum class ReplicaMode : uint8_t { kReadonly, kMutable }; + +// Returns true when --replica_mode=mutable. +bool IsReplicaMutable(); + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 48121323192d..caaef95b897f 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -510,6 +510,9 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { cid_ = cid; re_enabled_auto_journal_ = false; cb_ptr_.reset(); + // Cleared so a reused transaction doesn't leak the prior command's source replid; + // PrepareTransaction will set it again for the new dispatch. + journal_source_replid_.clear(); for (auto& sd : shard_data_) { sd.slice_count = sd.slice_start = 0; @@ -1581,7 +1584,8 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul void Transaction::LogJournalOnShard(journal::Entry::Payload&& payload) const { journal::RecordEntry(txid_, journal::Op::COMMAND, db_index_, - unique_slot_checker_.GetUniqueSlotId(), std::move(payload)); + unique_slot_checker_.GetUniqueSlotId(), std::move(payload), + journal_source_replid_); } void Transaction::ReviveAutoJournal() { diff --git a/src/server/transaction.h b/src/server/transaction.h index 6944b9855452..874d4262a6c5 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -350,6 +350,15 @@ class Transaction { // Write a journal entry to a shard journal with the given payload. void LogJournalOnShard(journal::Entry::Payload&& payload) const; + // Tag journal entries emitted by this transaction with the given source replid. + // Used by replication apply path to enable loop suppression in active replication. + void SetJournalSourceReplid(std::string replid) { + journal_source_replid_ = std::move(replid); + } + std::string_view journal_source_replid() const { + return journal_source_replid_; + } + // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup. void ReviveAutoJournal(); @@ -614,6 +623,10 @@ class Transaction { DbIndex db_index_{0}; uint64_t time_now_ms_{0}; + // Replid that journal records produced by this transaction will carry. See + // ConnectionContext::journal_source_replid for semantics. Empty for ordinary client writes. + std::string journal_source_replid_; + std::atomic_uint32_t use_count_{0}; // transaction exists only as an intrusive_ptr uint32_t unique_shard_cnt_{0}; // Number of unique shards active diff --git a/src/server/version.h b/src/server/version.h index 36da1266119c..5be940ded75f 100644 --- a/src/server/version.h +++ b/src/server/version.h @@ -42,8 +42,12 @@ enum class DflyVersion { // - hnsw-index-metadata AUX field VER6, + // - DFLY FLOW ... STABLE keyword: skip full sync entirely, start streaming from + // master's current LSN. Used by --replica_mode=mutable / REPLICAOF NO_FULL_SYNC. 
+ VER7, + // Always points to the latest version - CURRENT_VER = VER6, + CURRENT_VER = VER7, }; } // namespace dfly diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 9b72753322e0..1a270ca7c64f 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2999,24 +2999,6 @@ async def check_master_status(): assert await seeder.compare(capture, port=master.port) -async def test_replica_of_replica(df_factory): - # Can't connect a replica to a replica, but OK to connect 2 replicas to the same master - master = df_factory.create(proactor_threads=2) - replica = df_factory.create(proactor_threads=2) - replica2 = df_factory.create(proactor_threads=2) - - df_factory.start_all([master, replica, replica2]) - - c_replica = replica.client() - c_replica2 = replica2.client() - - assert await c_replica.execute_command(f"REPLICAOF localhost {master.port}") == "OK" - - with pytest.raises(redis.exceptions.ResponseError): - await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}") - - assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK" - @pytest.mark.exclude_epoll @dfly_args({"proactor_threads": 1}) @@ -4715,3 +4697,354 @@ async def test_snapshot_load_replication(df_factory: DflyInstanceFactory): assert master_capture == replica_capture await c_replica.execute_command("REPLICAOF", "NO", "ONE") + + +# ----------------------------------------------------------------------------- +# Cascading replication (replica-of-replica) +# +# Master -> R1 -> R2 -> ... chains of arbitrary depth. Each downstream replica +# behaves as a normal read-only replica that observes the upstream master's +# writes through its parent in the chain. +# ----------------------------------------------------------------------------- + + +async def _wait_keys_propagated(client, expected_keys, timeout=30): + """Poll DBSIZE on a replica until it reaches expected_keys or times out.""" + start = time.time() + while time.time() - start < timeout: + size = await client.execute_command("DBSIZE") + if size >= expected_keys: + return size + await asyncio.sleep(0.1) + raise AssertionError(f"DBSIZE never reached {expected_keys}, last={size}") + + +async def test_chain_basic_3node(df_factory: DflyInstanceFactory): + """M -> R1 -> R2. Writes on M must reach R2; INFO must report both + directions for the chained R1 (role=replica AND connected_slaves).""" + master = df_factory.create(proactor_threads=2) + r1 = df_factory.create(proactor_threads=2) + r2 = df_factory.create(proactor_threads=2) + df_factory.start_all([master, r1, r2]) + + c_master, c_r1, c_r2 = master.client(), r1.client(), r2.client() + + await c_r1.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_r1) + await c_r2.execute_command(f"REPLICAOF localhost {r1.port}") + await wait_available_async(c_r2) + + n = 200 + for i in range(n): + await c_master.execute_command(f"SET k{i} v{i}") + + await _wait_keys_propagated(c_r1, n) + await _wait_keys_propagated(c_r2, n) + + # Sample-check value propagation end-to-end. + for i in (0, n // 2, n - 1): + assert (await c_r2.execute_command(f"GET k{i}")) == f"v{i}" + + info_master = await c_master.info("replication") + info_r1 = await c_r1.info("replication") + info_r2 = await c_r2.info("replication") + + assert info_master["role"] == "master" + assert info_master["connected_slaves"] == 1 + + # R1 is a replica of master AND has one downstream slave. 
+ assert info_r1["role"] in ("replica", "slave") + assert info_r1["master_host"] == "localhost" + assert int(info_r1["master_port"]) == master.port + assert info_r1["connected_slaves"] == 1 + assert "slave0" in info_r1 + # redis-py auto-parses each slaveN entry into a dict. + assert int(info_r1["slave0"]["port"]) == r2.port + + assert info_r2["role"] in ("replica", "slave") + assert int(info_r2["master_port"]) == r1.port + + +async def test_chain_takeover_rejected(df_factory: DflyInstanceFactory): + """REPLTAKEOVER issued by R2 against R1 must be rejected because R1 is itself + a chained replica and we refuse to handle takeover in that role.""" + master = df_factory.create(proactor_threads=2) + r1 = df_factory.create(proactor_threads=2) + r2 = df_factory.create(proactor_threads=2) + df_factory.start_all([master, r1, r2]) + + c_master, c_r1, c_r2 = master.client(), r1.client(), r2.client() + await c_r1.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_r1) + await c_r2.execute_command(f"REPLICAOF localhost {r1.port}") + await wait_available_async(c_r2) + + with pytest.raises(redis.exceptions.ResponseError): + await c_r2.execute_command("REPLTAKEOVER 5") + + # Topology unchanged: R2 is still a replica of R1. + role_r2 = await c_r2.execute_command("ROLE") + assert role_r2[0] == "slave" + assert int(role_r2[2]) == r1.port + + role_r1 = await c_r1.execute_command("ROLE") + assert role_r1[0] == "slave" + assert int(role_r1[2]) == master.port + + +async def test_chain_replicaof_no_one_promotes_r1(df_factory: DflyInstanceFactory): + """`REPLICAOF NO ONE` on R1 promotes R1 to master. R2 should keep replicating + from now-master R1 without needing to be reconfigured (replid stays the + same across the role flip).""" + master = df_factory.create(proactor_threads=2) + r1 = df_factory.create(proactor_threads=2) + r2 = df_factory.create(proactor_threads=2) + df_factory.start_all([master, r1, r2]) + + c_master, c_r1, c_r2 = master.client(), r1.client(), r2.client() + await c_r1.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_r1) + await c_r2.execute_command(f"REPLICAOF localhost {r1.port}") + await wait_available_async(c_r2) + + await c_master.execute_command("SET pre_promote 1") + await _wait_keys_propagated(c_r2, 1) + + await c_r1.execute_command("REPLICAOF", "NO", "ONE") + + @assert_eventually + async def check_r1_master(): + info = await c_r1.info("replication") + assert info["role"] == "master" + assert info["connected_slaves"] >= 1 + + await check_r1_master() + + # Subsequent writes go directly to R1 (now master); R2 must observe them. + await c_r1.execute_command("SET post_promote 2") + + @assert_eventually + async def check_r2_post_promote(): + v = await c_r2.execute_command("GET post_promote") + assert v == "2" + + await check_r2_post_promote() + + +async def test_chain_failover_repoint_r2_takes_r1_place(df_factory: DflyInstanceFactory): + """Failover scenario: Master -> R1 -> R2. R1 dies. We re-point R2 directly + at Master so it becomes the new intermediate, then attach a fresh R3 behind + R2 to rebuild the chain as Master -> R2 -> R3. 
+ + Verifies that the cascading code path supports a survivor taking over an + intermediate role through manual REPLICAOF reconfiguration.""" + master = df_factory.create(proactor_threads=2) + r1 = df_factory.create(proactor_threads=2) + r2 = df_factory.create(proactor_threads=2) + df_factory.start_all([master, r1, r2]) + + c_master, c_r1, c_r2 = master.client(), r1.client(), r2.client() + + # Phase 1: original chain Master -> R1 -> R2. + await c_r1.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_r1) + await c_r2.execute_command(f"REPLICAOF localhost {r1.port}") + await wait_available_async(c_r2) + + await c_master.execute_command("SET pre_failover 1") + await _wait_keys_propagated(c_r2, 1) + + # Phase 2: R1 dies. + r1.stop() + + # Phase 3: re-point R2 directly at Master. R2's old psync cookie carries R1's + # replid, so a full sync from Master is expected (and acceptable). + await c_r2.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_r2) + + # Phase 4: bring up R3 and attach it behind R2 to rebuild the chain. + r3 = df_factory.create(proactor_threads=2) + df_factory.start_all([r3]) + c_r3 = r3.client() + await c_r3.execute_command(f"REPLICAOF localhost {r2.port}") + await wait_available_async(c_r3) + + # Phase 5: a write on Master must propagate Master -> R2 -> R3. + await c_master.execute_command("SET post_failover 2") + + @assert_eventually + async def check_r3_caught_up(): + assert (await c_r3.execute_command("GET post_failover")) == "2" + # Pre-failover key carried through the topology change too. + assert (await c_r3.execute_command("GET pre_failover")) == "1" + + await check_r3_caught_up() + + # Topology sanity: Master has 1 direct replica (R2); R2 is replica of Master + # AND has 1 downstream slave (R3); R3 is replica of R2. + info_master = await c_master.info("replication") + info_r2 = await c_r2.info("replication") + info_r3 = await c_r3.info("replication") + + assert info_master["role"] == "master" + assert info_master["connected_slaves"] == 1 + + assert info_r2["role"] in ("replica", "slave") + assert int(info_r2["master_port"]) == master.port + assert info_r2["connected_slaves"] == 1 + assert int(info_r2["slave0"]["port"]) == r3.port + + assert info_r3["role"] in ("replica", "slave") + assert int(info_r3["master_port"]) == r2.port + + +# --- replica_mode flag (mutable replica) --- +# +# `--replica_mode=mutable` lifts three guards: +# 1) -READONLY rejection of client writes (main_service.cc) +# 2) lazy expiration on read path skip (db_slice.cc) +# 3) periodic background expiry/eviction (engine_shard.cc) +# +# In the default (`readonly`) mode all three guards remain in place. 
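Guard (2), the lazy read-path expiration, is not exercised by the sweep-based test further below, which deliberately probes only the periodic-sweep counter. A rough probe of the read path could look like the sketch here; it assumes the --hz flag controls the sweep cadence (as the background-expiration test's comment implies) and only asserts the mutable-replica side:

async def probe_read_path_expiry_sketch(df_factory):
    # Sketch, not part of the suite. Slow the periodic sweep down (assuming hz controls
    # its cadence) so an expired key is most likely retired by the read path instead.
    master = df_factory.create(proactor_threads=2)
    replica = df_factory.create(proactor_threads=2, replica_mode="mutable", hz=1)
    df_factory.start_all([master, replica])

    c_replica = replica.client()
    await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
    await wait_available_async(c_replica)

    # Local write with a short TTL, read back after it has expired.
    await c_replica.execute_command("SET lazy-exp v PX 100")
    await asyncio.sleep(0.3)
    assert await c_replica.get("lazy-exp") is None          # read path treats it as gone
    assert await c_replica.execute_command("TTL", "lazy-exp") == -2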
+ + +async def test_replica_mode_default_rejects_writes(df_factory: DflyInstanceFactory): + """Regression: default replica must reject client writes with -READONLY.""" + master = df_factory.create(proactor_threads=2) + replica = df_factory.create(proactor_threads=2) + df_factory.start_all([master, replica]) + + c_master, c_replica = master.client(), replica.client() + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + with pytest.raises(redis.exceptions.ResponseError) as exc: + await c_replica.execute_command("SET local-key 1") + assert "READONLY" in str(exc.value).upper() or "read only" in str(exc.value).lower() + + +async def test_replica_mode_mutable_allows_client_writes(df_factory: DflyInstanceFactory): + """`--replica_mode=mutable` allows direct client writes on a replica.""" + master = df_factory.create(proactor_threads=2) + replica = df_factory.create(proactor_threads=2, replica_mode="mutable") + df_factory.start_all([master, replica]) + + c_master, c_replica = master.client(), replica.client() + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + # Direct client write on the replica must succeed. + assert await c_replica.execute_command("SET local-key v") == "OK" + assert await c_replica.get("local-key") == "v" + + # Master continues to drive replication and is unaffected by the local write. + await c_master.execute_command("SET m-key from-master") + await check_all_replicas_finished([c_replica], c_master) + assert await c_replica.get("m-key") == "from-master" + + +async def test_replica_mode_mutable_background_expiration(df_factory: DflyInstanceFactory): + """On a mutable replica the background sweep must delete expired keys. + + We probe `total_heartbeat_expired_keys` (incremented only by the periodic + sweep, not by the lazy read-path expiration) to prove the sweep ran. + """ + master = df_factory.create(proactor_threads=2) + # hz left at default so the periodic sweep fires every ~10ms. + replica = df_factory.create(proactor_threads=2, replica_mode="mutable") + df_factory.start_all([master, replica]) + + c_master, c_replica = master.client(), replica.client() + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + before = (await c_replica.info("stats")).get("total_heartbeat_expired_keys", 0) + + # Local write with a short TTL — never seen by the master. + await c_replica.execute_command("SET local-exp v PX 200") + + @assert_eventually(timeout=5) + async def sweep_retired_key(): + after = (await c_replica.info("stats")).get("total_heartbeat_expired_keys", 0) + assert after > before, ( + f"background sweep did not run on mutable replica: " + f"total_heartbeat_expired_keys before={before} after={after}" + ) + assert await c_replica.get("local-exp") is None + assert await c_replica.execute_command("TTL", "local-exp") == -2 + + await sweep_retired_key() + + +# ----------------------------------------------------------------------------- +# Active replication: --replica_mode=mutable + REPLICAOF ... NO_FULL_SYNC +# +# Two (or more) nodes mutually REPLICAOF each other, accept client writes on +# both sides, do NOT flush local data on REPLICAOF, and propagate new writes +# bidirectionally with replid-based loop suppression. 
+# ----------------------------------------------------------------------------- + + +async def test_active_2node_full(df_factory: DflyInstanceFactory): + """Comprehensive 2-node active-replication scenario covering every invariant + of --replica_mode=mutable + REPLICAOF NO_FULL_SYNC: + + 1. Pre-existing local data is preserved on both sides (no flush). + 2. NO_FULL_SYNC means no historical cross-propagation. + 3. New writes propagate in both directions. + 4. INFO replication on a mutable node reports slave_read_only=0 and + replica_mode=mutable. + 5. Loop suppression: after writes settle, slave_repl_offset stays flat + across an idle window (no infinite ping-pong). + """ + a = df_factory.create(proactor_threads=2, replica_mode="mutable") + b = df_factory.create(proactor_threads=2, replica_mode="mutable") + df_factory.start_all([a, b]) + + c_a, c_b = a.client(), b.client() + + # Seed both sides with distinct pre-existing data. + await c_a.execute_command("SET only_on_a 1") + await c_b.execute_command("SET only_on_b 2") + + # Mutual REPLICAOF NO_FULL_SYNC. + await c_a.execute_command(f"REPLICAOF localhost {b.port} NO_FULL_SYNC") + await wait_available_async(c_a) + await c_b.execute_command(f"REPLICAOF localhost {a.port} NO_FULL_SYNC") + await wait_available_async(c_b) + + # (1) seeds preserved; (2) no historical cross-propagation. + assert (await c_a.execute_command("GET only_on_a")) == "1" + assert (await c_b.execute_command("GET only_on_b")) == "2" + assert (await c_a.execute_command("GET only_on_b")) is None + assert (await c_b.execute_command("GET only_on_a")) is None + + # (3) new writes propagate in both directions. + await c_a.execute_command("SET k_from_a hello_a") + await c_b.execute_command("SET k_from_b hello_b") + + @assert_eventually + async def both_visible(): + assert (await c_b.execute_command("GET k_from_a")) == "hello_a" + assert (await c_a.execute_command("GET k_from_b")) == "hello_b" + + await both_visible() + + # (4) INFO observability. + info = await c_a.info("replication") + assert int(info["slave_read_only"]) == 0 + assert info.get("replica_mode") == "mutable" + + # (5) Loop suppression: offsets must stay flat once propagation has settled. + await asyncio.sleep(0.5) + offset_a = int((await c_a.info("replication"))["slave_repl_offset"]) + offset_b = int((await c_b.info("replication"))["slave_repl_offset"]) + await asyncio.sleep(2.0) + offset_a_after = int((await c_a.info("replication"))["slave_repl_offset"]) + offset_b_after = int((await c_b.info("replication"))["slave_repl_offset"]) + assert ( + offset_a_after == offset_a + ), f"A's offset grew {offset_a} → {offset_a_after} idle: replication is looping" + assert ( + offset_b_after == offset_b + ), f"B's offset grew {offset_b} → {offset_b_after} idle: replication is looping"
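For operators, the INFO fields introduced above (replica_mode and slave_read_only on the replica side, plus connected_slaves/slaveN now emitted on chained nodes as well) are enough for a quick liveness probe of an active pair. A rough sketch, with an illustrative helper name and no claim to completeness:

async def active_pair_healthy(c_a, c_b):
    """Sketch of an operator-side probe for a two-node active pair; not part of the suite."""
    for client in (c_a, c_b):
        info = await client.info("replication")
        if info.get("replica_mode") != "mutable":        # both sides must accept client writes
            return False
        if int(info.get("slave_read_only", 1)) != 0:
            return False
        if info.get("master_link_status") != "up":       # link to the peer is alive
            return False
        if int(info.get("connected_slaves", 0)) < 1:     # the peer is attached as a downstream
            return False
    return True

Offset flatness (invariant 5 above) needs two samples spaced in time, so it is left to the test rather than to a one-shot probe like this.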