Skip to content

Commit 0fc1213

Browse files
committed
Async flush of batches at server for Stream Transport
Signed-off-by: Rishabh Maurya <[email protected]>
1 parent bb9a7d4 commit 0fc1213

File tree

8 files changed

+415
-54
lines changed

8 files changed

+415
-54
lines changed

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.netty.channel.nio.NioEventLoopGroup;
3131
import io.netty.channel.socket.nio.NioServerSocketChannel;
3232
import io.netty.channel.socket.nio.NioSocketChannel;
33+
import io.netty.util.NettyRuntime;
3334

3435
/**
3536
* Configuration class for OpenSearch Flight server settings.
@@ -87,6 +88,13 @@ public ServerConfig() {}
8788
Setting.Property.NodeScope
8889
);
8990

91+
static final Setting<Integer> FLIGHT_EVENT_LOOP_THREADS = Setting.intSetting(
92+
"flight.event_loop.threads",
93+
Math.max(1, NettyRuntime.availableProcessors() * 2),
94+
1,
95+
Setting.Property.NodeScope
96+
);
97+
9098
static final Setting<Boolean> ARROW_SSL_ENABLE = Setting.boolSetting(
9199
"flight.ssl.enable",
92100
false, // TODO: get default from security enabled
@@ -112,6 +120,7 @@ public ServerConfig() {}
112120
private static int threadPoolMin;
113121
private static int threadPoolMax;
114122
private static TimeValue keepAlive;
123+
private static int eventLoopThreads;
115124

116125
/**
117126
* Initializes the server configuration with the provided settings.
@@ -134,6 +143,7 @@ public static void init(Settings settings) {
134143
threadPoolMin = FLIGHT_THREAD_POOL_MIN_SIZE.get(settings);
135144
threadPoolMax = FLIGHT_THREAD_POOL_MAX_SIZE.get(settings);
136145
keepAlive = FLIGHT_THREAD_POOL_KEEP_ALIVE.get(settings);
146+
eventLoopThreads = FLIGHT_EVENT_LOOP_THREADS.get(settings);
137147
}
138148

139149
/**
@@ -172,6 +182,15 @@ public static ScalingExecutorBuilder getClientExecutorBuilder() {
172182
return new ScalingExecutorBuilder(FLIGHT_CLIENT_THREAD_POOL_NAME, threadPoolMin, threadPoolMax, keepAlive);
173183
}
174184

185+
/**
186+
* Gets the configured number of event loop threads.
187+
*
188+
* @return The number of event loop threads
189+
*/
190+
public static int getEventLoopThreads() {
191+
return eventLoopThreads;
192+
}
193+
175194
/**
176195
* Returns a list of all settings managed by this configuration class.
177196
*
@@ -184,7 +203,8 @@ public static List<Setting<?>> getSettings() {
184203
ARROW_ENABLE_NULL_CHECK_FOR_GET,
185204
ARROW_ENABLE_DEBUG_ALLOCATOR,
186205
ARROW_ENABLE_UNSAFE_MEMORY_ACCESS,
187-
ARROW_SSL_ENABLE
206+
ARROW_SSL_ENABLE,
207+
FLIGHT_EVENT_LOOP_THREADS
188208
)
189209
);
190210
}

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ArrowFlightProducer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,13 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
6161
// https://github.com/apache/arrow/issues/38668
6262
executor.execute(() -> {
6363
FlightCallTracker callTracker = statsCollector.createServerCallTracker();
64-
FlightServerChannel channel = new FlightServerChannel(listener, allocator, middleware, callTracker);
64+
FlightServerChannel channel = new FlightServerChannel(
65+
listener,
66+
allocator,
67+
middleware,
68+
callTracker,
69+
flightTransport.getNextFlightExecutor()
70+
);
6571
try {
6672
BytesArray buf = new BytesArray(ticket.getBytes());
6773
callTracker.recordRequestBytes(buf.ramBytesUsed());
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.arrow.flight.transport;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.common.util.concurrent.ThreadContext;
13+
import org.opensearch.core.transport.TransportResponse;
14+
import org.opensearch.transport.TcpChannel;
15+
16+
import java.util.Set;
17+
18+
/**
19+
* Represents a batch processing task for Flight streaming responses.
20+
* @opensearch.internal
21+
*/
22+
class BatchTask implements AutoCloseable {
23+
final Version nodeVersion;
24+
final Set<String> features;
25+
final TcpChannel channel;
26+
final FlightTransportChannel transportChannel;
27+
final long requestId;
28+
final String action;
29+
final TransportResponse response;
30+
final boolean compress;
31+
final boolean isHandshake;
32+
final boolean isComplete;
33+
final boolean isError;
34+
final Exception error;
35+
final ThreadContext.StoredContext storedContext;
36+
37+
BatchTask(
38+
Version nodeVersion,
39+
Set<String> features,
40+
TcpChannel channel,
41+
FlightTransportChannel transportChannel,
42+
long requestId,
43+
String action,
44+
TransportResponse response,
45+
boolean compress,
46+
boolean isHandshake,
47+
boolean isComplete,
48+
ThreadContext.StoredContext storedContext
49+
) {
50+
this(
51+
nodeVersion,
52+
features,
53+
channel,
54+
transportChannel,
55+
requestId,
56+
action,
57+
response,
58+
compress,
59+
isHandshake,
60+
isComplete,
61+
false,
62+
null,
63+
storedContext
64+
);
65+
}
66+
67+
BatchTask(
68+
Version nodeVersion,
69+
Set<String> features,
70+
TcpChannel channel,
71+
FlightTransportChannel transportChannel,
72+
long requestId,
73+
String action,
74+
boolean compress,
75+
boolean isHandshake,
76+
Exception error,
77+
ThreadContext.StoredContext storedContext
78+
) {
79+
this(
80+
nodeVersion,
81+
features,
82+
channel,
83+
transportChannel,
84+
requestId,
85+
action,
86+
null,
87+
compress,
88+
isHandshake,
89+
false,
90+
true,
91+
error,
92+
storedContext
93+
);
94+
}
95+
96+
private BatchTask(
97+
Version nodeVersion,
98+
Set<String> features,
99+
TcpChannel channel,
100+
FlightTransportChannel transportChannel,
101+
long requestId,
102+
String action,
103+
TransportResponse response,
104+
boolean compress,
105+
boolean isHandshake,
106+
boolean isComplete,
107+
boolean isError,
108+
Exception error,
109+
ThreadContext.StoredContext storedContext
110+
) {
111+
this.nodeVersion = nodeVersion;
112+
this.features = features;
113+
this.channel = channel;
114+
this.transportChannel = transportChannel;
115+
this.requestId = requestId;
116+
this.action = action;
117+
this.response = response;
118+
this.compress = compress;
119+
this.isHandshake = isHandshake;
120+
this.isComplete = isComplete;
121+
this.isError = isError;
122+
this.error = error;
123+
this.storedContext = storedContext;
124+
}
125+
126+
@Override
127+
public void close() {
128+
if (storedContext != null) {
129+
storedContext.close();
130+
}
131+
if ((isComplete || isError) && transportChannel != null) {
132+
transportChannel.releaseChannel(isError);
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)