Skip to content

Commit

Permalink
[Streaming Indexing] Enhance RestAction with request / response strea…
Browse files Browse the repository at this point in the history
…ming support

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed May 22, 2024
1 parent 66df930 commit cc21ef8
Show file tree
Hide file tree
Showing 17 changed files with 532 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.core.action.ActionListener;

import io.netty.handler.codec.http.HttpContent;
import org.reactivestreams.Publisher;

/**
* The generic interface for chunked {@link HttpContent} producers (response streaming).
*/
interface HttpContentSender extends Publisher<HttpContent> {
void send(HttpContent content, ActionListener<Void> listener, boolean isLast);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.http.HttpChunk;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.buffer.ByteBuf;

class ReactorNetty4HttpChunk implements HttpChunk {

Check warning on line 19 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L19

Added line #L19 was not covered by tests
private final AtomicBoolean released;
private final boolean pooled;
private final ByteBuf content;
private final boolean last;

ReactorNetty4HttpChunk(ByteBuf content, boolean last) {
this(new AtomicBoolean(false), true, content, last);
}

Check warning on line 27 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L26-L27

Added lines #L26 - L27 were not covered by tests

private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) {
this.content = content;
this.pooled = pooled;
this.released = released;
this.last = last;
}

Check warning on line 34 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L29-L34

Added lines #L29 - L34 were not covered by tests

@Override
public BytesReference content() {
assert released.get() == false;
return Netty4Utils.toBytesReference(content);

Check warning on line 39 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L39

Added line #L39 was not covered by tests
}

@Override
public void release() {
if (pooled && released.compareAndSet(false, true)) {
content.release();

Check warning on line 45 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L45

Added line #L45 was not covered by tests
}
}

Check warning on line 47 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L47

Added line #L47 was not covered by tests

@Override
public boolean isLast() {
return last;

Check warning on line 51 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java#L51

Added line #L51 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.reactor.SharedGroupFactory;
Expand All @@ -40,6 +42,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -351,24 +354,46 @@ public List<String> protocols() {
* @return response publisher
*/
protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
final NonStreamingRequestConsumer<HttpContent> consumer = new NonStreamingRequestConsumer<>(
this,
request,
response,
maxCompositeBufferComponents
final Method method = HttpConversionUtil.convertMethod(request.method());
final Optional<RestHandler> dispatchHandlerOpt = dispatcher.dispatchHandler(
request.uri(),
request.fullPath(),
method,
request.params()
);
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(

Check warning on line 365 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L365

Added line #L365 was not covered by tests
this,
request,
response
);

request.receiveContent()
.switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT));
consumer.start();

Check warning on line 374 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L371-L374

Added lines #L371 - L374 were not covered by tests

return response.sendObject(consumer);

Check warning on line 376 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L376

Added line #L376 was not covered by tests
} else {
final ReactorNetty4NonStreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4NonStreamingRequestConsumer<>(
this,
request,
response,
maxCompositeBufferComponents
);

request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);

return Mono.from(consumer).flatMap(hc -> {
final FullHttpResponse r = (FullHttpResponse) hc;
response.status(r.status());
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
response.chunkedTransfer(false);
response.compression(true);
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
return Mono.from(response.sendObject(r.content()));
});
request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);

return Mono.from(consumer).flatMap(hc -> {
final FullHttpResponse r = (FullHttpResponse) hc;
response.status(r.status());
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
response.chunkedTransfer(false);
response.compression(true);
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
return Mono.from(response.sendObject(r.content()));
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class NonStreamingHttpChannel implements HttpChannel {
class ReactorNetty4NonStreamingHttpChannel implements HttpChannel {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final FluxSink<HttpContent> emitter;

NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
this.request = request;
this.response = response;
this.emitter = emitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
class ReactorNetty4NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompositeByteBuf content;
Expand All @@ -34,7 +34,7 @@ class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>,
private final AtomicBoolean disposed = new AtomicBoolean(false);
private volatile FluxSink<HttpContent> emitter;

NonStreamingRequestConsumer(
ReactorNetty4NonStreamingRequestConsumer(
AbstractHttpServerTransport transport,
HttpServerRequest request,
HttpServerResponse response,
Expand Down Expand Up @@ -64,12 +64,12 @@ public void accept(T message) {
}
}

public void process(HttpContent in, FluxSink<HttpContent> emitter) {
void process(HttpContent in, FluxSink<HttpContent> emitter) {
// Consume request body in full before dispatching it
content.addComponent(true, in.content().retain());

if (in instanceof LastHttpContent) {
final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter);
final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter);
final HttpRequest r = createRequest(request, content);

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.common.concurrent.CompletableContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpResponse;
import org.opensearch.http.StreamingHttpChannel;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompletableContext<Void> closeContext = new CompletableContext<>();

Check warning on line 37 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L37

Added line #L37 was not covered by tests
private final Publisher<HttpChunk> receiver;
private final HttpContentSender sender;
private volatile FluxSink<HttpChunk> producer;
private volatile boolean lastChunkReceived = false;

Check warning on line 41 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L41

Added line #L41 was not covered by tests

ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) {
this.request = request;
this.response = response;
this.sender = sender;
this.receiver = Flux.create(producer -> this.producer = producer);
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext));
}

Check warning on line 49 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L43-L49

Added lines #L43 - L49 were not covered by tests

@Override
public boolean isOpen() {
return true;

Check warning on line 53 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L53

Added line #L53 was not covered by tests
}

@Override
public void close() {
request.withConnection(connection -> connection.channel().close());
}

Check warning on line 59 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L58-L59

Added lines #L58 - L59 were not covered by tests

@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

Check warning on line 64 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L63-L64

Added lines #L63 - L64 were not covered by tests

@Override
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) {
sender.send(createContent(chunk), listener, chunk.isLast());
}

Check warning on line 69 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L68-L69

Added lines #L68 - L69 were not covered by tests

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
sender.send(createContent(response), listener, true);
}

Check warning on line 74 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L73-L74

Added lines #L73 - L74 were not covered by tests

@Override
public void prepareResponse(int status, Map<String, List<String>> headers) {
this.response.status(status);
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v)));
}

Check warning on line 80 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L78-L80

Added lines #L78 - L80 were not covered by tests

@Override
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) response.remoteAddress();

Check warning on line 84 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L84

Added line #L84 was not covered by tests
}

@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) response.hostAddress();

Check warning on line 89 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L89

Added line #L89 was not covered by tests
}

@Override
public void receiveChunk(HttpChunk message) {
if (lastChunkReceived) {
return;

Check warning on line 95 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L95

Added line #L95 was not covered by tests
}

producer.next(message);

Check warning on line 98 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L98

Added line #L98 was not covered by tests
if (message.isLast()) {
lastChunkReceived = true;
producer.complete();

Check warning on line 101 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L100-L101

Added lines #L100 - L101 were not covered by tests
}
}

Check warning on line 103 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L103

Added line #L103 was not covered by tests

@Override
public void subscribe(java.util.concurrent.Flow.Subscriber<? super HttpChunk> s) {
receiver.subscribe(FlowAdapters.toSubscriber(s));
}

Check warning on line 108 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L107-L108

Added lines #L107 - L108 were not covered by tests

private static HttpContent createContent(HttpResponse response) {
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response;
return new DefaultHttpContent(fullHttpResponse.content());

Check warning on line 112 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L111-L112

Added lines #L111 - L112 were not covered by tests
}

private static HttpContent createContent(HttpChunk chunk) {
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content())));

Check warning on line 116 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java#L116

Added line #L116 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.http.AbstractHttpServerTransport;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpRequest;
import org.opensearch.http.StreamingHttpChannel;

import java.util.function.Consumer;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent> {
private final AbstractHttpServerTransport transport;
private final HttpServerRequest request;
private final HttpContentSender sender;
private final StreamingHttpChannel httpChannel;

ReactorNetty4StreamingRequestConsumer(AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response) {
this.transport = transport;
this.request = request;
this.sender = new ReactorNetty4StreamingResponseProducer();
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
}

Check warning on line 37 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L32-L37

Added lines #L32 - L37 were not covered by tests

@Override
public void accept(T message) {
if (message instanceof LastHttpContent) {
httpChannel.receiveChunk(createChunk(message, true));

Check warning on line 42 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L42

Added line #L42 was not covered by tests
} else if (message instanceof HttpContent) {
httpChannel.receiveChunk(createChunk(message, false));

Check warning on line 44 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L44

Added line #L44 was not covered by tests
}
}

Check warning on line 46 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L46

Added line #L46 was not covered by tests

@Override
public void subscribe(Subscriber<? super HttpContent> s) {
sender.subscribe(s);
}

Check warning on line 51 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L50-L51

Added lines #L50 - L51 were not covered by tests

void start() {
transport.incomingStream(createRequest(request), httpChannel);
}

Check warning on line 55 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L54-L55

Added lines #L54 - L55 were not covered by tests

HttpRequest createRequest(HttpServerRequest request) {
return new ReactorNetty4HttpRequest(request, Unpooled.EMPTY_BUFFER);

Check warning on line 58 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L58

Added line #L58 was not covered by tests
}

HttpChunk createChunk(HttpContent chunk, boolean last) {
return new ReactorNetty4HttpChunk(chunk.content(), last);

Check warning on line 62 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java#L62

Added line #L62 was not covered by tests
}
}
Loading

0 comments on commit cc21ef8

Please sign in to comment.