Skip to content

Commit

Permalink
Fix Netty's ByteBuf leak (opensearch-project#15475)
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Aug 28, 2024
1 parent f5da8c8 commit acee2ae
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,31 @@

package org.opensearch.http.reactor.netty4;

import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.http.HttpChunk;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.buffer.ByteBuf;

class ReactorNetty4HttpChunk implements HttpChunk {
private final AtomicBoolean released;
private final boolean pooled;
private final ByteBuf content;
private final BytesArray content;
private final boolean last;

ReactorNetty4HttpChunk(ByteBuf content, boolean last) {
this.content = content;
this.pooled = true;
this.released = new AtomicBoolean(false);
ReactorNetty4HttpChunk(ByteBuf buf, boolean last) {
// Since the chunks could be batched and processing could be delayed, we are copying the content here
final byte[] content = new byte[buf.readableBytes()];
buf.readBytes(content);
this.content = new BytesArray(content);
this.last = last;
}

@Override
public BytesReference content() {
assert released.get() == false;
return Netty4Utils.toBytesReference(content);
return content;
}

@Override
public void close() {
if (pooled && released.compareAndSet(false, true)) {
content.release();
}
}
public void close() {}

@Override
public boolean isLast() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void receiveChunk(HttpChunk message) {
}
} catch (final Exception ex) {
producer.error(ex);
} finally {
message.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void subscribe(Subscriber<? super HttpContent> s) {
}

HttpChunk createChunk(HttpContent chunk, boolean last) {
return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last);
return new ReactorNetty4HttpChunk(chunk.content(), last);
}

StreamingHttpChannel httpChannel() {
Expand Down

0 comments on commit acee2ae

Please sign in to comment.