From 8a36d3d48f5b0d3a34ce9d5008553e80320fac41 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Thu, 24 Oct 2024 13:26:19 +0200 Subject: [PATCH] Fix google cloud keep alive settings --- .../connectors/google/http/GoogleHttp.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala index 0bafea003..f9d17a2ef 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala @@ -18,8 +18,9 @@ import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.http.scaladsl.Http.HostConnectionPool -import pekko.http.scaladsl.model.headers.Authorization +import pekko.http.scaladsl.model.headers.{ Authorization, Connection } import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse } +import pekko.http.scaladsl.settings.ConnectionPoolSettings import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal } import pekko.http.scaladsl.{ Http, HttpExt } import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings } @@ -27,6 +28,7 @@ import pekko.stream.connectors.google.util.Retry import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow } import scala.concurrent.{ ExecutionContextExecutor, Future } +import scala.concurrent.duration.Duration import scala.util.{ Failure, Success, Try } @InternalApi @@ -48,14 +50,21 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A private implicit def system: ExtendedActorSystem = http.system private implicit def ec: ExecutionContextExecutor = system.dispatcher private implicit def scheduler: Scheduler = system.scheduler + private def defaultConnectionPoolSettingsWithInfKeepAlive = + ConnectionPoolSettings(system).withKeepAliveTimeout(Duration.Inf) + + private def setKeepAlive(request: HttpRequest): HttpRequest = + request.removeHeader("connection").addHeader(Connection("Keep-Alive")) /** * Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]]. */ def singleRawRequest(request: HttpRequest)(implicit settings: RequestSettings): Future[HttpResponse] = { - val requestWithStandardParams = addStandardQuery(request) - settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams)) { proxy => - http.singleRequest(requestWithStandardParams, proxy.connectionContext, proxy.poolSettings) + val requestWithStandardParams = addStandardQuery(setKeepAlive(request)) + settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams, http.defaultClientHttpsContext, + defaultConnectionPoolSettingsWithInfKeepAlive)) { proxy => + http.singleRequest(requestWithStandardParams, proxy.connectionContext, + proxy.poolSettings.withKeepAliveTimeout(Duration.Inf)) } } @@ -121,7 +130,7 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A else FlowWithContext[HttpRequest, Ctx] - val requestFlow = settings.requestSettings.forwardProxy match { + val requestFlow = (settings.requestSettings.forwardProxy match { case None if !https => http.cachedHostConnectionPool[Ctx](host, p) case Some(proxy) if !https => @@ -131,6 +140,8 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A case Some(proxy) if https => http.cachedHostConnectionPoolHttps[Ctx](host, p, proxy.connectionContext, proxy.poolSettings) case _ => throw new RuntimeException(s"illegal proxy settings with https=$https") + }).contramap[(HttpRequest, Ctx)] { case (request, context) => + (setKeepAlive(request), context) } val unmarshalFlow = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) {