Skip to content

Commit bf723e8

Browse files
committed
Rewire the shutdown sequence of VertxConnection.
Motivation: The VertxConnection relies on Netty channel close, instead it should be separate and keep channel close as short as possible.
1 parent da29111 commit bf723e8

File tree

11 files changed

+167
-167
lines changed

11 files changed

+167
-167
lines changed

vertx-core/src/main/asciidoc/net.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ The default shut-down timeout is 30 seconds, you can override the amount of time
207207
{@link examples.NetExamples#serverShutdownWithAmountOfTime}
208208
----
209209

210+
NOTE: any socket without a shutdown handler is closed immediately
211+
210212
=== TCP close
211213

212214
You can close a {@link io.vertx.core.net.NetServer#close() server} or {@link io.vertx.core.net.NetClient#close() client} to

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,8 @@ void handleClosed(Throwable err) {
723723
}
724724

725725
@Override
726-
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
727-
super.handleShutdown(timeout, unit, promise);
726+
protected void handleShutdown(ChannelPromise promise) {
727+
super.handleShutdown(promise);
728728
checkLifecycle();
729729
}
730730

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xConnection.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ abstract class Http1xConnection extends VertxConnection implements io.vertx.core
3939

4040
protected boolean closeInitiated;
4141
protected boolean shutdownInitiated;
42-
protected long shutdownTimeout;
43-
protected TimeUnit shutdownUnit;
4442
protected ChannelPromise closePromise;
4543

4644
private Handler<Void> shutdownHandler;
@@ -59,10 +57,8 @@ public synchronized Http1xConnection shutdownHandler(@Nullable Handler<Void> han
5957
}
6058

6159
@Override
62-
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
60+
protected void handleShutdown(ChannelPromise promise) {
6361
shutdownInitiated = true;
64-
shutdownTimeout = timeout;
65-
shutdownUnit = unit;
6662
closePromise = promise;
6763
Handler<Void> handler;
6864
synchronized (this) {
@@ -83,7 +79,7 @@ protected void closeInternal() {
8379
if (closeInitiated) {
8480
// Nothing to do
8581
} else if (shutdownInitiated) {
86-
super.handleShutdown(shutdownTimeout, shutdownUnit, closePromise);
82+
super.handleShutdown(closePromise);
8783
} else {
8884
chctx.channel().close();
8985
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import io.vertx.core.spi.tracing.VertxTracer;
4343
import io.vertx.core.tracing.TracingPolicy;
4444

45-
import java.util.concurrent.TimeUnit;
4645
import java.util.function.Supplier;
4746

4847
import static io.netty.handler.codec.http.HttpResponseStatus.*;
@@ -107,8 +106,8 @@ public Http1xServerConnection(ThreadingModel threadingModel,
107106
}
108107

109108
@Override
110-
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
111-
super.handleShutdown(timeout, unit, promise);
109+
protected void handleShutdown(ChannelPromise promise) {
110+
super.handleShutdown(promise);
112111
if (responseInProgress != null) {
113112
} else {
114113
closeInternal();

vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketConnectionImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,10 @@ public Future<Void> shutdown(Object reason, long timeout, TimeUnit unit) {
9090
}
9191

9292
@Override
93-
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
94-
webSocket.handleShutdown();
93+
protected void handleShutdown(ChannelPromise promise) {
94+
if (!webSocket.handleShutdown()) {
95+
super.handleShutdown(promise);
96+
}
9597
}
9698

9799
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,14 +592,15 @@ private Handler<Buffer> pongHandler() {
592592
}
593593
}
594594

595-
void handleShutdown() {
595+
boolean handleShutdown() {
596596
Handler<Void> handler;
597597
synchronized (this) {
598598
handler = shutdownHandler;
599599
}
600600
if (handler != null) {
601601
context.emit(handler);
602602
}
603+
return handler != null;
603604
}
604605

605606
void handleWriteQueueDrained(Void v) {

vertx-core/src/main/java/io/vertx/core/net/impl/SocketBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.io.IOException;
3535
import java.io.RandomAccessFile;
3636
import java.nio.charset.Charset;
37-
import java.util.concurrent.TimeUnit;
3837

3938
/**
4039
* @author <a href="http://tfox.org">Tim Fox</a>
@@ -251,13 +250,15 @@ public synchronized S shutdownHandler(@Nullable Handler<Void> handler) {
251250
}
252251

253252
@Override
254-
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
253+
protected void handleShutdown(ChannelPromise promise) {
255254
Handler<Void> handler;
256255
synchronized (this) {
257256
handler = shutdownHandler;
258257
}
259258
if (handler != null) {
260259
context.emit(handler);
260+
} else {
261+
super.handleShutdown(promise);
261262
}
262263
}
263264

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 58 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ public class VertxConnection extends ConnectionBase {
6868
private boolean channelWritable;
6969
private boolean paused;
7070
private boolean autoRead;
71-
private ScheduledFuture<?> shutdownTimeout;
7271

7372
// State accessed exclusively from the event loop thread
74-
private ChannelPromise closeInitiated;
73+
private ScheduledFuture<?> shutdownTimeout;
74+
private ChannelPromise shutdown;
7575
private boolean closeSent;
7676

7777
public VertxConnection(ContextInternal context, ChannelHandlerContext chctx) {
@@ -127,73 +127,61 @@ protected void handleEvent(Object event) {
127127
*/
128128
protected void handleIdle(IdleStateEvent event) {
129129
log.debug("The connection will be closed due to timeout");
130+
// Should be channel close ...
130131
chctx.close();
131132
}
132133

133134
protected boolean supportsFileRegion() {
134135
return vertx.transport().supportFileRegion() && !isSsl() &&!isTrafficShaped();
135136
}
136137

137-
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
138+
/**
139+
* Implement the shutdown default's behavior that cancels the shutdown timeout and close the channel with the
140+
* channel {@code promise} argument.
141+
*
142+
* @param promise the channel promise to be used for closing the channel
143+
*/
144+
protected void handleShutdown(ChannelPromise promise) {
138145
// Assert from event-loop
139146
ScheduledFuture<?> t = shutdownTimeout;
140-
if (t != null) {
141-
shutdownTimeout = null;
142-
t.cancel(false);
143-
terminateClose(promise);
144-
}
145-
}
146-
147-
private static class CloseChannelPromise extends DefaultChannelPromise {
148-
final long timeout;
149-
final TimeUnit unit;
150-
public CloseChannelPromise(Channel channel, long timeout, TimeUnit unit) {
151-
super(channel);
152-
this.timeout = timeout;
153-
this.unit = unit;
147+
if (t != null && t.cancel(false)) {
148+
channel.close(shutdown);
154149
}
155150
}
156151

157152
/**
158-
* Close the connection
153+
* Override the {@link ConnectionBase#close()} behavior to cooperate with the shutdown sequence.
159154
*/
160155
public final Future<Void> close() {
161156
return shutdown(0L, TimeUnit.SECONDS);
162157
}
163158

164-
// Is this necessary ?
159+
/**
160+
* Initiate the connection shutdown sequence.
161+
*
162+
* @param timeout the shutdown timeout
163+
* @param unit the shutdown timeout unit
164+
* @return the future completed after the channel's closure
165+
*/
165166
public final Future<Void> shutdown(long timeout, TimeUnit unit) {
166-
CloseChannelPromise promise = new CloseChannelPromise(channel, timeout, unit);
167-
shutdown(promise);
168-
PromiseInternal<Void> p = context.promise();
169-
promise.addListener(p);
170-
return p.future();
171-
}
172-
173-
private void shutdown(CloseChannelPromise promise) {
167+
ChannelPromise promise = channel.newPromise();
174168
EventExecutor exec = chctx.executor();
175169
if (exec.inEventLoop()) {
176-
channel.close(promise);
170+
shutdown(timeout, unit, promise);
177171
} else {
178-
exec.execute(() -> shutdown(promise));
172+
exec.execute(() -> shutdown(timeout, unit, promise));
179173
}
174+
PromiseInternal<Void> p = context.promise();
175+
promise.addListener(p);
176+
return p.future();
180177
}
181178

182-
/**
183-
* Called exclusively by the Netty handler close method.
184-
*/
185-
final void handleClose(ChannelPromise promise) {
186-
if (closeInitiated != null) {
187-
long timeout;
188-
if (promise instanceof CloseChannelPromise) {
189-
timeout = ((CloseChannelPromise)promise).timeout;
190-
} else {
191-
timeout = 0L;
192-
}
193-
if (timeout == 0L && !closeSent) {
194-
closeSent = true;
195-
closeInitiated = promise;
196-
writeClose(promise);
179+
private void shutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
180+
if (shutdown != null) {
181+
ScheduledFuture<?> t = shutdownTimeout;
182+
if (timeout == 0L && (t == null || t.cancel(false))) {
183+
shutdown = promise;
184+
channel.close(promise);
197185
} else {
198186
channel
199187
.closeFuture()
@@ -206,34 +194,38 @@ final void handleClose(ChannelPromise promise) {
206194
});
207195
}
208196
} else {
209-
closeInitiated = promise;
210-
long timeout;
211-
TimeUnit unit;
212-
if (promise instanceof CloseChannelPromise) {
213-
CloseChannelPromise closeChannelPromise = (CloseChannelPromise) promise;
214-
timeout = closeChannelPromise.timeout;
215-
unit = closeChannelPromise.unit;
216-
} else {
217-
timeout = 0L;
218-
unit = TimeUnit.SECONDS;
219-
}
197+
shutdown = promise;
220198
if (timeout == 0L) {
221-
terminateClose(promise);
199+
channel.close(promise);
222200
} else {
223201
EventExecutor el = chctx.executor();
224202
shutdownTimeout = el.schedule(() -> {
225-
shutdownTimeout = null;
226-
terminateClose(promise);
203+
channel.close(promise);
227204
}, timeout, unit);
228-
handleShutdown(timeout, unit, promise);
205+
handleShutdown(promise);
229206
}
230207
}
231208
}
232209

210+
// Exclusively called by the owning handler close signal
211+
void handleClose(ChannelPromise promise) {
212+
terminateClose(promise);
213+
}
214+
233215
private void terminateClose(ChannelPromise promise) {
234216
if (!closeSent) {
235217
closeSent = true;
236218
writeClose(promise);
219+
} else {
220+
channel
221+
.closeFuture()
222+
.addListener(future -> {
223+
if (future.isSuccess()) {
224+
promise.setSuccess();
225+
} else {
226+
promise.setFailure(future.cause());
227+
}
228+
});
237229
}
238230
}
239231

@@ -247,13 +239,18 @@ private void terminateClose(ChannelPromise promise) {
247239
* This method is exclusively called on the event-loop thread and relays a channel user event.
248240
*/
249241
protected void writeClose(ChannelPromise promise) {
250-
doWriteClose(promise);
242+
// Make sure everything is flushed out on close
243+
ChannelPromise channelPromise = chctx
244+
.newPromise()
245+
.addListener((ChannelFutureListener) f -> {
246+
chctx.close(promise);
247+
});
248+
writeToChannel(Unpooled.EMPTY_BUFFER, true, channelPromise);
251249
}
252250

253251
protected void handleClosed() {
254252
ScheduledFuture<?> timeout = shutdownTimeout;
255253
if (timeout != null) {
256-
shutdownTimeout = null;
257254
timeout.cancel(false);
258255
}
259256
outboundMessageQueue.close();
@@ -426,21 +423,6 @@ public final ChannelPromise write(Object msg, boolean forceFlush, ChannelPromise
426423
return promise;
427424
}
428425

429-
/**
430-
* This method is exclusively called on the event-loop thread
431-
*
432-
* @param promise the promise receiving the completion event
433-
*/
434-
private void doWriteClose(ChannelPromise promise) {
435-
// Make sure everything is flushed out on close
436-
ChannelPromise channelPromise = chctx
437-
.newPromise()
438-
.addListener((ChannelFutureListener) f -> {
439-
chctx.close(promise);
440-
});
441-
writeToChannel(Unpooled.EMPTY_BUFFER, true, channelPromise);
442-
}
443-
444426
public final boolean writeToChannel(Object obj) {
445427
return writeToChannel(obj, voidPromise);
446428
}

vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4037,6 +4037,34 @@ public void testServerShutdownOverride() throws Exception {
40374037
await();
40384038
}
40394039

4040+
@Test
4041+
public void testServerShutdown() throws Exception {
4042+
waitFor(2);
4043+
long now = System.currentTimeMillis();
4044+
server = vertx
4045+
.createHttpServer()
4046+
.webSocketHandler(ws -> {
4047+
ws.closeHandler(v -> {
4048+
long d = System.currentTimeMillis() - now;
4049+
assertTrue(d <= 500);
4050+
complete();
4051+
});
4052+
ws.handler(buff -> {
4053+
ws.shutdown(10, TimeUnit.SECONDS);
4054+
});
4055+
});
4056+
awaitFuture(server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST));
4057+
client = vertx.createWebSocketClient();
4058+
client.connect(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/")
4059+
.onComplete(onSuccess(ws -> {
4060+
ws.write(Buffer.buffer("ping"));
4061+
ws.closeHandler(v -> {
4062+
complete();
4063+
});
4064+
}));
4065+
await();
4066+
}
4067+
40404068
@Test
40414069
public void testCustomResponseHeadersBeforeUpgrade() throws InterruptedException {
40424070
String path = "/some/path";

0 commit comments

Comments
 (0)