From 81cb371665efca4c6ef6c82ba76e820d0b9261ec Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 4 Sep 2025 17:48:36 +1000 Subject: [PATCH 1/8] Issue #13349 - only hard close the websocket connection if abnormal close code Signed-off-by: Lachlan Roberts --- .../websocket/core/WebSocketCoreSession.java | 11 +- .../websocket/tests/TestListenerEndpoint.java | 113 ++++++++++++++++++ .../tests/WebSocketOverHTTP2Test.java | 54 +++++++++ 3 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/TestListenerEndpoint.java diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index 8091c2768136..5f2a4ae00c18 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -228,7 +228,16 @@ private void closeConnection(CloseStatus closeStatus, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("closeConnection() {} {}", closeStatus, this); - abort(); + if (closeStatus.isAbnormal()) + { + abort(); + } + else + { + connection.cancelDemand(); + connection.getEndPoint().shutdownOutput(); + } + extensionStack.close(); // Forward Errors to Local WebSocket EndPoint diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/TestListenerEndpoint.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/TestListenerEndpoint.java new file mode 100644 index 000000000000..ca241660b0df --- /dev/null +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/TestListenerEndpoint.java @@ -0,0 +1,113 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; + +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.websocket.api.Callback; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestListenerEndpoint implements Session.Listener.AutoDemanding +{ + private static final Logger LOG = LoggerFactory.getLogger(TestListenerEndpoint.class); + + public final BlockingQueue textMessages = new BlockingArrayQueue<>(); + public final BlockingQueue binaryMessages = new BlockingArrayQueue<>(); + public final BlockingQueue pongMessages = new BlockingArrayQueue<>(); + public final BlockingQueue pingMessages = new BlockingArrayQueue<>(); + public final CountDownLatch openLatch = new CountDownLatch(1); + public final CountDownLatch errorLatch = new CountDownLatch(1); + public final CountDownLatch closeLatch = new CountDownLatch(1); + public Session session; + public int closeCode = StatusCode.UNDEFINED; + public String closeReason; + public Throwable error = null; + + @Override + public void onWebSocketOpen(Session session) + { + this.session = session; + if (LOG.isDebugEnabled()) + LOG.debug("{} onOpen(): {}", this, session); + openLatch.countDown(); + } + + @Override + public void onWebSocketText(String message) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketText(): {}", this, message); + textMessages.add(message); + } + + @Override + public void onWebSocketBinary(ByteBuffer payload, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketBinary(): {}", this, BufferUtil.toDetailString(payload)); + binaryMessages.add(BufferUtil.copy(payload)); + callback.succeed(); + } + + @Override + public void onWebSocketPing(ByteBuffer payload) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPing(): {}", this, BufferUtil.toDetailString(payload)); + pingMessages.add(BufferUtil.copy(payload)); + session.sendPong(payload, Callback.NOOP); + } + + @Override + public void onWebSocketPong(ByteBuffer payload) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPong(): {}", this, BufferUtil.toDetailString(payload)); + pongMessages.add(BufferUtil.copy(payload)); + } + + @Override + public void onWebSocketError(Throwable cause) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketError()", this, cause); + error = cause; + errorLatch.countDown(); + } + + @Override + public void onWebSocketClose(int statusCode, String reason, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketClose(): {}:{}", this, statusCode, reason); + this.closeCode = statusCode; + this.closeReason = reason; + closeLatch.countDown(); + callback.succeed(); + } + + @Override + public String toString() + { + return String.format("[%s@%x]", TypeUtil.toShortName(getClass()), hashCode()); + } +} diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java index 485f9e03090b..07aa973e97a1 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java @@ -18,7 +18,9 @@ import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -31,6 +33,7 @@ import org.eclipse.jetty.client.HttpRequestException; import org.eclipse.jetty.client.transport.HttpClientConnectionFactory; import org.eclipse.jetty.client.transport.HttpClientTransportDynamic; +import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; @@ -56,6 +59,7 @@ import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.EventsHandler; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.websocket.api.Callback; @@ -502,6 +506,56 @@ public void testNetworkConnectionLimit() throws Exception assertThat(networkConnectionLimit.getNetworkConnectionCount(), equalTo(1)); } + @Test + public void testLargeNumberOfConcurrentConnections() throws Exception + { + Map serverEndpoints = new HashMap<>(); + prepareServer(container -> + container.addMapping("/echo", (rq, rs, cb) -> + { + String clientId = rq.getHeaders().get("clientId"); + if (StringUtil.isEmpty(clientId)) + throw new BadMessageException("Client ID is empty"); + + TestListenerEndpoint serverEndpoint = new TestListenerEndpoint(); + serverEndpoints.put(clientId, serverEndpoint); + return serverEndpoint; + })); + server.start(); + + startClient(clientConnector -> List.of(new ClientConnectionFactoryOverHTTP2.HTTP2(new HTTP2Client(clientConnector)))); + + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/echo"); + List clientHandlers = new ArrayList<>(); + int numConnections = 1000; + for (int i = 0; i < numConnections; i++) + { + TestListenerEndpoint clientEndpoint = new TestListenerEndpoint(); + clientHandlers.add(clientEndpoint); + + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(uri); + upgradeRequest.setHeader("clientId", Integer.toString(i)); + wsClient.connect(clientEndpoint, upgradeRequest).get(5, TimeUnit.SECONDS); + assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.session.getUpgradeRequest().getHttpVersion(), equalTo(HttpVersion.HTTP_2.asString())); + } + + // Close all the websocket connections. + for (int i = 0; i < numConnections; i++) + { + TestListenerEndpoint serverEndpoint = serverEndpoints.get(Integer.toString(i)); + serverEndpoint.session.close(StatusCode.NORMAL, "close initiated from server", Callback.NOOP); + assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverEndpoint.closeCode, equalTo(CloseStatus.NORMAL)); + assertThat(serverEndpoint.closeReason, equalTo("close initiated from server")); + + TestListenerEndpoint clientEndpoint = clientHandlers.get(i); + assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.closeCode, equalTo(CloseStatus.NORMAL)); + assertThat(clientEndpoint.closeReason, equalTo("close initiated from server")); + } + } + private static void awaitConnections(int connections, NetworkConnectionLimit networkConnectionLimit) { await().atMost(1, TimeUnit.SECONDS) From 5eee82de50dd0132856221e044b408848c5d62fd Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 17 Sep 2025 11:18:20 +1000 Subject: [PATCH 2/8] PR #13557 - ensure EndPoint.close() is always called in normal close case Signed-off-by: Lachlan Roberts --- .../websocket/core/WebSocketCoreSession.java | 29 ++++++++++++------- .../websocket/core/internal/FrameFlusher.java | 2 -- .../tests/WebSocketOverHTTP2Test.java | 3 +- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index 5f2a4ae00c18..1a89d3f03c00 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -59,6 +59,7 @@ public class WebSocketCoreSession implements CoreSession, Dumpable private final Negotiated negotiated; private final Flusher flusher = new Flusher(this); private final ExtensionStack extensionStack; + private final AtomicInteger closeConnection = new AtomicInteger(2); private int maxOutgoingFrames = -1; private final AtomicInteger numOutgoingFrames = new AtomicInteger(); @@ -219,6 +220,8 @@ public void onEof() if (LOG.isDebugEnabled()) LOG.debug("onEof() {}", this); + if (closeConnection.decrementAndGet() == 0) + abort(); if (sessionState.onEof()) closeConnection(sessionState.getCloseStatus(), Callback.NOOP); } @@ -228,15 +231,9 @@ private void closeConnection(CloseStatus closeStatus, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("closeConnection() {} {}", closeStatus, this); + // In the normal case we don't need to abort, the endpoint will be closed once EOF is read, and close frame sent. if (closeStatus.isAbnormal()) - { abort(); - } - else - { - connection.cancelDemand(); - connection.getEndPoint().shutdownOutput(); - } extensionStack.close(); @@ -515,8 +512,20 @@ public void sendFrame(OutgoingEntry entry) if (LOG.isDebugEnabled()) LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); - boolean closeConnection = sessionState.onOutgoingFrame(frame); - if (closeConnection) + // If we are sending a close frame we should always shutdown output after. + // TODO: review this in regards to https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.1 + if (frame.getOpCode() == OpCode.CLOSE) + callback = Callback.from(callback, () -> + { + connection.getEndPoint().shutdownOutput(); + if (closeConnection.decrementAndGet() == 0) + abort(); + }); + + // Here we need to know if we should abort connection. + // We abort if we are OSHUT (send close frame), and we received EOF. + boolean notifyClose = sessionState.onOutgoingFrame(frame); + if (notifyClose) { Callback c = callback; Callback closeConnectionCallback = Callback.from( @@ -674,7 +683,7 @@ public void onFrame(Frame frame, Callback callback) return; } - // Handle inbound CLOSE + // Cancel demand to read to EOF, as we cannot receive any more frames after the CLOSE Frame. connection.cancelDemand(); if (closeConnection) { diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index 6fda274f3ab6..8c8150020151 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -389,8 +389,6 @@ protected void onSuccess() for (FlusherEntry entry : _completedEntries) { - if (entry.getFrame().getOpCode() == OpCode.CLOSE && _behavior == Behavior.SERVER) - _endPoint.shutdownOutput(); notifyCallbackSuccess(entry.getCallback()); } _completedEntries.clear(); diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java index 07aa973e97a1..8e668d683d25 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketOverHTTP2Test.java @@ -544,12 +544,13 @@ public void testLargeNumberOfConcurrentConnections() throws Exception for (int i = 0; i < numConnections; i++) { TestListenerEndpoint serverEndpoint = serverEndpoints.get(Integer.toString(i)); + TestListenerEndpoint clientEndpoint = clientHandlers.get(i); + serverEndpoint.session.close(StatusCode.NORMAL, "close initiated from server", Callback.NOOP); assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); assertThat(serverEndpoint.closeCode, equalTo(CloseStatus.NORMAL)); assertThat(serverEndpoint.closeReason, equalTo("close initiated from server")); - TestListenerEndpoint clientEndpoint = clientHandlers.get(i); assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); assertThat(clientEndpoint.closeCode, equalTo(CloseStatus.NORMAL)); assertThat(clientEndpoint.closeReason, equalTo("close initiated from server")); From c44eaaa9893c645ed51edfc46526cd6cd956f72c Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 19 Sep 2025 13:26:25 +1000 Subject: [PATCH 3/8] PR #13557 - integrate close endpoint logic into WebSocketSessionState Signed-off-by: Lachlan Roberts --- .../websocket/core/WebSocketCoreSession.java | 54 ++-- .../core/internal/WebSocketSessionState.java | 240 ++++++++++++------ 2 files changed, 204 insertions(+), 90 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index 1a89d3f03c00..04a4f1f865d9 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -54,12 +54,11 @@ public class WebSocketCoreSession implements CoreSession, Dumpable private final WebSocketComponents components; private final Behavior behavior; - private final WebSocketSessionState sessionState = new WebSocketSessionState(); + private final WebSocketSessionState sessionState; private final FrameHandler handler; private final Negotiated negotiated; private final Flusher flusher = new Flusher(this); private final ExtensionStack extensionStack; - private final AtomicInteger closeConnection = new AtomicInteger(2); private int maxOutgoingFrames = -1; private final AtomicInteger numOutgoingFrames = new AtomicInteger(); @@ -77,6 +76,7 @@ public class WebSocketCoreSession implements CoreSession, Dumpable public WebSocketCoreSession(FrameHandler handler, Behavior behavior, Negotiated negotiated, WebSocketComponents components) { + this.sessionState = new WebSocketSessionState(behavior); this.classLoader = Thread.currentThread().getContextClassLoader(); this.components = components; this.handler = handler; @@ -220,9 +220,10 @@ public void onEof() if (LOG.isDebugEnabled()) LOG.debug("onEof() {}", this); - if (closeConnection.decrementAndGet() == 0) + WebSocketSessionState.BooleanPair result = sessionState.onEof(); + if (result.closeEndpoint()) abort(); - if (sessionState.onEof()) + if (result.notifyWebSocketClose()) closeConnection(sessionState.getCloseStatus(), Callback.NOOP); } @@ -341,7 +342,11 @@ private void processError(CloseStatus closeStatus, Callback callback) } else { - if (sessionState.onClosed(closeStatus)) + WebSocketSessionState.BooleanPair result = sessionState.onClosed(closeStatus); + if (result.closeEndpoint()) + abort(); + + if (result.notifyWebSocketClose()) { closeConnection(closeStatus, callback); } @@ -513,19 +518,23 @@ public void sendFrame(OutgoingEntry entry) LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); // If we are sending a close frame we should always shutdown output after. - // TODO: review this in regards to https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.1 if (frame.getOpCode() == OpCode.CLOSE) callback = Callback.from(callback, () -> { - connection.getEndPoint().shutdownOutput(); - if (closeConnection.decrementAndGet() == 0) - abort(); + // Server is the one to initiate the TCP close (see RFC6455 7.1.1). + if (behavior == Behavior.SERVER) + { + connection.getEndPoint().shutdownOutput(); + if (sessionState.onShutdownOutput()) + abort(); + } }); - // Here we need to know if we should abort connection. - // We abort if we are OSHUT (send close frame), and we received EOF. - boolean notifyClose = sessionState.onOutgoingFrame(frame); - if (notifyClose) + WebSocketSessionState.BooleanPair result = sessionState.onOutgoingFrame(frame); + if (result.closeEndpoint()) + abort(); + + if (result.notifyWebSocketClose()) { Callback c = callback; Callback closeConnectionCallback = Callback.from( @@ -551,8 +560,17 @@ public void sendFrame(OutgoingEntry entry) if (frame.getOpCode() == OpCode.CLOSE) { CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); - if (closeStatus.isAbnormal() && sessionState.onClosed(closeStatus)) - closeConnection(closeStatus, Callback.from(callback, t)); + if (closeStatus.isAbnormal()) + { + WebSocketSessionState.BooleanPair result = sessionState.onClosed(closeStatus); + if (result.closeEndpoint()) + abort(); + + if (result.notifyWebSocketClose()) + closeConnection(closeStatus, Callback.from(callback, t)); + else + callback.failed(t); + } else callback.failed(t); } @@ -674,7 +692,9 @@ public void onFrame(Frame frame, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("receiveFrame({}, {}) - connectionState={}, handler={}", frame, callback, sessionState, handler); - boolean closeConnection = sessionState.onIncomingFrame(frame); + WebSocketSessionState.BooleanPair result = sessionState.onIncomingFrame(frame); + if (result.closeEndpoint()) + abort(); // Handle inbound frame if (frame.getOpCode() != OpCode.CLOSE) @@ -685,7 +705,7 @@ public void onFrame(Frame frame, Callback callback) // Cancel demand to read to EOF, as we cannot receive any more frames after the CLOSE Frame. connection.cancelDemand(); - if (closeConnection) + if (result.notifyWebSocketClose()) { closeCallback = Callback.from(() -> closeConnection(sessionState.getCloseStatus(), callback), t -> { diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java index cc42e1dd6258..c1d3fd86a183 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java @@ -17,6 +17,7 @@ import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.OpCode; @@ -27,7 +28,7 @@ */ public class WebSocketSessionState { - enum State + enum WebSocketState { CONNECTING, CONNECTED, @@ -37,31 +38,48 @@ enum State CLOSED } - private final AutoLock lock = new AutoLock(); - private State _sessionState = State.CONNECTING; + enum EndPointState + { + OPEN, + ISHUT, + OSHUT, + CLOSED + } + + private final AutoLock _lock = new AutoLock(); + private final Behavior _behavior; + private WebSocketState _webSocketState = WebSocketState.CONNECTING; + private EndPointState _endPointState = EndPointState.OPEN; private byte _incomingContinuation = OpCode.UNDEFINED; private byte _outgoingContinuation = OpCode.UNDEFINED; CloseStatus _closeStatus = null; + public WebSocketSessionState(Behavior behavior) + { + _behavior = behavior; + } + + public record BooleanPair(boolean notifyWebSocketClose, boolean closeEndpoint) {} + public void onConnected() { - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { - if (_sessionState != State.CONNECTING) - throw new IllegalStateException(_sessionState.toString()); + if (_webSocketState != WebSocketState.CONNECTING) + throw new IllegalStateException(_webSocketState.toString()); - _sessionState = State.CONNECTED; + _webSocketState = WebSocketState.CONNECTED; } } public void onOpen() { - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { - switch (_sessionState) + switch (_webSocketState) { case CONNECTED: - _sessionState = State.OPEN; + _webSocketState = WebSocketState.OPEN; break; case OSHUT: @@ -70,57 +88,44 @@ public void onOpen() break; default: - throw new IllegalStateException(_sessionState.toString()); + throw new IllegalStateException(_webSocketState.toString()); } } } - private State getState() + private WebSocketState getState() { - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { - return _sessionState; + return _webSocketState; } } public boolean isClosed() { - return getState() == State.CLOSED; + return getState() == WebSocketState.CLOSED; } public boolean isInputOpen() { - State state = getState(); - return (state == State.OPEN || state == State.OSHUT); + WebSocketState state = getState(); + return (state == WebSocketState.OPEN || state == WebSocketState.OSHUT); } public boolean isOutputOpen() { - State state = getState(); - return (state == State.CONNECTED || state == State.OPEN || state == State.ISHUT); + WebSocketState state = getState(); + return (state == WebSocketState.CONNECTED || state == WebSocketState.OPEN || state == WebSocketState.ISHUT); } public CloseStatus getCloseStatus() { - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { return _closeStatus; } } - public boolean onClosed(CloseStatus closeStatus) - { - try (AutoLock l = lock.lock()) - { - if (_sessionState == State.CLOSED) - return false; - - _closeStatus = closeStatus; - _sessionState = State.CLOSED; - return true; - } - } - /** *

* If no error is set in the CloseStatus this will either, replace the current close status with @@ -139,9 +144,9 @@ public boolean onClosed(CloseStatus closeStatus) */ public void onError(Throwable t) { - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { - if (_sessionState != State.CLOSED || _closeStatus == null) + if (_webSocketState != WebSocketState.CLOSED || _closeStatus == null) throw new IllegalArgumentException(); // Override any normal close status. @@ -154,31 +159,103 @@ public void onError(Throwable t) } } - public boolean onEof() + public BooleanPair onClosed(CloseStatus closeStatus) { - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { - switch (_sessionState) + boolean notifyWebSocketClose = false; + if (_webSocketState != WebSocketState.CLOSED) { - case CLOSED: - case ISHUT: - return false; + _closeStatus = closeStatus; + _webSocketState = WebSocketState.CLOSED; + notifyWebSocketClose = true; + } - default: + boolean closeEndpoint = false; + if (_endPointState != EndPointState.CLOSED) + { + _endPointState = EndPointState.CLOSED; + closeEndpoint = true; + } + + return new BooleanPair(notifyWebSocketClose, closeEndpoint); + } + } + + /** + * Handle an EOF from the transport. + * @return a pair of booleans; + * The first indicates whether the websocket listeners should be notified of close. + * The second indicates whether the underlying endpoint should be closed. + */ + public BooleanPair onEof() + { + try (AutoLock l = _lock.lock()) + { + return switch (_webSocketState) + { + case CLOSED -> + { + boolean closeEndpoint = lockedForceCloseEndpointState(); + yield new BooleanPair(false, closeEndpoint); + } + case ISHUT -> + { + boolean closeEndpoint = false; + switch (_endPointState) + { + case OPEN -> _endPointState = EndPointState.ISHUT; + case CLOSED, ISHUT -> + { /* NOOP */ } + case OSHUT -> + { + closeEndpoint = true; + _endPointState = EndPointState.CLOSED; + } + default -> throw new IllegalStateException(_endPointState.toString()); + } + yield new BooleanPair(false, closeEndpoint); + } + default -> + { if (_closeStatus == null || CloseStatus.isOrdinary(_closeStatus.getCode())) _closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, "Session Closed", new ClosedChannelException()); - _sessionState = State.CLOSED; - return true; - } + _webSocketState = WebSocketState.CLOSED; + + boolean closeEndpoint = lockedForceCloseEndpointState(); + yield new BooleanPair(true, closeEndpoint); + } + }; + } + } + + public boolean onShutdownOutput() + { + try (AutoLock l = _lock.lock()) + { + return switch (_endPointState) + { + case OPEN -> + { + _endPointState = EndPointState.OSHUT; + yield false; + } + case ISHUT -> + { + _endPointState = EndPointState.CLOSED; + yield true; + } + case OSHUT, CLOSED -> false; + }; } } - public boolean onOutgoingFrame(Frame frame) throws Exception + public BooleanPair onOutgoingFrame(Frame frame) throws Exception { byte opcode = frame.getOpCode(); boolean fin = frame.isFin(); - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { if (!isOutputOpen()) throw new ClosedChannelException(); @@ -188,24 +265,25 @@ public boolean onOutgoingFrame(Frame frame) throws Exception _closeStatus = CloseStatus.getCloseStatus(frame); if (_closeStatus.isAbnormal()) { - _sessionState = State.CLOSED; - return true; + boolean closeEndpoint = lockedForceCloseEndpointState(); + _webSocketState = WebSocketState.CLOSED; + return new BooleanPair(true, closeEndpoint); } - switch (_sessionState) + return switch (_webSocketState) { - case CONNECTED: - case OPEN: - _sessionState = State.OSHUT; - return false; - - case ISHUT: - _sessionState = State.CLOSED; - return true; - - default: - throw new IllegalStateException(_sessionState.toString()); - } + case CONNECTED, OPEN -> + { + _webSocketState = WebSocketState.OSHUT; + yield new BooleanPair(false, false); + } + case ISHUT -> + { + _webSocketState = WebSocketState.CLOSED; + yield new BooleanPair(true, false); + } + default -> throw new IllegalStateException(_webSocketState.toString()); + }; } else if (frame.isDataFrame()) { @@ -213,15 +291,15 @@ else if (frame.isDataFrame()) } } - return false; + return new BooleanPair(false, false); } - public boolean onIncomingFrame(Frame frame) throws ProtocolException, ClosedChannelException + public BooleanPair onIncomingFrame(Frame frame) throws ProtocolException, ClosedChannelException { byte opcode = frame.getOpCode(); boolean fin = frame.isFin(); - try (AutoLock l = lock.lock()) + try (AutoLock l = _lock.lock()) { if (!isInputOpen()) throw new ClosedChannelException(); @@ -230,16 +308,19 @@ public boolean onIncomingFrame(Frame frame) throws ProtocolException, ClosedChan { _closeStatus = CloseStatus.getCloseStatus(frame); - switch (_sessionState) + switch (_webSocketState) { case OPEN: - _sessionState = State.ISHUT; - return false; + _webSocketState = WebSocketState.ISHUT; + return new BooleanPair(false, false); case OSHUT: - _sessionState = State.CLOSED; - return true; + // If we received abnormal status close, and we cannot send a response because we are OSHUT, + // so we should close underlying the connection. + boolean closeEndpoint = _closeStatus.isAbnormal() && lockedForceCloseEndpointState(); + _webSocketState = WebSocketState.CLOSED; + return new BooleanPair(true, closeEndpoint); default: - throw new IllegalStateException(_sessionState.toString()); + throw new IllegalStateException(_webSocketState.toString()); } } else if (frame.isDataFrame()) @@ -248,19 +329,32 @@ else if (frame.isDataFrame()) } } - return false; + return new BooleanPair(false, false); } @Override public String toString() { return String.format("%s@%x{%s,i=%s,o=%s,c=%s}", TypeUtil.toShortName(getClass()), hashCode(), - _sessionState, + _webSocketState, OpCode.name(_incomingContinuation), OpCode.name(_outgoingContinuation), _closeStatus); } + private boolean lockedForceCloseEndpointState() + { + assert _lock.isHeldByCurrentThread(); + + boolean closeEndpoint = false; + if (_endPointState != EndPointState.CLOSED) + { + _endPointState = EndPointState.CLOSED; + closeEndpoint = true; + } + return closeEndpoint; + } + private static byte checkDataSequence(byte opcode, boolean fin, byte lastOpCode) throws ProtocolException { switch (opcode) From c40f7a04cd9e3f7198d610b7a576a8fa07fa5413 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 19 Sep 2025 15:14:57 +1000 Subject: [PATCH 4/8] PR #13557 - initiate tcp close of the websocket connection from Server Signed-off-by: Lachlan Roberts --- .../websocket/core/WebSocketCoreSession.java | 49 +++++++++---------- .../core/internal/WebSocketSessionState.java | 23 ++++++--- .../tests/server/ServerConfigTest.java | 3 +- 3 files changed, 39 insertions(+), 36 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index 04a4f1f865d9..a6b185fba176 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -220,22 +220,20 @@ public void onEof() if (LOG.isDebugEnabled()) LOG.debug("onEof() {}", this); - WebSocketSessionState.BooleanPair result = sessionState.onEof(); + WebSocketSessionState.EofResult result = sessionState.onEof(); + if (result.shutdownOutput()) + getConnection().getEndPoint().shutdownOutput(); if (result.closeEndpoint()) abort(); if (result.notifyWebSocketClose()) - closeConnection(sessionState.getCloseStatus(), Callback.NOOP); + notifyWebSocketConnectionClose(sessionState.getCloseStatus(), Callback.NOOP); } - private void closeConnection(CloseStatus closeStatus, Callback callback) + private void notifyWebSocketConnectionClose(CloseStatus closeStatus, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("closeConnection() {} {}", closeStatus, this); - // In the normal case we don't need to abort, the endpoint will be closed once EOF is read, and close frame sent. - if (closeStatus.isAbnormal()) - abort(); - extensionStack.close(); // Forward Errors to Local WebSocket EndPoint @@ -348,7 +346,7 @@ private void processError(CloseStatus closeStatus, Callback callback) if (result.notifyWebSocketClose()) { - closeConnection(closeStatus, callback); + notifyWebSocketConnectionClose(closeStatus, callback); } else { @@ -517,29 +515,28 @@ public void sendFrame(OutgoingEntry entry) if (LOG.isDebugEnabled()) LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); - // If we are sending a close frame we should always shutdown output after. - if (frame.getOpCode() == OpCode.CLOSE) - callback = Callback.from(callback, () -> + WebSocketSessionState.BooleanPair result = sessionState.onOutgoingFrame(frame); + callback = Callback.from(callback, () -> + { + if (frame.getOpCode() == OpCode.CLOSE) { - // Server is the one to initiate the TCP close (see RFC6455 7.1.1). - if (behavior == Behavior.SERVER) - { + WebSocketSessionState.CloseResult closeResult = sessionState.onCloseFrameSent(); + if (closeResult.shutdownOutput()) connection.getEndPoint().shutdownOutput(); - if (sessionState.onShutdownOutput()) - abort(); - } - }); + if (closeResult.closeEndpoint()) + abort(); + } - WebSocketSessionState.BooleanPair result = sessionState.onOutgoingFrame(frame); - if (result.closeEndpoint()) - abort(); + if (result.closeEndpoint()) + abort(); + }); if (result.notifyWebSocketClose()) { Callback c = callback; Callback closeConnectionCallback = Callback.from( - () -> closeConnection(sessionState.getCloseStatus(), c), - t -> closeConnection(sessionState.getCloseStatus(), Callback.from(c, t))); + () -> notifyWebSocketConnectionClose(sessionState.getCloseStatus(), c), + t -> notifyWebSocketConnectionClose(sessionState.getCloseStatus(), Callback.from(c, t))); flusher.sendFrame(new OutgoingEntry.Builder(entry) .callback(closeConnectionCallback) @@ -567,7 +564,7 @@ public void sendFrame(OutgoingEntry entry) abort(); if (result.notifyWebSocketClose()) - closeConnection(closeStatus, Callback.from(callback, t)); + notifyWebSocketConnectionClose(closeStatus, Callback.from(callback, t)); else callback.failed(t); } @@ -707,10 +704,10 @@ public void onFrame(Frame frame, Callback callback) connection.cancelDemand(); if (result.notifyWebSocketClose()) { - closeCallback = Callback.from(() -> closeConnection(sessionState.getCloseStatus(), callback), t -> + closeCallback = Callback.from(() -> notifyWebSocketConnectionClose(sessionState.getCloseStatus(), callback), t -> { sessionState.onError(t); - closeConnection(sessionState.getCloseStatus(), callback); + notifyWebSocketConnectionClose(sessionState.getCloseStatus(), callback); }); } else diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java index c1d3fd86a183..c8c30d3e48a7 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java @@ -182,13 +182,14 @@ public BooleanPair onClosed(CloseStatus closeStatus) } } + public record EofResult(boolean notifyWebSocketClose, boolean closeEndpoint, boolean shutdownOutput){} /** * Handle an EOF from the transport. * @return a pair of booleans; * The first indicates whether the websocket listeners should be notified of close. * The second indicates whether the underlying endpoint should be closed. */ - public BooleanPair onEof() + public EofResult onEof() { try (AutoLock l = _lock.lock()) { @@ -197,11 +198,12 @@ public BooleanPair onEof() case CLOSED -> { boolean closeEndpoint = lockedForceCloseEndpointState(); - yield new BooleanPair(false, closeEndpoint); + yield new EofResult(false, closeEndpoint, false); } case ISHUT -> { boolean closeEndpoint = false; + boolean shutdownOutput = false; switch (_endPointState) { case OPEN -> _endPointState = EndPointState.ISHUT; @@ -209,12 +211,13 @@ public BooleanPair onEof() { /* NOOP */ } case OSHUT -> { + shutdownOutput = _behavior == Behavior.CLIENT; closeEndpoint = true; _endPointState = EndPointState.CLOSED; } default -> throw new IllegalStateException(_endPointState.toString()); } - yield new BooleanPair(false, closeEndpoint); + yield new EofResult(false, closeEndpoint, shutdownOutput); } default -> { @@ -223,13 +226,14 @@ public BooleanPair onEof() _webSocketState = WebSocketState.CLOSED; boolean closeEndpoint = lockedForceCloseEndpointState(); - yield new BooleanPair(true, closeEndpoint); + yield new EofResult(true, closeEndpoint, false); } }; } } - public boolean onShutdownOutput() + public record CloseResult(boolean shutdownOutput, boolean closeEndpoint){} + public CloseResult onCloseFrameSent() { try (AutoLock l = _lock.lock()) { @@ -238,14 +242,17 @@ public boolean onShutdownOutput() case OPEN -> { _endPointState = EndPointState.OSHUT; - yield false; + // We only shut down output if we are a server because of RFC6455 7.1.1. + // When the client receives an EOF it will shut down its output. + yield new CloseResult(_behavior == Behavior.SERVER, false); } case ISHUT -> { + // We have already read EOF so we can shut down output even if we're a client. _endPointState = EndPointState.CLOSED; - yield true; + yield new CloseResult(true, true); } - case OSHUT, CLOSED -> false; + case OSHUT, CLOSED -> new CloseResult(false, false); }; } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerConfigTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerConfigTest.java index 9e7b0a4cb7c3..7d270ffdc419 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerConfigTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerConfigTest.java @@ -244,9 +244,8 @@ public void testIdleTimeout(String path) throws Exception connect.get(5, TimeUnit.SECONDS); clientEndpoint.session.sendText("hello world", Callback.NOOP); - String msg = serverEndpoint.textMessages.poll(500, TimeUnit.MILLISECONDS); + String msg = serverEndpoint.textMessages.poll(5, TimeUnit.MILLISECONDS); assertThat(msg, is("hello world")); - Thread.sleep(IDLE_TIMEOUT + 500); assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); assertThat(serverEndpoint.error, instanceOf(WebSocketTimeoutException.class)); From 12ff325eed653351fe1f8dbc46dbf6d025dc6b3c Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 19 Sep 2025 16:09:42 +1000 Subject: [PATCH 5/8] PR #13557 - fix bug in JettyWebSocketFrameHandler The OnWebSocketClose notification can be called in two cases, if a close frame is received, or if the connection is notified as closed through FrameHandler.onClosed. The callback should only be succeeded not failed if this is called twice, because this is a normal case. Received a response close frame and OnWebSocketClose is called through JettyWebSocketFrameHandler.onFrame. Then the connection is closed so JettyWebSocketFrameHandler.onClosed is called, but we cannot call OnWebSocketClose again. Signed-off-by: Lachlan Roberts --- .../jetty/websocket/core/internal/WebSocketSessionState.java | 2 +- .../jetty/websocket/common/JettyWebSocketFrameHandler.java | 3 +-- .../jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java index c8c30d3e48a7..64e114bc29e2 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java @@ -134,7 +134,7 @@ public CloseStatus getCloseStatus() *

*

* This should only be called if there is an error directly before the call to - * {@code WebSocketCoreSession.closeConnection(CloseStatus, Callback)}. + * {@code WebSocketCoreSession#notifyWebSocketConnectionClose(CloseStatus, Callback)}. *

*

* This could occur if the FrameHandler throws an exception in onFrame after receiving a close frame reply, in this diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 0c890ba4faa7..09a0b6b3db5d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -18,7 +18,6 @@ import java.lang.invoke.MethodType; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.BufferUtil; @@ -270,7 +269,7 @@ private void notifyOnClose(CloseStatus closeStatus, Callback callback) // Make sure onClose is only notified once. if (!closeNotified.compareAndSet(false, true)) { - callback.failed(new ClosedChannelException()); + callback.succeeded(); return; } diff --git a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java index 8e501144755b..0e5fb8fc0698 100644 --- a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java @@ -14,7 +14,6 @@ package org.eclipse.jetty.ee9.websocket.common; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -296,7 +295,7 @@ private void notifyOnClose(CloseStatus closeStatus, Callback callback) // Make sure onClose is only notified once. if (!closeNotified.compareAndSet(false, true)) { - callback.failed(new ClosedChannelException()); + callback.succeeded(); return; } From cbbab53644cb1c4b1922ae39af700e70f5bab75f Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 19 Sep 2025 17:18:06 +1000 Subject: [PATCH 6/8] PR #13557 - ensure endpoint is closed after sessionState.onError(t) is called Signed-off-by: Lachlan Roberts --- .../jetty/websocket/core/WebSocketCoreSession.java | 3 ++- .../core/internal/WebSocketSessionState.java | 13 +++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index a6b185fba176..89da4bbe62f2 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -706,7 +706,8 @@ public void onFrame(Frame frame, Callback callback) { closeCallback = Callback.from(() -> notifyWebSocketConnectionClose(sessionState.getCloseStatus(), callback), t -> { - sessionState.onError(t); + if (sessionState.onError(t)) + abort(); notifyWebSocketConnectionClose(sessionState.getCloseStatus(), callback); }); } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java index 64e114bc29e2..924964f79020 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java @@ -141,8 +141,9 @@ public CloseStatus getCloseStatus() * case to notify onError we must set the cause in the closeStatus. *

* @param t the error which occurred. + * @return true if the endpoint should be closed. */ - public void onError(Throwable t) + public boolean onError(Throwable t) { try (AutoLock l = _lock.lock()) { @@ -156,6 +157,8 @@ public void onError(Throwable t) // Otherwise set the error if it wasn't already set to notify onError as well as onClose. if (_closeStatus.getCause() == null) _closeStatus = new CloseStatus(_closeStatus.getCode(), _closeStatus.getReason(), t); + + return lockedForceCloseEndpointState(); } } @@ -163,6 +166,7 @@ public BooleanPair onClosed(CloseStatus closeStatus) { try (AutoLock l = _lock.lock()) { + boolean closeEndpoint = lockedForceCloseEndpointState(); boolean notifyWebSocketClose = false; if (_webSocketState != WebSocketState.CLOSED) { @@ -171,13 +175,6 @@ public BooleanPair onClosed(CloseStatus closeStatus) notifyWebSocketClose = true; } - boolean closeEndpoint = false; - if (_endPointState != EndPointState.CLOSED) - { - _endPointState = EndPointState.CLOSED; - closeEndpoint = true; - } - return new BooleanPair(notifyWebSocketClose, closeEndpoint); } } From 301dc800fc2878e74926c25c62cda64b8c626013 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 19 Sep 2025 17:32:49 +1000 Subject: [PATCH 7/8] PR #13557 - cleanup in WebSocketSessionState Signed-off-by: Lachlan Roberts --- .../websocket/core/WebSocketCoreSession.java | 8 ++--- .../core/internal/WebSocketSessionState.java | 30 +++++++++---------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index 89da4bbe62f2..78f8567e96b6 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -340,7 +340,7 @@ private void processError(CloseStatus closeStatus, Callback callback) } else { - WebSocketSessionState.BooleanPair result = sessionState.onClosed(closeStatus); + WebSocketSessionState.Result result = sessionState.onClosed(closeStatus); if (result.closeEndpoint()) abort(); @@ -515,7 +515,7 @@ public void sendFrame(OutgoingEntry entry) if (LOG.isDebugEnabled()) LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); - WebSocketSessionState.BooleanPair result = sessionState.onOutgoingFrame(frame); + WebSocketSessionState.Result result = sessionState.onOutgoingFrame(frame); callback = Callback.from(callback, () -> { if (frame.getOpCode() == OpCode.CLOSE) @@ -559,7 +559,7 @@ public void sendFrame(OutgoingEntry entry) CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); if (closeStatus.isAbnormal()) { - WebSocketSessionState.BooleanPair result = sessionState.onClosed(closeStatus); + WebSocketSessionState.Result result = sessionState.onClosed(closeStatus); if (result.closeEndpoint()) abort(); @@ -689,7 +689,7 @@ public void onFrame(Frame frame, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("receiveFrame({}, {}) - connectionState={}, handler={}", frame, callback, sessionState, handler); - WebSocketSessionState.BooleanPair result = sessionState.onIncomingFrame(frame); + WebSocketSessionState.Result result = sessionState.onIncomingFrame(frame); if (result.closeEndpoint()) abort(); diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java index 924964f79020..a4601bc98324 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java @@ -46,6 +46,10 @@ enum EndPointState CLOSED } + public record Result(boolean notifyWebSocketClose, boolean closeEndpoint) {} + public record EofResult(boolean notifyWebSocketClose, boolean closeEndpoint, boolean shutdownOutput){} + public record CloseResult(boolean shutdownOutput, boolean closeEndpoint){} + private final AutoLock _lock = new AutoLock(); private final Behavior _behavior; private WebSocketState _webSocketState = WebSocketState.CONNECTING; @@ -59,8 +63,6 @@ public WebSocketSessionState(Behavior behavior) _behavior = behavior; } - public record BooleanPair(boolean notifyWebSocketClose, boolean closeEndpoint) {} - public void onConnected() { try (AutoLock l = _lock.lock()) @@ -162,7 +164,7 @@ public boolean onError(Throwable t) } } - public BooleanPair onClosed(CloseStatus closeStatus) + public Result onClosed(CloseStatus closeStatus) { try (AutoLock l = _lock.lock()) { @@ -175,11 +177,10 @@ public BooleanPair onClosed(CloseStatus closeStatus) notifyWebSocketClose = true; } - return new BooleanPair(notifyWebSocketClose, closeEndpoint); + return new Result(notifyWebSocketClose, closeEndpoint); } } - public record EofResult(boolean notifyWebSocketClose, boolean closeEndpoint, boolean shutdownOutput){} /** * Handle an EOF from the transport. * @return a pair of booleans; @@ -229,7 +230,6 @@ public EofResult onEof() } } - public record CloseResult(boolean shutdownOutput, boolean closeEndpoint){} public CloseResult onCloseFrameSent() { try (AutoLock l = _lock.lock()) @@ -254,7 +254,7 @@ public CloseResult onCloseFrameSent() } } - public BooleanPair onOutgoingFrame(Frame frame) throws Exception + public Result onOutgoingFrame(Frame frame) throws Exception { byte opcode = frame.getOpCode(); boolean fin = frame.isFin(); @@ -271,7 +271,7 @@ public BooleanPair onOutgoingFrame(Frame frame) throws Exception { boolean closeEndpoint = lockedForceCloseEndpointState(); _webSocketState = WebSocketState.CLOSED; - return new BooleanPair(true, closeEndpoint); + return new Result(true, closeEndpoint); } return switch (_webSocketState) @@ -279,12 +279,12 @@ public BooleanPair onOutgoingFrame(Frame frame) throws Exception case CONNECTED, OPEN -> { _webSocketState = WebSocketState.OSHUT; - yield new BooleanPair(false, false); + yield new Result(false, false); } case ISHUT -> { _webSocketState = WebSocketState.CLOSED; - yield new BooleanPair(true, false); + yield new Result(true, false); } default -> throw new IllegalStateException(_webSocketState.toString()); }; @@ -295,10 +295,10 @@ else if (frame.isDataFrame()) } } - return new BooleanPair(false, false); + return new Result(false, false); } - public BooleanPair onIncomingFrame(Frame frame) throws ProtocolException, ClosedChannelException + public Result onIncomingFrame(Frame frame) throws ProtocolException, ClosedChannelException { byte opcode = frame.getOpCode(); boolean fin = frame.isFin(); @@ -316,13 +316,13 @@ public BooleanPair onIncomingFrame(Frame frame) throws ProtocolException, Closed { case OPEN: _webSocketState = WebSocketState.ISHUT; - return new BooleanPair(false, false); + return new Result(false, false); case OSHUT: // If we received abnormal status close, and we cannot send a response because we are OSHUT, // so we should close underlying the connection. boolean closeEndpoint = _closeStatus.isAbnormal() && lockedForceCloseEndpointState(); _webSocketState = WebSocketState.CLOSED; - return new BooleanPair(true, closeEndpoint); + return new Result(true, closeEndpoint); default: throw new IllegalStateException(_webSocketState.toString()); } @@ -333,7 +333,7 @@ else if (frame.isDataFrame()) } } - return new BooleanPair(false, false); + return new Result(false, false); } @Override From b2afba4cb89d5a9b046dc10c05dfc6c3e6a1730f Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 19 Sep 2025 17:58:43 +1000 Subject: [PATCH 8/8] PR #13557 - handle case when close frame fails to be sent Signed-off-by: Lachlan Roberts --- .../jetty/websocket/core/WebSocketCoreSession.java | 13 ++++++++++++- .../core/internal/WebSocketSessionState.java | 2 ++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index 78f8567e96b6..fe6e6d74a9f6 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -516,8 +516,19 @@ public void sendFrame(OutgoingEntry entry) LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); WebSocketSessionState.Result result = sessionState.onOutgoingFrame(frame); - callback = Callback.from(callback, () -> + callback = Callback.from(callback, failure -> { + if (failure != null) + { + CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, failure); + WebSocketSessionState.Result closeResult = sessionState.onClosed(closeStatus); + if (closeResult.closeEndpoint()) + abort(); + if (closeResult.notifyWebSocketClose()) + notifyWebSocketConnectionClose(closeStatus, NOOP); + return; + } + if (frame.getOpCode() == OpCode.CLOSE) { WebSocketSessionState.CloseResult closeResult = sessionState.onCloseFrameSent(); diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java index a4601bc98324..848d801bc010 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketSessionState.java @@ -209,6 +209,8 @@ public EofResult onEof() { /* NOOP */ } case OSHUT -> { + // If this was a client it didn't shut down output when it sent the close frame because of RFC6455 7.1.1. + // So we should do the shutdown output before closing the endpoint. shutdownOutput = _behavior == Behavior.CLIENT; closeEndpoint = true; _endPointState = EndPointState.CLOSED;