Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2025 Yelp Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.config;

import java.util.Objects;

/** Config class for the isolated replica feature that decouples replicas from the primary. */
public class IsolatedReplicaConfig {
public static final String CONFIG_PREFIX = "isolatedReplicaConfig.";

private final boolean enabled;
private final int pollingIntervalSeconds;

/**
* Create instance from provided configuration reader.
*
* @param configReader config reader
* @return class instance
*/
public static IsolatedReplicaConfig fromConfig(YamlConfigReader configReader) {
Objects.requireNonNull(configReader);
boolean enabled = configReader.getBoolean(CONFIG_PREFIX + "enabled", false);
int pollingIntervalSeconds =
configReader.getInteger(CONFIG_PREFIX + "pollingIntervalSeconds", 120);
return new IsolatedReplicaConfig(enabled, pollingIntervalSeconds);
}

/**
* Constructor.
*
* @param enabled if isolated replica is enabled
* @param pollingIntervalSeconds interval in seconds to poll for new index versions, must be > 0
*/
public IsolatedReplicaConfig(boolean enabled, int pollingIntervalSeconds) {
this.enabled = enabled;
this.pollingIntervalSeconds = pollingIntervalSeconds;
if (pollingIntervalSeconds <= 0) {
throw new IllegalArgumentException(
String.format(
"Polling interval seconds must be positive, got: %d", pollingIntervalSeconds));
}
}

/**
* @return if isolated replica is enabled
*/
public boolean isEnabled() {
return enabled;
}

/**
* @return interval in seconds to poll for new index versions
*/
public int getPollingIntervalSeconds() {
return pollingIntervalSeconds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class NrtsearchConfig {
private final boolean useKeepAliveForReplication;
private final DirectoryFactory.MMapGrouping mmapGrouping;
private final boolean requireIdField;
private final IsolatedReplicaConfig isolatedReplicaConfig;

@Inject
public NrtsearchConfig(InputStream yamlStream) {
Expand Down Expand Up @@ -194,6 +195,7 @@ public NrtsearchConfig(InputStream yamlStream) {
DirectoryFactory.MMapGrouping.SEGMENT);
useSeparateCommitExecutor = configReader.getBoolean("useSeparateCommitExecutor", false);
requireIdField = configReader.getBoolean("requireIdField", false);
isolatedReplicaConfig = IsolatedReplicaConfig.fromConfig(configReader);

List<String> indicesWithOverrides = configReader.getKeysOrEmpty("indexLiveSettingsOverrides");
Map<String, IndexLiveSettings> liveSettingsMap = new HashMap<>();
Expand Down Expand Up @@ -395,6 +397,10 @@ public boolean getRequireIdField() {
return requireIdField;
}

public IsolatedReplicaConfig getIsolatedReplicaConfig() {
return isolatedReplicaConfig;
}

/**
* Substitute all sub strings of the form ${FOO} with the environment variable value env[FOO].
* Variable names may only contain letters, numbers, and underscores. If a variable is not present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.yelp.nrtsearch.server.grpc.ReplicationServerGrpc.ReplicationServerBlockingStub;
import com.yelp.nrtsearch.server.grpc.discovery.PrimaryFileNameResolverProvider;
import com.yelp.nrtsearch.server.nrt.SimpleCopyJob.FileChunkStreamingIterator;
import com.yelp.nrtsearch.server.nrt.jobs.SimpleCopyJob.FileChunkStreamingIterator;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.yelp.nrtsearch.server.handler;

import static com.yelp.nrtsearch.server.nrt.NrtUtils.readFilesMetaData;

import com.google.protobuf.InvalidProtocolBufferException;
import com.yelp.nrtsearch.server.grpc.CopyFiles;
import com.yelp.nrtsearch.server.grpc.TransferStatus;
Expand All @@ -23,7 +25,6 @@
import com.yelp.nrtsearch.server.index.IndexStateManager;
import com.yelp.nrtsearch.server.index.ShardState;
import com.yelp.nrtsearch.server.monitoring.NrtMetrics;
import com.yelp.nrtsearch.server.nrt.NRTReplicaNode;
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter;
import io.grpc.Status;
Expand Down Expand Up @@ -106,8 +107,7 @@ private void handle(

long primaryGen = copyFilesRequest.getPrimaryGen();
// these are the files that the remote (primary) wants us to copy
Map<String, FileMetaData> files =
NRTReplicaNode.readFilesMetaData(copyFilesRequest.getFilesMetadata());
Map<String, FileMetaData> files = readFilesMetaData(copyFilesRequest.getFilesMetadata());

AtomicBoolean finished = new AtomicBoolean();
CopyJob job;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/yelp/nrtsearch/server/index/ShardState.java
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,8 @@ public synchronized void startReplica(
configuration.getNodeName(),
indexDir,
new ShardSearcherFactory(true, false),
configuration.getIsolatedReplicaConfig(),
nrtDataManager,
verbose ? System.out : new PrintStream(OutputStream.nullOutputStream()),
configuration.getFileCopyConfig().getAckedCopy(),
configuration.getDecInitialCommit(),
Expand Down
110 changes: 30 additions & 80 deletions src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@
package com.yelp.nrtsearch.server.nrt;

import com.google.common.annotations.VisibleForTesting;
import com.yelp.nrtsearch.server.grpc.FileMetadata;
import com.yelp.nrtsearch.server.grpc.FilesMetadata;
import com.yelp.nrtsearch.server.config.IsolatedReplicaConfig;
import com.yelp.nrtsearch.server.grpc.GetNodesResponse;
import com.yelp.nrtsearch.server.grpc.NodeInfo;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.monitoring.NrtMetrics;
import com.yelp.nrtsearch.server.nrt.jobs.CopyJobManager;
import com.yelp.nrtsearch.server.nrt.jobs.GrpcCopyJobManager;
import com.yelp.nrtsearch.server.nrt.jobs.RemoteCopyJobManager;
import com.yelp.nrtsearch.server.utils.HostPort;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.replicator.nrt.*;
Expand All @@ -46,6 +44,7 @@ public class NRTReplicaNode extends ReplicaNode {

private final ReplicationServerClient primaryAddress;
private final ReplicaDeleterManager replicaDeleterManager;
private final CopyJobManager copyJobManager;
private final String indexName;
private final String indexId;
private final String nodeName;
Expand All @@ -66,6 +65,8 @@ public NRTReplicaNode(
String nodeName,
Directory indexDir,
SearcherFactory searcherFactory,
IsolatedReplicaConfig isolatedReplicaConfig,
NrtDataManager nrtDataManager,
PrintStream printStream,
boolean ackedCopy,
boolean decInitialCommit,
Expand All @@ -82,6 +83,19 @@ public NRTReplicaNode(
this.hostPort = hostPort;
replicaDeleterManager = decInitialCommit ? new ReplicaDeleterManager(this) : null;
this.filterIncompatibleSegmentReaders = filterIncompatibleSegmentReaders;

if (isolatedReplicaConfig.isEnabled()) {
if (hasPrimaryConnection()) {
throw new IllegalArgumentException(
"Cannot have both primary connection and isolated replica enabled");
}
copyJobManager =
new RemoteCopyJobManager(
isolatedReplicaConfig.getPollingIntervalSeconds(), nrtDataManager, this);
} else {
copyJobManager =
new GrpcCopyJobManager(indexName, indexId, primaryAddress, ackedCopy, this, id);
}
// Handles fetching files from primary, on a new thread which receives files from primary
nrtCopyThread = getNrtCopyThread(this, lowPriorityCopyPercentage);
nrtCopyThread.setName("R" + id + ".copyJobs");
Expand All @@ -98,6 +112,11 @@ static NrtCopyThread getNrtCopyThread(Node node, int lowPriorityCopyPercentage)
}
}

@VisibleForTesting
CopyJobManager getCopyJobManager() {
return copyJobManager;
}

private long getLastPrimaryGen() throws IOException {
// detection logic from ReplicaNode
String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir);
Expand Down Expand Up @@ -146,6 +165,7 @@ public synchronized void start(long primaryGen) throws IOException {
mgr = new FilteringSegmentInfosSearcherManager(getDirectory(), this, mgr, searcherFactory);
oldMgr.close();
}
copyJobManager.start();
}

@Override
Expand All @@ -156,80 +176,7 @@ protected CopyJob newCopyJob(
boolean highPriority,
CopyJob.OnceDone onceDone)
throws IOException {
if (!hasPrimaryConnection()) {
throw new IllegalStateException(
"Cannot create new copy job, primary connection not available");
}
CopyState copyState;

// sendMeFiles(?) (we dont need this, just send Index,replica, and request for copy State)
if (files == null) {
// No incoming CopyState: ask primary for latest one now
try {
// Exceptions in here mean something went wrong talking over the socket, which are fine
// (e.g. primary node crashed):
copyState = getCopyStateFromPrimary();
} catch (Throwable t) {
throw new NodeCommunicationException("exc while reading files to copy", t);
}
files = copyState.files();
} else {
copyState = null;
}
return new SimpleCopyJob(
reason,
primaryAddress,
copyState,
this,
files,
highPriority,
onceDone,
indexName,
indexId,
ackedCopy);
}

private CopyState getCopyStateFromPrimary() throws IOException {
com.yelp.nrtsearch.server.grpc.CopyState copyState =
primaryAddress.recvCopyState(indexName, indexId, id);
return readCopyState(copyState);
}

/** Pulls CopyState off the wire */
private static CopyState readCopyState(com.yelp.nrtsearch.server.grpc.CopyState copyState)
throws IOException {

// Decode a new CopyState
byte[] infosBytes = new byte[copyState.getInfoBytesLength()];
copyState.getInfoBytes().copyTo(ByteBuffer.wrap(infosBytes));

long gen = copyState.getGen();
long version = copyState.getVersion();
Map<String, FileMetaData> files = readFilesMetaData(copyState.getFilesMetadata());

Set<String> completedMergeFiles = new HashSet<>(copyState.getCompletedMergeFilesList());
long primaryGen = copyState.getPrimaryGen();

return new CopyState(files, version, gen, infosBytes, completedMergeFiles, primaryGen, null);
}

public static Map<String, FileMetaData> readFilesMetaData(FilesMetadata filesMetadata)
throws IOException {
int fileCount = filesMetadata.getNumFiles();
assert fileCount == filesMetadata.getFileMetadataCount();

Map<String, FileMetaData> files = new HashMap<>();
for (FileMetadata fileMetadata : filesMetadata.getFileMetadataList()) {
String fileName = fileMetadata.getFileName();
long length = fileMetadata.getLen();
long checksum = fileMetadata.getChecksum();
byte[] header = new byte[fileMetadata.getHeaderLength()];
fileMetadata.getHeader().copyTo(ByteBuffer.wrap(header));
byte[] footer = new byte[fileMetadata.getFooterLength()];
fileMetadata.getFooter().copyTo(ByteBuffer.wrap(footer));
files.put(fileName, new FileMetaData(header, footer, length, checksum));
}
return files;
return copyJobManager.newCopyJob(reason, files, prevFiles, highPriority, onceDone);
}

@Override
Expand Down Expand Up @@ -260,6 +207,8 @@ public CopyJob launchPreCopyFiles(
protected void finishNRTCopy(CopyJob job, long startNS) throws IOException {
super.finishNRTCopy(job, startNS);

copyJobManager.finishNRTCopy(job);

// record metrics for this nrt point
if (job.getFailed()) {
NrtMetrics.nrtPointFailure.labelValues(indexName).inc();
Expand All @@ -276,6 +225,7 @@ protected void finishNRTCopy(CopyJob job, long startNS) throws IOException {
public void close() throws IOException {
nrtCopyThread.close();
logger.info("CLOSE NRT REPLICA");
copyJobManager.close();
message("top: jobs closed");
synchronized (mergeCopyJobs) {
for (CopyJob job : mergeCopyJobs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.yelp.nrtsearch.server.nrt;

import com.yelp.nrtsearch.server.nrt.jobs.SimpleCopyJob;
import com.yelp.nrtsearch.server.nrt.jobs.VisitableCopyJob;
import java.io.Closeable;
import java.util.Locale;
import org.apache.lucene.replicator.nrt.CopyJob;
Expand Down Expand Up @@ -61,7 +63,7 @@ public NrtCopyThread(Node node) {
* Returns null if we are closing, else, returns the top job or waits for one to arrive if the
* queue is empty.
*/
private synchronized SimpleCopyJob getNextJob() {
private synchronized VisitableCopyJob getNextJob() {
while (true) {
if (finish) {
return null;
Expand All @@ -72,7 +74,7 @@ private synchronized SimpleCopyJob getNextJob() {
throw new RuntimeException(ie);
}
} else {
return (SimpleCopyJob) getJob();
return (VisitableCopyJob) getJob();
}
}
}
Expand All @@ -81,7 +83,7 @@ private synchronized SimpleCopyJob getNextJob() {
public void run() {
// nocommit: prioritize jobs better here, the way an OS assigns CPU to processes:
while (true) {
SimpleCopyJob topJob = getNextJob();
VisitableCopyJob topJob = getNextJob();
if (topJob == null) {
assert finish;
break;
Expand Down
Loading
Loading