-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ServerBinding
termination does not close HTTP/2 connections
#638
Comments
And http 1.1 behaves ok? |
Yes, everything behaves as expected with HTTP 1.1. |
@bdmendes if you have the code you used to reproduce this can you share it? Helps avoid having someone write the test case from scratch. I suspect that the Unirest client is keeping an active HTTP/2 connection open and that when you use the client again after the ServerBinding close that the connection is still open. If this is the case, we would need to rework ServerBinding close to force any open HTTP/2 connections to be closed. |
is it by designed ? @jrudolph cc. |
In theory, it should work. It was implemented in 4d16224, so it might be a bug or an edge case? A reproducer or at least debug logs would help a lot. |
I have a Pekko actor that does: abstract class HttpActor extends Actor {
var bindingOpt = Option.empty[ServerBinding]
override def preStart() = {
implicit val sys: ActorSystem = context.system
val builder = Http().newServerAt(interface = "0.0.0.0", port = 4321)
builder.bind(...).pipeTo(self)
}
override def postStop(): Unit = {
bindingOpt.foreach(
_.terminate(hardDeadline =
CoordinatedShutdown(context.system).timeout(CoordinatedShutdown.PhaseServiceRequestsDone)
)
}
def receive = { case binding: ServerBinding =>
logger.info(s"Bound to ${binding.localAddress}.$timeoutOverridden")
bindingOpt = Some(binding)
}
} Then I have a spec that does: def get(req: String) = Unirest.get(req).headers(Map()).exec(10.seconds)
val actor = system.actorOf(Props(new TestHttpServiceActor())) // of type HttpActor
get(s"http://localhost:4321/hello").getBody must beEqualTo("world").eventually
system.stop(actor)
get(s"http://localhost:4321/hello").isSuccess must beFalse.eventually Enabling HTTP/2 fails the last assertion. |
I've put a POC at https://github.com/pjfanning/pekko-http-sample/tree/binding-terminate (a branch of my pekko-http-sample project).
When http2 is not enabled, the HTTP request after the binding terminate fails. Issue happens with pekko-http 1.0.1 and 1.1.0. I did an extra test where I did a Thread.sleep of 30s and then tried another HTTP request and that failed with a connection exception. It does look like the server binding terminate does eventually close all the open connections but that the Future returned by binding.terminate completes too quickly. A shorter sleep of 5s was not enough. |
Thanks for putting that together @pjfanning. I only looked quickly into it, it might be that the issue only happens during the uncommon non-tls Http 1.1 => Http 2 upgrade request. Does the issue show for you in a full Http 2 over Https setting, @bdmendes? |
In some way, it's no wonder that this is not implemented as the h2c 101 upgrade flow is the most complicated of all the h2 server setups (and also not supported on the client side). What would need to be done to support this is to change pekko-http/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala Line 97 in 5427d0c
Not sure if that is worth it for a very uncommon setup outside of testing (h2c using upgrade headers). WDYT, @raboof? |
My idea was to try to invert the flow using a joinable ServerTerminator implementation like this: private class JoinableServerTermination extends ServerTerminator {
private val terminationInitiated = Promise[Done]()
private val terminationCompleted = new AtomicReference[Future[Http.HttpTerminated]](Future.successful(Http.HttpServerTerminated))
def onTerminationInitiated(f: () => Future[Http.HttpTerminated]): Unit = {
val promise = Promise[Http.HttpTerminated]()
def chain(f: Future[Http.HttpTerminated]): Unit = {
val p = terminationCompleted.get()
if (!terminationCompleted.compareAndSet(p,p.flatMap(_ => promise.future)(ExecutionContexts.parasitic)))
chain(f)
}
chain(promise.future)
terminationInitiated.future.foreach { _ => promise.completeWith(f())
}(ExecutionContexts.parasitic)
}
override def terminate(deadline: FiniteDuration)(implicit ex: ExecutionContext): Future[Http.HttpTerminated] = {
terminationInitiated.tryComplete(Success(Done))
terminationCompleted.get()
}
} The next question would be how to get it into val http1: HttpImplementation =
Flow.lazyFlow { () =>
val termination = new JoinableServerTermination
Flow[HttpRequest]
.mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler, settings, log, termination))
.mapMaterializedValue(_ => termination)
}
.joinMat(GracefulTerminatorStage(system, settings).atop(http.serverLayer(settings, log = log)))( /* join http1 termination here */) In |
Sorry for the delay. Considering there is no explicit |
The question was also about whether it happens when TLS is enabled (as it is common for http2). |
Calling the
ServerBinding
terminate()
method appears to not close open HTTP/2 connections reused by a client for different requests, even after thehardDeadline
timeout is exceeded. In fact, new requests are still handled successfully even after the termination future completes.Steps to reproduce (in 1.1.0):
enable-http2=on
in the Typesafe Config.Http().newServerAt(...).bind(...)
.The text was updated successfully, but these errors were encountered: