Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@ class ConnectionContext : public facade::ConnectionContext {
// of it as a state for the connection
bool journal_emulated = false; // whether it is used to dispatch journal commands

// Replid to tag journal records produced by commands dispatched on this connection.
// Empty for ordinary client connections (records are local and shipped to all peers).
// Set by JournalExecutor to the upstream master's replid during replication apply, so that
// a downstream JournalStreamer targeting that same upstream skips them (loop suppression
// for active replication / --replica_mode=mutable).
std::string journal_source_replid;

// Reference to a master-side FlowInfo for this connection if it is a replication connection.
FlowInfo* master_repl_flow = nullptr;

Expand Down
8 changes: 5 additions & 3 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1282,10 +1282,12 @@ PrimeIterator DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) con

int64_t expire_time = it->first.GetExpireTime();

// Never do expiration if expiration is disabled, or on replicas unless replica_delete_expired
// is enabled (which allows replicas to proactively delete expired keys on the read path).
// Never do expiration if expiration is disabled, or on replicas unless either
// replica_delete_expired is enabled (proactive read-path deletion) or the replica is in
// --replica_mode=mutable (acts as its own master for expiration purposes).
if (int64_t(cntx.time_now_ms) < expire_time || !expire_allowed_ ||
(owner_->IsReplica() && !absl::GetFlag(FLAGS_replica_delete_expired))) {
(owner_->IsReplica() && !absl::GetFlag(FLAGS_replica_delete_expired) &&
!IsReplicaMutable())) {
return it;
}

Expand Down
41 changes: 33 additions & 8 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const char kBadMasterId[] = "bad master id";
const char kIdNotFound[] = "syncid not found";
const char kInvalidSyncId[] = "bad sync id";
const char kInvalidState[] = "invalid state";
const char kTakeoverOnReplica[] = "TAKEOVER not supported on a chained replica";

bool ToSyncId(string_view str, uint32_t* num) {
if (!absl::StartsWith(str, "SYNC"))
Expand Down Expand Up @@ -247,21 +248,31 @@ void DflyCmd::Flow(CmdArgList args, CommandContext* cmd_cntx) {
string_view sync_id_str = ArgS(args, 2);
string_view flow_id_str = ArgS(args, 3);

// Optional trailing STABLE keyword: skip full sync and start streaming from current LSN.
// Used by replicas in --replica_mode=mutable / REPLICAOF NO_FULL_SYNC.
size_t pos_args_end = args.size();
bool stable_only = false;
if (pos_args_end > 4 && absl::EqualsIgnoreCase(ArgS(args, pos_args_end - 1), "STABLE")) {
stable_only = true;
--pos_args_end;
}

std::optional<LSN> seqid;
std::optional<string> last_master_id;
std::optional<string> last_master_lsn;
if (args.size() == 5) {
if (pos_args_end == 5) {
seqid.emplace();
if (!absl::SimpleAtoi(ArgS(args, 4), &seqid.value())) {
return cmd_cntx->SendError(facade::kInvalidIntErr);
}
} else if (args.size() == 6) {
} else if (pos_args_end == 6) {
last_master_id = ArgS(args, 4);
last_master_lsn = ArgS(args, 5);
}

VLOG(1) << "Got DFLY FLOW master_id: " << master_id << " sync_id: " << sync_id_str
<< " flow: " << flow_id_str << " seq: " << seqid.value_or(-1);
<< " flow: " << flow_id_str << " seq: " << seqid.value_or(-1)
<< " stable_only: " << stable_only;

if (master_id != sf_->master_replid()) {
return cmd_cntx->SendError(kBadMasterId);
Expand Down Expand Up @@ -300,6 +311,7 @@ void DflyCmd::Flow(CmdArgList args, CommandContext* cmd_cntx) {
flow.conn = cmd_cntx->conn();
flow.eof_token = eof_token;
flow.version = replica_ptr->version;
flow.target_replid = replica_ptr->id;

if (!conn_cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) {
// Listener::PreShutdown() triggered
Expand All @@ -313,8 +325,12 @@ void DflyCmd::Flow(CmdArgList args, CommandContext* cmd_cntx) {

std::optional<Replica::LastMasterSyncData> data = sf_->GetLastMasterData();
std::optional<LSN> lsn_to_start_partial;
// In this flow the master and the registered replica where synced from the same master.
if (last_master_id && data && data->id == *last_master_id) {
if (stable_only) {
// Active replication: start partial-sync from this shard's current LSN.
// Replica does not get history; existing local data is preserved.
lsn_to_start_partial.emplace(journal::GetLsn());
} else if (last_master_id && data && data->id == *last_master_id) {
// In this flow the master and the registered replica were synced from the same master.
++ServerState::tlocal()->stats.psync_requests_total;
auto flow_lsn =
ParseLsnVec(*last_master_lsn, data->last_journal_LSNs.size(), flow_id, cmd_cntx);
Expand Down Expand Up @@ -487,6 +503,10 @@ std::optional<LSN> DflyCmd::ParseLsnVec(std::string_view last_master_lsn,
// timeout_sec - number of seconds to wait for TAKEOVER to converge.
// SAVE option is used only by tests.
void DflyCmd::TakeOver(CmdArgList args, CommandContext* cmd_cntx) {
if (!ServerState::tlocal()->is_master) {
return cmd_cntx->SendError(kTakeoverOnReplica);
}

CmdArgParser parser{args};
parser.Next();
float timeout = std::ceil(parser.Next<float>());
Expand Down Expand Up @@ -747,9 +767,14 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
DCHECK(flow->conn);

LSN partial_lsn = flow->start_partial_sync_at.value_or(0);
JournalStreamer::Config config{
.should_sent_lsn = true, .init_from_stable_sync = true, .start_partial_sync_at = partial_lsn};
flow->streamer.reset(new JournalStreamer(exec_st, config));
// For active replication / cascading: streamer skips records whose source_replid
// matches the connecting replica's own replid (sent via REPLCONF CLIENT-ID and
// stored in replica_ptr->id). Empty disables filtering.
JournalStreamer::Config config{.should_sent_lsn = true,
.init_from_stable_sync = true,
.start_partial_sync_at = partial_lsn,
.target_replid = flow->target_replid};
flow->streamer.reset(new JournalStreamer(exec_st, std::move(config)));
flow->streamer->Start(flow->conn->socket());

// Register cleanup.
Expand Down
5 changes: 5 additions & 0 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ struct FlowInfo {
std::optional<LSN> start_partial_sync_at;
uint64_t last_acked_lsn = 0;

// Replid of the connected replica (its own master_replid), copied from
// ReplicaInfo::id. Used by JournalStreamer to skip records that originated
// from this replica — loop suppression for active replication.
std::string target_replid;

std::function<void()> cleanup; // Optional cleanup for cancellation.
};

Expand Down
8 changes: 6 additions & 2 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,8 @@ void EngineShard::Heartbeat() {
}
}

if (!IsReplica()) { // Never run expiry/evictions on replica.
// Replicas skip the periodic sweep by default; --replica_mode=mutable opts in.
if (!IsReplica() || IsReplicaMutable()) {
RetireExpiredAndEvict();
}

Expand Down Expand Up @@ -847,7 +848,10 @@ void EngineShard::RetireExpiredAndEvict() {
db_cntx.time_now_ms = GetCurrentTimeMs();

size_t deleted_bytes = 0;
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;
// Cache-mode eviction is master-only: FreeMemWithEvictionStepAtomic asserts !IsReplica().
// A mutable replica still runs the TTL sweep above but stays out of memory-pressure eviction.
size_t eviction_goal =
(!IsReplica() && GetFlag(FLAGS_enable_heartbeat_eviction)) ? CalculateEvictionBytes() : 0;

for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
Expand Down
6 changes: 6 additions & 0 deletions src/server/journal/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ class JournalExecutor {
return &conn_context_;
}

// Sets the replid attached to journal records that are produced by commands this
// executor dispatches. Callers typically pass the upstream master's replid; an empty
// string turns tagging off.
void SetSourceReplid(std::string replid) {
  // The parameter is taken by value so it can be moved into the connection context.
  auto& source_replid = conn_context_.journal_source_replid;
  source_replid = std::move(replid);
}

private:
facade::DispatchResult Execute(CommandContext* cmd_cntx);

Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ LSN GetLsn() {
}

void RecordEntry(TxId txid, Op opcode, DbIndex dbid, std::optional<SlotId> slot,
Entry::Payload payload) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, slot, std::move(payload)});
Entry::Payload payload, std::string_view source_replid) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, slot, std::move(payload)}, source_replid);
}

void SetFlushMode(bool allow_flush) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ uint32_t RegisterConsumer(JournalConsumerInterface* consumer);
void UnregisterConsumer(uint32_t id);

void RecordEntry(TxId txid, Op opcode, DbIndex dbid, std::optional<SlotId> slot,
Entry::Payload payload);
Entry::Payload payload, std::string_view source_replid = {});

size_t LsnBufferSize();
size_t LsnBufferBytes();
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void JournalSlice::SetFlushMode(bool allow_flush) {
}
}

void JournalSlice::AddLogRecord(const Entry& entry) {
void JournalSlice::AddLogRecord(const Entry& entry, std::string_view source_replid) {
DCHECK(ring_buffer_.capacity() > 0);

JournalChangeItem item;
Expand All @@ -88,6 +88,7 @@ void JournalSlice::AddLogRecord(const Entry& entry) {
// only used by RestoreStreamer
item.cmd = entry.payload.cmd;
item.slot = entry.slot;
item.source_replid = source_replid;

io::StringSink sink;
JournalWriter writer{&sink};
Expand Down
5 changes: 4 additions & 1 deletion src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class JournalSlice {
return status_ec_;
}

void AddLogRecord(const Entry& entry);
// source_replid: replid of the upstream peer this entry was applied from.
// Empty for locally originated writes. Forwarded to JournalChangeItem so
// streamers can skip records that originated from their target peer.
void AddLogRecord(const Entry& entry, std::string_view source_replid = {});

// Register a callback that will be called every time a new entry is
// added to the journal.
Expand Down
13 changes: 13 additions & 0 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ void JournalStreamer::ConsumeJournalChange(const JournalChangeItem& item) {
return;
}

if (IsLoopSuppressed(item)) {
// Active-replication loop suppression: don't ship the record (it would loop
// back to the peer it came from), but still ship a PING so the receiver's
// record counter advances past this LSN. Otherwise the LSN integrity check
// (tx_executor.cc) would mismatch and partial-sync resume would be off.
io::StringSink sink;
JournalWriter writer{&sink};
writer.Write(journal::Entry{journal::Op::PING, /*dbid=*/0, /*slot_id=*/std::nullopt});
Write(std::move(sink).str());
last_lsn_writen_ = item.journal_item.lsn;
return;
}

DCHECK_GT(item.journal_item.lsn, last_lsn_writen_);
Write(item.journal_item.data);
time_t now = time(nullptr);
Expand Down
12 changes: 12 additions & 0 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class JournalStreamer : public journal::JournalConsumerInterface {
bool should_sent_lsn = false;
bool init_from_stable_sync = false;
LSN start_partial_sync_at = 0;
// For active replication: skip records whose source_replid matches this
// (i.e., records that originated from the peer this streamer is sending to).
// Empty disables filtering. Typically set to the connecting replica's own
// master_replid as conveyed via REPLCONF CLIENT-ID.
std::string target_replid;
};

JournalStreamer(ExecutionState* cntx, Config config);
Expand Down Expand Up @@ -63,6 +68,13 @@ class JournalStreamer : public journal::JournalConsumerInterface {
return cntx_->IsRunning();
}

// Active replication: reports whether `item` must not be shipped because it originated
// from the very peer this streamer sends to. For such records the streamer ships a
// small PING placeholder instead, so the replica's LSN counter stays aligned with our
// journal LSN axis.
bool IsLoopSuppressed(const journal::JournalChangeItem& item) const {
  const std::string& target = config_.target_replid;
  // An empty target replid means filtering is disabled for this streamer.
  if (target.empty()) {
    return false;
  }
  return item.source_replid == target;
}

void WaitForInflightToComplete(bool with_timeout);

size_t inflight_bytes() const {
Expand Down
6 changes: 6 additions & 0 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ struct JournalChangeItem {

std::string_view cmd;
std::optional<SlotId> slot;
// Replid of the upstream source this record was applied from. Empty for locally
// originated writes. A JournalStreamer skips records whose source_replid equals
// its target replica's replid (active-replication loop suppression).
// Stored as std::string (not string_view) because the originating Transaction may
// be destroyed before async streamer fibers iterate over this item.
std::string source_replid;
};

struct JournalConsumerInterface {
Expand Down
12 changes: 11 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,15 @@ pair<intrusive_ptr<Transaction>, OpStatus> PrepareTransaction(const CommandId* c

cmd_ctx->SetupTx(cid, dfly_cntx->transaction);

// Carry the source replid from the dispatching connection so that journal records
// emitted by this transaction can be filtered out on streamers targeting the source
// (active replication / --replica_mode=mutable). Always set — including the empty
// case — to clear any value left over from a previous command on a reused
// (MULTI/EXEC) transaction.
if (dfly_cntx->transaction != nullptr) {
dfly_cntx->transaction->SetJournalSourceReplid(dfly_cntx->journal_source_replid);
}

if (init) {
DCHECK(cmd_ctx->tx());
if (auto st =
Expand Down Expand Up @@ -966,6 +975,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("timeout");
config_registry.RegisterMutable("send_timeout");
config_registry.RegisterMutable("managed_service_info");
config_registry.RegisterMutable("replica_mode");
#ifdef WITH_SEARCH
config_registry.RegisterMutable("MAXSEARCHRESULTS");
config_registry.RegisterMutable("search_query_string_bytes");
Expand Down Expand Up @@ -1315,7 +1325,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId& cid, CmdA
bool under_script = dfly_cntx.conn_state.script_info != nullptr;
bool multi_active = dfly_cntx.conn_state.exec_info.IsCollecting() && !is_trans_cmd;

if (!etl.is_master && is_write_cmd && !dfly_cntx.is_replicating)
if (!etl.is_master && is_write_cmd && !dfly_cntx.is_replicating && !IsReplicaMutable())
return ErrorReply{"-READONLY You can't write against a read only replica."};

if (multi_active) {
Expand Down
25 changes: 21 additions & 4 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern "C" {
#include "server/main_service.h"
#include "server/namespaces.h"
#include "server/rdb_load.h"
#include "server/server_state.h"
#include "strings/human_readable.h"

#define LOG_REPL_ERROR(msg) \
Expand Down Expand Up @@ -94,8 +95,11 @@ vector<vector<unsigned>> Partition(unsigned num_flows) {
} // namespace

Replica::Replica(string host, uint16_t port, Service* se, std::string_view id,
std::optional<cluster::SlotRange> slot_range)
std::optional<cluster::SlotRange> slot_range, bool no_full_sync)
: ProtocolClient(std::move(host), port), service_(*se), id_{id}, slot_range_(slot_range) {
// no_full_sync is per-replica config; carry it on master_context_ so it survives
// the per-reconnect re-init in HandleCapaDflyResp.
master_context_.no_full_sync = no_full_sync;
proactor_ = ProactorBase::me();
}

Expand Down Expand Up @@ -410,7 +414,8 @@ std::error_code Replica::HandleCapaDflyResp() {
VLOG(1) << "Master id: " << master_context_.master_repl_id
<< ", sync id: " << master_context_.dfly_session_id
<< ", num journals: " << param_num_flows
<< ", version: " << unsigned(master_context_.version);
<< ", version: " << unsigned(master_context_.version)
<< ", no_full_sync: " << master_context_.no_full_sync;

return error_code{};
}
Expand Down Expand Up @@ -876,7 +881,7 @@ io::Result<bool> DflyShardReplica::StartSyncFlow(
VLOG(1) << "Sending on flow " << master_context_.master_repl_id << " "
<< master_context_.dfly_session_id << " " << flow_id_ << " lsn: " << lsn.value_or(-1);

// DFLY FLOW <master_id> <session_id> <flow_id> [lsn] [last_master_id lsn-vec]
// DFLY FLOW <master_id> <session_id> <flow_id> [lsn] [last_master_id lsn-vec] [STABLE]
std::string cmd = StrCat("DFLY FLOW ", master_context_.master_repl_id, " ",
master_context_.dfly_session_id, " ", flow_id_);
// Try to negotiate a partial sync if possible.
Expand All @@ -890,6 +895,11 @@ io::Result<bool> DflyShardReplica::StartSyncFlow(
absl::StrAppend(&cmd, " ", last_master_data.value().id, " ", lsn_str);
VLOG(1) << "Sending last master sync flow " << last_master_data.value().id << " " << lsn_str;
}
// Active replication: ask master to skip historical full-sync and start streaming
// from its current LSN. Existing local data on the replica is preserved.
if (master_context_.no_full_sync && master_context_.version >= DflyVersion::VER7) {
absl::StrAppend(&cmd, " STABLE");
}

ResetParser(RedisParser::Mode::CLIENT);
leftover_buf_.emplace(128);
Expand Down Expand Up @@ -1027,7 +1037,11 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
} else if (tx_data.opcode == journal::Op::PING) {
force_ping_ = true;
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
if (EngineShard::tlocal() && EngineShard::tlocal()->journal()) {
// Don't re-journal PINGs received from upstream when this node accepts client
// writes (active replication). Doing so would relay the master's loop-suppression
// PING placeholders back across the chain, creating an infinite ping-pong.
const bool active_mode = IsReplicaMutable();
if (!active_mode && EngineShard::tlocal() && EngineShard::tlocal()->journal()) {
// We must register this entry to the journal to allow partial sync
// if journal is active.
journal::RecordEntry(0, journal::Op::PING, 0, nullopt, {});
Expand Down Expand Up @@ -1124,6 +1138,9 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
multi_shard_exe_(multi_shard_exe),
flow_id_(flow_id) {
executor_ = std::make_unique<JournalExecutor>(service);
// Tag records dispatched through this executor with the upstream's replid so that
// a downstream JournalStreamer pointing back at the same upstream skips them.
executor_->SetSourceReplid(master_context.master_repl_id);
rdb_loader_ = std::make_unique<RdbLoader>(&service_, load_context);
rdb_loader_->SetLoadUnownedSlots(true);
rdb_loader_->SetShardCount(master_context.num_flows);
Expand Down
Loading
Loading