Skip to content

Commit 034b88b

Browse files
committed
THRIFT-5863: Make TServerTransport able to configure the max message size
1 parent 94e1a30 commit 034b88b

File tree

5 files changed

+37
-1
lines changed

5 files changed

+37
-1
lines changed

lib/java/src/main/java/org/apache/thrift/transport/TEndpointTransport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public void setMaxFrameSize(int maxFrameSize) {
3535
getConfiguration().setMaxFrameSize(maxFrameSize);
3636
}
3737

38+
public void setMaxMessageSize(int maxMessageSize) {
39+
getConfiguration().setMaxMessageSize(maxMessageSize);
40+
}
41+
3842
protected long knownMessageSize;
3943
protected long remainingMessageSize;
4044

lib/java/src/main/java/org/apache/thrift/transport/TNonblockingServerSocket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport {
4949
/** Limit for client sockets request size */
5050
private int maxFrameSize_ = 0;
5151

52+
/** Max message size */
53+
private int maxMessageSize_ = 0;
54+
5255
public static class NonblockingAbstractServerSocketArgs
5356
extends AbstractServerTransportArgs<NonblockingAbstractServerSocketArgs> {}
5457

@@ -93,6 +96,7 @@ public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args)
9396
throws TTransportException {
9497
clientTimeout_ = args.clientTimeout;
9598
maxFrameSize_ = args.maxFrameSize;
99+
maxMessageSize_ = args.maxMessageSize;
96100
try {
97101
serverSocketChannel = ServerSocketChannel.open();
98102
serverSocketChannel.configureBlocking(false);
@@ -135,6 +139,7 @@ public TNonblockingSocket accept() throws TTransportException {
135139
TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
136140
tsocket.setTimeout(clientTimeout_);
137141
tsocket.setMaxFrameSize(maxFrameSize_);
142+
tsocket.setMaxMessageSize(maxMessageSize_);
138143
return tsocket;
139144
} catch (IOException iox) {
140145
throw new TTransportException(iox);

lib/java/src/main/java/org/apache/thrift/transport/TServerSocket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public class TServerSocket extends TServerTransport {
3838
/** Timeout for client sockets from accept */
3939
private int clientTimeout_ = 0;
4040

41+
/** Max message size */
42+
private int maxMessageSize_ = 0;
43+
4144
public static class ServerSocketTransportArgs
4245
extends AbstractServerTransportArgs<ServerSocketTransportArgs> {
4346
ServerSocket serverSocket;
@@ -78,6 +81,7 @@ public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTran
7881

7982
public TServerSocket(ServerSocketTransportArgs args) throws TTransportException {
8083
clientTimeout_ = args.clientTimeout;
84+
maxMessageSize_ = args.maxMessageSize;
8185
if (args.serverSocket != null) {
8286
this.serverSocket_ = args.serverSocket;
8387
return;
@@ -123,6 +127,7 @@ public TSocket accept() throws TTransportException {
123127
}
124128
TSocket socket = new TSocket(result);
125129
socket.setTimeout(clientTimeout_);
130+
socket.setMaxMessageSize(maxMessageSize_);
126131
return socket;
127132
}
128133

lib/java/src/main/java/org/apache/thrift/transport/TServerTransport.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public abstract static class AbstractServerTransportArgs<
3232
int clientTimeout = 0;
3333
InetSocketAddress bindAddr;
3434
int maxFrameSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE;
35+
int maxMessageSize = TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
3536

3637
public T backlog(int backlog) {
3738
this.backlog = backlog;
@@ -57,6 +58,11 @@ public T maxFrameSize(int maxFrameSize) {
5758
this.maxFrameSize = maxFrameSize;
5859
return (T) this;
5960
}
61+
62+
public T maxMessageSize(int maxMessageSize) {
63+
this.maxMessageSize = maxMessageSize;
64+
return (T) this;
65+
}
6066
}
6167

6268
public abstract void listen() throws TTransportException;

lib/java/src/test/java/org/apache/thrift/server/TestThreadPoolServer.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import static org.junit.jupiter.api.Assertions.assertTrue;
2424

2525
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.atomic.AtomicReference;
2627
import org.apache.thrift.protocol.TBinaryProtocol;
2728
import org.apache.thrift.transport.TServerSocket;
2829
import org.apache.thrift.transport.TServerTransport;
2930
import org.apache.thrift.transport.TSocket;
31+
import org.apache.thrift.transport.TTransportException;
3032
import org.junit.jupiter.api.Test;
3133
import thrift.test.ThriftTest;
3234

@@ -35,7 +37,20 @@ public class TestThreadPoolServer {
3537
/** Test server is shut down properly even with some open clients. */
3638
@Test
3739
public void testStopServerWithOpenClient() throws Exception {
38-
TServerSocket serverSocket = new TServerSocket(0, 3000);
40+
AtomicReference<TSocket> ref = new AtomicReference<>();
41+
TServerSocket serverSocket =
42+
new TServerSocket(
43+
new TServerSocket.ServerSocketTransportArgs()
44+
.port(0)
45+
.clientTimeout(3000)
46+
.maxMessageSize(51200)) {
47+
@Override
48+
public TSocket accept() throws TTransportException {
49+
TSocket socket = super.accept();
50+
ref.set(socket);
51+
return socket;
52+
}
53+
};
3954
TThreadPoolServer server = buildServer(serverSocket);
4055
Thread serverThread = new Thread(server::serve);
4156
serverThread.start();
@@ -44,6 +59,7 @@ public void testStopServerWithOpenClient() throws Exception {
4459
Thread.sleep(1000);
4560
// There is a thread listening to the client
4661
assertEquals(1, ((ThreadPoolExecutor) server.getExecutorService()).getActiveCount());
62+
assertEquals(51200, ref.get().getConfiguration().getMaxMessageSize());
4763

4864
// Trigger the server to stop, but it does not wait
4965
server.stop();

0 commit comments

Comments
 (0)