Skip to content

Commit da29111

Browse files
committed
Move the VertxConnection close reason to the WebSocketConnectionImpl class.
Motivation: Since only WebSocket use an effective close reason, we should move this generic part of the VertxConnection to WebSocketConnection implementation to simplify unclutter VertxConnection and keep it focus on its responsibitlies.
1 parent ff5fd86 commit da29111

File tree

10 files changed

+53
-63
lines changed

10 files changed

+53
-63
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,8 @@ void handleClosed(Throwable err) {
723723
}
724724

725725
@Override
726-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
727-
super.handleShutdown(reason, timeout, unit, promise);
726+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
727+
super.handleShutdown(timeout, unit, promise);
728728
checkLifecycle();
729729
}
730730

@@ -739,15 +739,15 @@ private boolean checkLifecycle() {
739739
}
740740

741741
@Override
742-
protected void writeClose(Object reason, ChannelPromise promise) {
742+
protected void writeClose(ChannelPromise promise) {
743743
// Maybe move to handleShutdown
744744
if (!evicted) {
745745
evicted = true;
746746
if (evictionHandler != null) {
747747
evictionHandler.handle(null);
748748
}
749749
}
750-
super.writeClose(reason, promise);
750+
super.writeClose(promise);
751751
}
752752

753753
private Throwable validateMessage(Object msg) {

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xConnection.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.vertx.core.http.Http2Settings;
2929
import io.vertx.core.http.HttpConnection;
3030
import io.vertx.core.internal.ContextInternal;
31-
import io.vertx.core.net.impl.ConnectionBase;
3231
import io.vertx.core.net.impl.VertxConnection;
3332

3433
import java.util.concurrent.TimeUnit;
@@ -40,7 +39,6 @@ abstract class Http1xConnection extends VertxConnection implements io.vertx.core
4039

4140
protected boolean closeInitiated;
4241
protected boolean shutdownInitiated;
43-
protected Object closeReason;
4442
protected long shutdownTimeout;
4543
protected TimeUnit shutdownUnit;
4644
protected ChannelPromise closePromise;
@@ -61,9 +59,8 @@ public synchronized Http1xConnection shutdownHandler(@Nullable Handler<Void> han
6159
}
6260

6361
@Override
64-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
62+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
6563
shutdownInitiated = true;
66-
closeReason = reason;
6764
shutdownTimeout = timeout;
6865
shutdownUnit = unit;
6966
closePromise = promise;
@@ -77,16 +74,16 @@ protected void handleShutdown(Object reason, long timeout, TimeUnit unit, Channe
7774
}
7875

7976
@Override
80-
protected void writeClose(Object reason, ChannelPromise promise) {
77+
protected void writeClose(ChannelPromise promise) {
8178
closeInitiated = true;
82-
super.writeClose(reason, promise);
79+
super.writeClose(promise);
8380
}
8481

8582
protected void closeInternal() {
8683
if (closeInitiated) {
8784
// Nothing to do
8885
} else if (shutdownInitiated) {
89-
super.handleShutdown(closeReason, shutdownTimeout, shutdownUnit, closePromise);
86+
super.handleShutdown(shutdownTimeout, shutdownUnit, closePromise);
9087
} else {
9188
chctx.channel().close();
9289
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ public Http1xServerConnection(ThreadingModel threadingModel,
107107
}
108108

109109
@Override
110-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
111-
super.handleShutdown(reason, timeout, unit, promise);
110+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
111+
super.handleShutdown(timeout, unit, promise);
112112
if (responseInProgress != null) {
113113
} else {
114114
closeInternal();

vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class ServerWebSocketImpl extends WebSocketImplBase<ServerWebSocketImpl>
2828
private final String query;
2929

3030
ServerWebSocketImpl(ContextInternal context,
31-
VertxConnection conn,
31+
WebSocketConnectionImpl conn,
3232
boolean supportsContinuation,
3333
Http1xServerRequest request,
3434
int maxWebSocketFrameSize,

vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketConnectionImpl.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.netty.handler.codec.http.websocketx.*;
1717
import io.netty.util.concurrent.EventExecutor;
1818
import io.netty.util.concurrent.ScheduledFuture;
19+
import io.vertx.core.Future;
1920
import io.vertx.core.http.WebSocketFrameType;
2021
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
2122
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
@@ -77,13 +78,24 @@ public NetworkMetrics metrics() {
7778
return metrics;
7879
}
7980

81+
private Object reason;
82+
83+
public Future<Void> close(Object reason) {
84+
return shutdown(reason, 0L, TimeUnit.SECONDS);
85+
}
86+
87+
public Future<Void> shutdown(Object reason, long timeout, TimeUnit unit) {
88+
this.reason = reason;
89+
return shutdown(timeout, unit);
90+
}
91+
8092
@Override
81-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
93+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
8294
webSocket.handleShutdown();
8395
}
8496

8597
@Override
86-
protected void writeClose(Object reason, ChannelPromise promise) {
98+
protected void writeClose(ChannelPromise promise) {
8799
assert !closeSent;
88100
closeSent = true;
89101
closePromise = promise;
@@ -205,7 +217,7 @@ private void finishClose() {
205217
closingTimeout = null;
206218
ChannelPromise p = closePromise;
207219
closePromise = null;
208-
super.writeClose(closeReason, p);
220+
super.writeClose(p);
209221
}
210222
}
211223

vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class WebSocketImpl extends WebSocketImplBase<WebSocketImpl> implements W
2424
private Handler<Void> evictionHandler;
2525

2626
public WebSocketImpl(ContextInternal context,
27-
VertxConnection conn,
27+
WebSocketConnectionImpl conn,
2828
boolean supportsContinuation,
2929
int maxWebSocketFrameSize,
3030
int maxWebSocketMessageSize,

vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class WebSocketImplBase<S extends WebSocket> implements WebSocke
5959
private final String binaryHandlerID;
6060
private final int maxWebSocketFrameSize;
6161
private final int maxWebSocketMessageSize;
62-
private final VertxConnection conn;
62+
private final WebSocketConnectionImpl conn;
6363
private ChannelHandlerContext chctx;
6464
protected final ContextInternal context;
6565
private final InboundMessageQueue<WebSocketFrameInternal> pending;
@@ -82,7 +82,7 @@ public abstract class WebSocketImplBase<S extends WebSocket> implements WebSocke
8282
private MultiMap headers;
8383

8484
WebSocketImplBase(ContextInternal context,
85-
VertxConnection conn,
85+
WebSocketConnectionImpl conn,
8686
MultiMap headers,
8787
boolean supportsContinuation,
8888
int maxWebSocketFrameSize,

vertx-core/src/main/java/io/vertx/core/net/impl/SocketBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ public synchronized S shutdownHandler(@Nullable Handler<Void> handler) {
251251
}
252252

253253
@Override
254-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
254+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
255255
Handler<Void> handler;
256256
synchronized (this) {
257257
handler = shutdownHandler;

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -134,23 +134,21 @@ protected boolean supportsFileRegion() {
134134
return vertx.transport().supportFileRegion() && !isSsl() &&!isTrafficShaped();
135135
}
136136

137-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
137+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
138138
// Assert from event-loop
139139
ScheduledFuture<?> t = shutdownTimeout;
140140
if (t != null) {
141141
shutdownTimeout = null;
142142
t.cancel(false);
143-
terminateClose(reason, promise);
143+
terminateClose(promise);
144144
}
145145
}
146146

147147
private static class CloseChannelPromise extends DefaultChannelPromise {
148-
final Object reason;
149148
final long timeout;
150149
final TimeUnit unit;
151-
public CloseChannelPromise(Channel channel, Object reason, long timeout, TimeUnit unit) {
150+
public CloseChannelPromise(Channel channel, long timeout, TimeUnit unit) {
152151
super(channel);
153-
this.reason = reason;
154152
this.timeout = timeout;
155153
this.unit = unit;
156154
}
@@ -160,23 +158,12 @@ public CloseChannelPromise(Channel channel, Object reason, long timeout, TimeUni
160158
* Close the connection
161159
*/
162160
public final Future<Void> close() {
163-
return close(null);
164-
}
165-
166-
/**
167-
* Close the connection
168-
*/
169-
public final Future<Void> close(Object reason) {
170-
return shutdown(reason, 0L, TimeUnit.SECONDS);
161+
return shutdown(0L, TimeUnit.SECONDS);
171162
}
172163

173164
// Is this necessary ?
174165
public final Future<Void> shutdown(long timeout, TimeUnit unit) {
175-
return shutdown(null, timeout, unit);
176-
}
177-
178-
public final Future<Void> shutdown(Object reason, long timeout, TimeUnit unit) {
179-
CloseChannelPromise promise = new CloseChannelPromise(channel, reason, timeout, unit);
166+
CloseChannelPromise promise = new CloseChannelPromise(channel, timeout, unit);
180167
shutdown(promise);
181168
PromiseInternal<Void> p = context.promise();
182169
promise.addListener(p);
@@ -198,18 +185,15 @@ private void shutdown(CloseChannelPromise promise) {
198185
final void handleClose(ChannelPromise promise) {
199186
if (closeInitiated != null) {
200187
long timeout;
201-
Object closeReason;
202188
if (promise instanceof CloseChannelPromise) {
203189
timeout = ((CloseChannelPromise)promise).timeout;
204-
closeReason = ((CloseChannelPromise)promise).reason;
205190
} else {
206191
timeout = 0L;
207-
closeReason = null;
208192
}
209193
if (timeout == 0L && !closeSent) {
210194
closeSent = true;
211195
closeInitiated = promise;
212-
writeClose(closeReason, promise);
196+
writeClose(promise);
213197
} else {
214198
channel
215199
.closeFuture()
@@ -223,36 +207,33 @@ final void handleClose(ChannelPromise promise) {
223207
}
224208
} else {
225209
closeInitiated = promise;
226-
Object reason;
227210
long timeout;
228211
TimeUnit unit;
229212
if (promise instanceof CloseChannelPromise) {
230213
CloseChannelPromise closeChannelPromise = (CloseChannelPromise) promise;
231-
reason = closeChannelPromise.reason;
232214
timeout = closeChannelPromise.timeout;
233215
unit = closeChannelPromise.unit;
234216
} else {
235-
reason = null;
236217
timeout = 0L;
237218
unit = TimeUnit.SECONDS;
238219
}
239220
if (timeout == 0L) {
240-
terminateClose(reason, promise);
221+
terminateClose(promise);
241222
} else {
242223
EventExecutor el = chctx.executor();
243224
shutdownTimeout = el.schedule(() -> {
244225
shutdownTimeout = null;
245-
terminateClose(reason, promise);
226+
terminateClose(promise);
246227
}, timeout, unit);
247-
handleShutdown(reason, timeout, unit, promise);
228+
handleShutdown(timeout, unit, promise);
248229
}
249230
}
250231
}
251232

252-
private void terminateClose(Object reason, ChannelPromise promise) {
233+
private void terminateClose(ChannelPromise promise) {
253234
if (!closeSent) {
254235
closeSent = true;
255-
writeClose(reason, promise);
236+
writeClose(promise);
256237
}
257238
}
258239

@@ -265,8 +246,8 @@ private void terminateClose(Object reason, ChannelPromise promise) {
265246
* <p/>
266247
* This method is exclusively called on the event-loop thread and relays a channel user event.
267248
*/
268-
protected void writeClose(Object reason, ChannelPromise promise) {
269-
writeClose(promise);
249+
protected void writeClose(ChannelPromise promise) {
250+
doWriteClose(promise);
270251
}
271252

272253
protected void handleClosed() {
@@ -450,7 +431,7 @@ public final ChannelPromise write(Object msg, boolean forceFlush, ChannelPromise
450431
*
451432
* @param promise the promise receiving the completion event
452433
*/
453-
private void writeClose(ChannelPromise promise) {
434+
private void doWriteClose(ChannelPromise promise) {
454435
// Make sure everything is flushed out on close
455436
ChannelPromise channelPromise = chctx
456437
.newPromise()

vertx-core/src/test/java/io/vertx/tests/net/VertxConnectionTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -559,11 +559,11 @@ protected void handleEvent(Object event) {
559559
}
560560
}
561561
@Override
562-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
562+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
563563
shutdown.set(true);
564564
}
565565
@Override
566-
protected void writeClose(Object reason, ChannelPromise promise) {
566+
protected void writeClose(ChannelPromise promise) {
567567
closed.set(true);
568568
promise.setSuccess();
569569
}
@@ -585,11 +585,11 @@ protected void handleEvent(Object event) {
585585
}
586586
}
587587
@Override
588-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
588+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
589589
shutdown.set(true);
590590
}
591591
@Override
592-
protected void writeClose(Object reason, ChannelPromise promise) {
592+
protected void writeClose(ChannelPromise promise) {
593593
closed.set(true);
594594
}
595595
});
@@ -611,13 +611,13 @@ protected void handleEvent(Object event) {
611611
}
612612
}
613613
@Override
614-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
614+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
615615
shutdown.set(true);
616-
close(reason);
616+
close();
617617
assertTrue(closed.get());
618618
}
619619
@Override
620-
protected void writeClose(Object reason, ChannelPromise promise) {
620+
protected void writeClose(ChannelPromise promise) {
621621
closed.set(true);
622622
}
623623
});
@@ -638,7 +638,7 @@ protected void handleEvent(Object event) {
638638
}
639639
}
640640
@Override
641-
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
641+
protected void handleShutdown(long timeout, TimeUnit unit, ChannelPromise promise) {
642642
shutdown.getAndIncrement();
643643
assertEquals(0L, closed.get());
644644
// Force run tasks at this stage since the task will be cancelled after by embedded channel close
@@ -648,7 +648,7 @@ protected void handleShutdown(Object reason, long timeout, TimeUnit unit, Channe
648648
assertEquals(1L, closed.get());
649649
}
650650
@Override
651-
protected void writeClose(Object reason, ChannelPromise promise) {
651+
protected void writeClose(ChannelPromise promise) {
652652
closed.getAndIncrement();
653653
}
654654
});

0 commit comments

Comments
 (0)