Skip to content

Commit

Permalink
Add more test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Aug 28, 2024
1 parent 5b3087a commit 62b27d2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,49 @@ public void testStreamingRequestManyBatchesBySize() throws IOException {
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatchesByInterval() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofMillis(500);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_interval", "5s");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

// We don't check for a other documents here since those may appear in any of the chunks (it is very
// difficult to get the timing right). But at the end, the total number of the documents is being checked.
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final String refresh = request.param("refresh");
final TimeValue batchInterval = request.paramAsTime("batch_interval", null);
final int batchSize = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */
final boolean hasBatchSize = request.hasParam("batch_size"); /* is batch_size explicitly specified or default is used */

if (batchInterval != null && batchInterval.duration() <= 0) {
throw new IllegalArgumentException("The batch_interval value should be non-negative [" + batchInterval.millis() + "ms].");

Check warning on line 104 in server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java#L104

Added line #L104 was not covered by tests
Expand Down Expand Up @@ -127,7 +128,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

// TODOs:
// - eliminate serialization inefficiencies
createBufferedFlux(batchInterval, batchSize, channel).zipWith(Flux.fromStream(Stream.generate(() -> {
createBufferedFlux(batchInterval, batchSize, hasBatchSize, channel).zipWith(Flux.fromStream(Stream.generate(() -> {

Check warning on line 131 in server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java#L131

Added line #L131 was not covered by tests
BulkRequest bulkRequest = Requests.bulkRequest();
bulkRequest.waitForActiveShards(prepareBulkRequest.waitForActiveShards());
bulkRequest.timeout(prepareBulkRequest.timeout());
Expand Down Expand Up @@ -233,9 +234,19 @@ public boolean allowsUnsafeBuffers() {
return true;
}

private Flux<List<HttpChunk>> createBufferedFlux(final TimeValue batchInterval, final int batchSize, StreamingRestChannel channel) {
private Flux<List<HttpChunk>> createBufferedFlux(
final TimeValue batchInterval,
final int batchSize,
final boolean hasBatchSize,
StreamingRestChannel channel
) {
if (batchInterval != null) {
return Flux.from(channel).bufferTimeout(batchSize, Duration.ofMillis(batchInterval.millis()));
// If non-default batch size is specified, buffer by interval and batch
if (hasBatchSize) {
return Flux.from(channel).bufferTimeout(batchSize, Duration.ofMillis(batchInterval.millis()));

Check warning on line 246 in server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java#L246

Added line #L246 was not covered by tests
} else {
return Flux.from(channel).buffer(Duration.ofMillis(batchInterval.millis()));

Check warning on line 248 in server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java#L248

Added line #L248 was not covered by tests
}
} else {
return Flux.from(channel).buffer(batchSize);

Check warning on line 251 in server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java#L251

Added line #L251 was not covered by tests
}
Expand Down

0 comments on commit 62b27d2

Please sign in to comment.