From 74a2dbd99ff78a258e64d2333755b80e0f7c7d99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 19 Oct 2023 15:21:20 +0200 Subject: [PATCH] fix: Emit a failed response for the triggering request on connection failure #3892 (#4322) --- .../engine/http2/client/PersistentConnection.scala | 14 +++++++++++++- .../engine/http2/Http2PersistentClientSpec.scala | 13 +++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala index 73f435e82f..1a7af1e4b1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala @@ -139,7 +139,19 @@ private[http2] object PersistentConnection { requestOut.fail(new StreamTcpException("connection broken")) if (connectsLeft.contains(0)) { - failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause)) + if (isAvailable(requestIn)) { + // fail the triggering request before failing the stream + val request = grab(requestIn) + val response = HttpResponse( + StatusCodes.ServiceUnavailable, + entity = s"Connection failed after $maxAttempts attempts") + .withAttributes(request.attributes) + emit(responseOut, response, () => + failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause)) + ) + } else { + failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause)) + } } else { setHandler(requestIn, Unconnected) if (baseEmbargo == Duration.Zero) { diff --git a/akka-http2-tests/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala b/akka-http2-tests/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala index 4144edae80..7a71d5973b 100644 --- a/akka-http2-tests/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala +++ b/akka-http2-tests/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala @@ -263,16 +263,27 @@ abstract class Http2PersistentClientSpec(tls: Boolean) extends AkkaSpecWithMater ) // need some demand on response side, otherwise, no requests will be pulled in client.responsesIn.request(1) + + def expectFailedResponse(): Unit = { + val response: HttpResponse = client.expectResponse() + response.attribute(requestIdAttr) should be(Some(RequestId("request-1"))) + response.status should be(StatusCodes.ServiceUnavailable) + } + if (withBackoff) { // not immediate when using backoff, 4 retries before failing, backoff is 300-800ms (so at least 1.2s) client.responsesIn.expectNoMessage(clientSettings.http2Settings.baseConnectionBackoff * 4) // total max backoff for 4 tries with the random factor could actually worst case be something // like 600 + 800 + 800 + 800 - those 1200 ms = 1800 ms, but no way to pass a timeout to expectError awaitAssert({ + // failed response then failed stream + expectFailedResponse() client.responsesIn.expectError() }, 1800.millis) } else { // directly + // failed response then failed stream + expectFailedResponse() client.responsesIn.expectError() } client.requestsOut.expectCancellation() @@ -312,6 +323,8 @@ abstract class Http2PersistentClientSpec(tls: Boolean) extends AkkaSpecWithMater probe.expectMsg("saw-reconnect") // max 4 attempts, giving up after that + val error = client.responsesIn.expectNext() // error response + error.status should ===(StatusCodes.ServiceUnavailable) client.responsesIn.expectError() client.requestsOut.expectCancellation() }