Skip to content

Commit

Permalink
fix: Emit a failed response for the triggering request on connection …
Browse files Browse the repository at this point in the history
…failure #3892 (#4322)
  • Loading branch information
johanandren authored Oct 19, 2023
1 parent 1f29fe6 commit 74a2dbd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,19 @@ private[http2] object PersistentConnection {
requestOut.fail(new StreamTcpException("connection broken"))

if (connectsLeft.contains(0)) {
failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause))
if (isAvailable(requestIn)) {
// fail the triggering request before failing the stream
val request = grab(requestIn)
val response = HttpResponse(
StatusCodes.ServiceUnavailable,
entity = s"Connection failed after $maxAttempts attempts")
.withAttributes(request.attributes)
emit(responseOut, response, () =>
failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause))
)
} else {
failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause))
}
} else {
setHandler(requestIn, Unconnected)
if (baseEmbargo == Duration.Zero) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,27 @@ abstract class Http2PersistentClientSpec(tls: Boolean) extends AkkaSpecWithMater
)
// need some demand on response side, otherwise, no requests will be pulled in
client.responsesIn.request(1)

def expectFailedResponse(): Unit = {
val response: HttpResponse = client.expectResponse()
response.attribute(requestIdAttr) should be(Some(RequestId("request-1")))
response.status should be(StatusCodes.ServiceUnavailable)
}

if (withBackoff) {
// not immediate when using backoff, 4 retries before failing, backoff is 300-800ms (so at least 1.2s)
client.responsesIn.expectNoMessage(clientSettings.http2Settings.baseConnectionBackoff * 4)
// total max backoff for 4 tries with the random factor could actually worst case be something
// like 600 + 800 + 800 + 800 - those 1200 ms = 1800 ms, but no way to pass a timeout to expectError
awaitAssert({
// failed response then failed stream
expectFailedResponse()
client.responsesIn.expectError()
}, 1800.millis)
} else {
// directly
// failed response then failed stream
expectFailedResponse()
client.responsesIn.expectError()
}
client.requestsOut.expectCancellation()
Expand Down Expand Up @@ -312,6 +323,8 @@ abstract class Http2PersistentClientSpec(tls: Boolean) extends AkkaSpecWithMater
probe.expectMsg("saw-reconnect")

// max 4 attempts, giving up after that
val error = client.responsesIn.expectNext() // error response
error.status should ===(StatusCodes.ServiceUnavailable)
client.responsesIn.expectError()
client.requestsOut.expectCancellation()
}
Expand Down

0 comments on commit 74a2dbd

Please sign in to comment.