Skip to content

Commit 5bb19d9

Browse files
committed
Async flush of batches at server for Stream Transport
1 parent bb9a7d4 commit 5bb19d9

File tree

8 files changed

+418
-54
lines changed

8 files changed

+418
-54
lines changed

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ public ServerConfig() {}
8787
Setting.Property.NodeScope
8888
);
8989

90+
static final Setting<Integer> FLIGHT_EVENT_LOOP_THREADS = Setting.intSetting(
91+
"flight.event_loop.threads",
92+
Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
93+
1,
94+
Setting.Property.NodeScope
95+
);
96+
9097
static final Setting<Boolean> ARROW_SSL_ENABLE = Setting.boolSetting(
9198
"flight.ssl.enable",
9299
false, // TODO: get default from security enabled
@@ -112,6 +119,7 @@ public ServerConfig() {}
112119
private static int threadPoolMin;
113120
private static int threadPoolMax;
114121
private static TimeValue keepAlive;
122+
private static int eventLoopThreads;
115123

116124
/**
117125
* Initializes the server configuration with the provided settings.
@@ -134,6 +142,7 @@ public static void init(Settings settings) {
134142
threadPoolMin = FLIGHT_THREAD_POOL_MIN_SIZE.get(settings);
135143
threadPoolMax = FLIGHT_THREAD_POOL_MAX_SIZE.get(settings);
136144
keepAlive = FLIGHT_THREAD_POOL_KEEP_ALIVE.get(settings);
145+
eventLoopThreads = FLIGHT_EVENT_LOOP_THREADS.get(settings);
137146
}
138147

139148
/**
@@ -172,6 +181,15 @@ public static ScalingExecutorBuilder getClientExecutorBuilder() {
172181
return new ScalingExecutorBuilder(FLIGHT_CLIENT_THREAD_POOL_NAME, threadPoolMin, threadPoolMax, keepAlive);
173182
}
174183

184+
/**
185+
* Gets the configured number of event loop threads.
186+
*
187+
* @return The number of event loop threads
188+
*/
189+
public static int getEventLoopThreads() {
190+
return eventLoopThreads;
191+
}
192+
175193
/**
176194
* Returns a list of all settings managed by this configuration class.
177195
*
@@ -184,7 +202,8 @@ public static List<Setting<?>> getSettings() {
184202
ARROW_ENABLE_NULL_CHECK_FOR_GET,
185203
ARROW_ENABLE_DEBUG_ALLOCATOR,
186204
ARROW_ENABLE_UNSAFE_MEMORY_ACCESS,
187-
ARROW_SSL_ENABLE
205+
ARROW_SSL_ENABLE,
206+
FLIGHT_EVENT_LOOP_THREADS
188207
)
189208
);
190209
}

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ArrowFlightProducer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,13 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
6161
// https://github.com/apache/arrow/issues/38668
6262
executor.execute(() -> {
6363
FlightCallTracker callTracker = statsCollector.createServerCallTracker();
64-
FlightServerChannel channel = new FlightServerChannel(listener, allocator, middleware, callTracker);
64+
FlightServerChannel channel = new FlightServerChannel(
65+
listener,
66+
allocator,
67+
middleware,
68+
callTracker,
69+
flightTransport.getNextFlightExecutor()
70+
);
6571
try {
6672
BytesArray buf = new BytesArray(ticket.getBytes());
6773
callTracker.recordRequestBytes(buf.ramBytesUsed());
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.arrow.flight.transport;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.common.util.concurrent.ThreadContext;
13+
import org.opensearch.core.transport.TransportResponse;
14+
import org.opensearch.transport.TcpChannel;
15+
16+
import java.util.Set;
17+
18+
/**
19+
* Represents a batch processing task for Flight streaming responses.
20+
* @opensearch.internal
21+
*/
22+
class BatchTask implements AutoCloseable {
23+
final Version nodeVersion;
24+
final Set<String> features;
25+
final TcpChannel channel;
26+
final FlightTransportChannel transportChannel;
27+
final long requestId;
28+
final String action;
29+
final TransportResponse response;
30+
final boolean compress;
31+
final boolean isHandshake;
32+
final boolean isComplete;
33+
final boolean isError;
34+
final Exception error;
35+
final ThreadContext.StoredContext storedContext;
36+
37+
BatchTask(
38+
Version nodeVersion,
39+
Set<String> features,
40+
TcpChannel channel,
41+
FlightTransportChannel transportChannel,
42+
long requestId,
43+
String action,
44+
TransportResponse response,
45+
boolean compress,
46+
boolean isHandshake,
47+
boolean isComplete,
48+
ThreadContext.StoredContext storedContext
49+
) {
50+
this(
51+
nodeVersion,
52+
features,
53+
channel,
54+
transportChannel,
55+
requestId,
56+
action,
57+
response,
58+
compress,
59+
isHandshake,
60+
isComplete,
61+
false,
62+
null,
63+
storedContext
64+
);
65+
}
66+
67+
BatchTask(
68+
Version nodeVersion,
69+
Set<String> features,
70+
TcpChannel channel,
71+
FlightTransportChannel transportChannel,
72+
long requestId,
73+
String action,
74+
boolean compress,
75+
boolean isHandshake,
76+
Exception error,
77+
ThreadContext.StoredContext storedContext
78+
) {
79+
this(
80+
nodeVersion,
81+
features,
82+
channel,
83+
transportChannel,
84+
requestId,
85+
action,
86+
null,
87+
compress,
88+
isHandshake,
89+
false,
90+
true,
91+
error,
92+
storedContext
93+
);
94+
}
95+
96+
private BatchTask(
97+
Version nodeVersion,
98+
Set<String> features,
99+
TcpChannel channel,
100+
FlightTransportChannel transportChannel,
101+
long requestId,
102+
String action,
103+
TransportResponse response,
104+
boolean compress,
105+
boolean isHandshake,
106+
boolean isComplete,
107+
boolean isError,
108+
Exception error,
109+
ThreadContext.StoredContext storedContext
110+
) {
111+
this.nodeVersion = nodeVersion;
112+
this.features = features;
113+
this.channel = channel;
114+
this.transportChannel = transportChannel;
115+
this.requestId = requestId;
116+
this.action = action;
117+
this.response = response;
118+
this.compress = compress;
119+
this.isHandshake = isHandshake;
120+
this.isComplete = isComplete;
121+
this.isError = isError;
122+
this.error = error;
123+
this.storedContext = storedContext;
124+
}
125+
126+
@Override
127+
public void close() {
128+
if (storedContext != null) {
129+
storedContext.close();
130+
}
131+
if ((isComplete || isError) && transportChannel != null) {
132+
transportChannel.releaseChannel(isError);
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)