Skip to content

Commit cae754d

Browse files
varunbharadwajmch2 and Marc Handalian authored
[Pull-based Ingestion] Support all-active mode in pull-based ingestion (#19316)
Support docrep equivalent in pull-based ingestion --------- Signed-off-by: Varun Bharadwaj <[email protected]> Co-authored-by: Marc Handalian <[email protected]>
1 parent 4f987eb commit cae754d

File tree

21 files changed

+707
-78
lines changed

21 files changed

+707
-78
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
- Upgrade opensearch-protobufs dependency to 0.13.0 and update transport-grpc module compatibility ([#19007](https://github.com/opensearch-project/OpenSearch/issues/19007))
2525
- Add new extensible method to DocRequest to specify type ([#19313](https://github.com/opensearch-project/OpenSearch/pull/19313))
2626
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
27+
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
2728

2829
### Changed
2930
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))

plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void testFileIngestion() throws Exception {
110110
assertEquals(0, ingestionState.getFailedShards());
111111
assertTrue(
112112
Arrays.stream(ingestionState.getShardStates())
113-
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"))
113+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"))
114114
);
115115
});
116116

@@ -129,7 +129,7 @@ public void testFileIngestion() throws Exception {
129129
Arrays.stream(ingestionState.getShardStates())
130130
.allMatch(
131131
state -> state.isPollerPaused() == false
132-
&& (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
132+
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
133133
)
134134
);
135135
});

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,38 @@
1212
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
1313
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
1414
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
15+
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
16+
import org.opensearch.action.admin.indices.stats.IndexStats;
17+
import org.opensearch.action.admin.indices.stats.ShardStats;
18+
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
19+
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
20+
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
1521
import org.opensearch.action.search.SearchResponse;
22+
import org.opensearch.cluster.ClusterState;
1623
import org.opensearch.cluster.metadata.IndexMetadata;
24+
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
25+
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
1726
import org.opensearch.common.settings.Settings;
1827
import org.opensearch.index.query.BoolQueryBuilder;
1928
import org.opensearch.index.query.RangeQueryBuilder;
2029
import org.opensearch.index.query.TermQueryBuilder;
2130
import org.opensearch.indices.pollingingest.PollingIngestStats;
2231
import org.opensearch.plugins.PluginInfo;
32+
import org.opensearch.test.InternalTestCluster;
2333
import org.opensearch.test.OpenSearchIntegTestCase;
2434
import org.opensearch.transport.client.Requests;
2535
import org.junit.Assert;
2636

37+
import java.util.Arrays;
38+
import java.util.HashMap;
2739
import java.util.List;
40+
import java.util.Map;
2841
import java.util.concurrent.TimeUnit;
2942
import java.util.function.Function;
3043
import java.util.stream.Collectors;
3144
import java.util.stream.Stream;
3245

46+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3347
import static org.hamcrest.Matchers.is;
3448
import static org.awaitility.Awaitility.await;
3549

@@ -236,4 +250,273 @@ public void testMultiThreadedWrites() throws Exception {
236250
return response.getHits().getTotalHits().value() == 1000;
237251
});
238252
}
253+
254+
public void testAllActiveIngestion() throws Exception {
255+
// Create pull-based index in default replication mode (docrep) and publish some messages
256+
257+
internalCluster().startClusterManagerOnlyNode();
258+
final String nodeA = internalCluster().startDataOnlyNode();
259+
for (int i = 0; i < 10; i++) {
260+
produceData(Integer.toString(i), "name" + i, "30");
261+
}
262+
263+
createIndex(
264+
indexName,
265+
Settings.builder()
266+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
267+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
268+
.put("ingestion_source.type", "kafka")
269+
.put("ingestion_source.param.topic", topicName)
270+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
271+
.put("ingestion_source.pointer.init.reset", "earliest")
272+
.put("ingestion_source.all_active", true)
273+
.build(),
274+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
275+
);
276+
277+
ensureYellowAndNoInitializingShards(indexName);
278+
waitForSearchableDocs(10, List.of(nodeA));
279+
flush(indexName);
280+
281+
// add a second node and verify the replica ingests the data
282+
final String nodeB = internalCluster().startDataOnlyNode();
283+
ensureGreen(indexName);
284+
assertTrue(nodeA.equals(primaryNodeName(indexName)));
285+
assertTrue(nodeB.equals(replicaNodeName(indexName)));
286+
waitForSearchableDocs(10, List.of(nodeB));
287+
288+
// verify pause and resume functionality on replica
289+
290+
// pause ingestion
291+
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
292+
assertTrue(pauseResponse.isAcknowledged());
293+
assertTrue(pauseResponse.isShardsAcknowledged());
294+
waitForState(() -> {
295+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
296+
return ingestionState.getShardStates().length == 2
297+
&& ingestionState.getFailedShards() == 0
298+
&& Arrays.stream(ingestionState.getShardStates())
299+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
300+
});
301+
302+
for (int i = 10; i < 20; i++) {
303+
produceData(Integer.toString(i), "name" + i, "30");
304+
}
305+
306+
// replica must not ingest when paused
307+
Thread.sleep(1000);
308+
assertEquals(10, getSearchableDocCount(nodeB));
309+
310+
// resume ingestion
311+
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
312+
assertTrue(resumeResponse.isAcknowledged());
313+
assertTrue(resumeResponse.isShardsAcknowledged());
314+
waitForState(() -> {
315+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
316+
return ingestionState.getShardStates().length == 2
317+
&& Arrays.stream(ingestionState.getShardStates())
318+
.allMatch(
319+
state -> state.isPollerPaused() == false
320+
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
321+
);
322+
});
323+
324+
// verify replica ingests data after resuming ingestion
325+
waitForSearchableDocs(20, List.of(nodeA, nodeB));
326+
327+
// produce 10 more messages
328+
for (int i = 20; i < 30; i++) {
329+
produceData(Integer.toString(i), "name" + i, "30");
330+
}
331+
332+
// Add new node and wait for new node to join cluster
333+
final String nodeC = internalCluster().startDataOnlyNode();
334+
assertBusy(() -> {
335+
assertEquals(
336+
"Should have 4 nodes total (1 cluster manager + 3 data)",
337+
4,
338+
internalCluster().clusterService().state().nodes().getSize()
339+
);
340+
}, 30, TimeUnit.SECONDS);
341+
342+
// move replica from nodeB to nodeC
343+
ensureGreen(indexName);
344+
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(indexName, 0, nodeB, nodeC)).get();
345+
ensureGreen(indexName);
346+
347+
// confirm replica ingests messages after moving to new node
348+
waitForSearchableDocs(30, List.of(nodeA, nodeC));
349+
350+
for (int i = 30; i < 40; i++) {
351+
produceData(Integer.toString(i), "name" + i, "30");
352+
}
353+
354+
// restart replica node and verify ingestion
355+
internalCluster().restartNode(nodeC);
356+
ensureGreen(indexName);
357+
waitForSearchableDocs(40, List.of(nodeA, nodeC));
358+
359+
// Verify both primary and replica do not have failed messages
360+
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
361+
assertNotNull(shardTypeToStats.get("primary"));
362+
assertNotNull(shardTypeToStats.get("replica"));
363+
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
364+
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
365+
// replica consumes only 10 messages after it has been restarted
366+
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
367+
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
368+
369+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
370+
assertEquals(2, ingestionState.getShardStates().length);
371+
assertEquals(0, ingestionState.getFailedShards());
372+
}
373+
374+
public void testReplicaPromotionOnAllActiveIngestion() throws Exception {
375+
// Create pull-based index in default replication mode (docrep) and publish some messages
376+
internalCluster().startClusterManagerOnlyNode();
377+
final String nodeA = internalCluster().startDataOnlyNode();
378+
for (int i = 0; i < 10; i++) {
379+
produceData(Integer.toString(i), "name" + i, "30");
380+
}
381+
382+
createIndex(
383+
indexName,
384+
Settings.builder()
385+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
386+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
387+
.put("ingestion_source.type", "kafka")
388+
.put("ingestion_source.param.topic", topicName)
389+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
390+
.put("ingestion_source.pointer.init.reset", "earliest")
391+
.put("ingestion_source.all_active", true)
392+
.build(),
393+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
394+
);
395+
396+
ensureYellowAndNoInitializingShards(indexName);
397+
waitForSearchableDocs(10, List.of(nodeA));
398+
399+
// add second node
400+
final String nodeB = internalCluster().startDataOnlyNode();
401+
ensureGreen(indexName);
402+
assertTrue(nodeA.equals(primaryNodeName(indexName)));
403+
assertTrue(nodeB.equals(replicaNodeName(indexName)));
404+
waitForSearchableDocs(10, List.of(nodeB));
405+
406+
// Validate replica promotion
407+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
408+
ensureYellowAndNoInitializingShards(indexName);
409+
assertTrue(nodeB.equals(primaryNodeName(indexName)));
410+
for (int i = 10; i < 20; i++) {
411+
produceData(Integer.toString(i), "name" + i, "30");
412+
}
413+
414+
waitForSearchableDocs(20, List.of(nodeB));
415+
416+
// add third node and allocate the replica once the node joins the cluster
417+
final String nodeC = internalCluster().startDataOnlyNode();
418+
assertBusy(() -> { assertEquals(3, internalCluster().clusterService().state().nodes().getSize()); }, 30, TimeUnit.SECONDS);
419+
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
420+
ensureGreen(indexName);
421+
waitForSearchableDocs(20, List.of(nodeC));
422+
423+
}
424+
425+
public void testSnapshotRestoreOnAllActiveIngestion() throws Exception {
426+
// Create pull-based index in default replication mode (docrep) and publish some messages
427+
internalCluster().startClusterManagerOnlyNode();
428+
final String nodeA = internalCluster().startDataOnlyNode();
429+
final String nodeB = internalCluster().startDataOnlyNode();
430+
for (int i = 0; i < 20; i++) {
431+
produceData(Integer.toString(i), "name" + i, "30");
432+
}
433+
434+
createIndex(
435+
indexName,
436+
Settings.builder()
437+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
438+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
439+
.put("ingestion_source.type", "kafka")
440+
.put("ingestion_source.param.topic", topicName)
441+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
442+
.put("ingestion_source.pointer.init.reset", "earliest")
443+
.put("ingestion_source.all_active", true)
444+
.build(),
445+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
446+
);
447+
ensureGreen(indexName);
448+
waitForSearchableDocs(20, List.of(nodeA, nodeB));
449+
450+
// Register snapshot repository
451+
String snapshotRepositoryName = "test-snapshot-repo";
452+
String snapshotName = "snapshot-1";
453+
assertAcked(
454+
client().admin()
455+
.cluster()
456+
.preparePutRepository(snapshotRepositoryName)
457+
.setType("fs")
458+
.setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", false))
459+
);
460+
461+
// Take snapshot
462+
flush(indexName);
463+
CreateSnapshotResponse snapshotResponse = client().admin()
464+
.cluster()
465+
.prepareCreateSnapshot(snapshotRepositoryName, snapshotName)
466+
.setWaitForCompletion(true)
467+
.setIndices(indexName)
468+
.get();
469+
assertTrue(snapshotResponse.getSnapshotInfo().successfulShards() > 0);
470+
471+
// Delete Index
472+
assertAcked(client().admin().indices().prepareDelete(indexName));
473+
waitForState(() -> {
474+
ClusterState state = client().admin().cluster().prepareState().setIndices(indexName).get().getState();
475+
return state.getRoutingTable().hasIndex(indexName) == false && state.getMetadata().hasIndex(indexName) == false;
476+
});
477+
478+
for (int i = 20; i < 40; i++) {
479+
produceData(Integer.toString(i), "name" + i, "30");
480+
}
481+
482+
// Restore Index from Snapshot
483+
client().admin()
484+
.cluster()
485+
.prepareRestoreSnapshot(snapshotRepositoryName, snapshotName)
486+
.setWaitForCompletion(true)
487+
.setIndices(indexName)
488+
.get();
489+
ensureGreen(indexName);
490+
491+
refresh(indexName);
492+
waitForSearchableDocs(40, List.of(nodeA, nodeB));
493+
494+
// Verify both primary and replica have polled only remaining 20 messages
495+
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
496+
assertNotNull(shardTypeToStats.get("primary"));
497+
assertNotNull(shardTypeToStats.get("replica"));
498+
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPolledCount(), is(20L));
499+
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
500+
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
501+
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPolledCount(), is(20L));
502+
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
503+
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
504+
}
505+
506+
// returns PollingIngestStats for single primary and single replica
507+
private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplica(String indexName) {
508+
IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
509+
ShardStats[] shards = indexStats.getShards();
510+
assertEquals(2, shards.length);
511+
Map<String, PollingIngestStats> shardTypeToStats = new HashMap<>();
512+
for (ShardStats shardStats : shards) {
513+
if (shardStats.getShardRouting().primary()) {
514+
shardTypeToStats.put("primary", shardStats.getPollingIngestStats());
515+
} else {
516+
shardTypeToStats.put("replica", shardStats.getPollingIngestStats());
517+
}
518+
}
519+
520+
return shardTypeToStats;
521+
}
239522
}

0 commit comments

Comments
 (0)