Skip to content

Commit

Permalink
fix some compiler warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
NavidJalali committed Jan 15, 2025
1 parent 079059b commit 8302087
Show file tree
Hide file tree
Showing 18 changed files with 47 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private final class HttpsProxyGraphStage(
}
}

override def onDownstreamFinish(): Unit = cancel(sslIn)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(sslIn)

})

Expand All @@ -199,7 +199,7 @@ private final class HttpsProxyGraphStage(
pull(bytesIn)
}

override def onDownstreamFinish(): Unit = cancel(bytesIn)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(bytesIn)

})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private[http] object OutgoingConnectionBlueprint {
if (!entitySubstreamStarted) pull(responseOutputIn)
}

override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
// if downstream cancels while streaming entity,
// make sure we also cancel the entity source, but
// after being done with streaming the entity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ private[client] object NewHostConnectionPool {

def onPull(): Unit = () // emitRequests makes sure not to push too early

override def onDownstreamFinish(): Unit =
override def onDownstreamFinish(cause: Throwable): Unit =
withSlot { slot =>
slot.debug("Connection cancelled")
// Let's use StreamTcpException for now.
Expand All @@ -574,7 +574,7 @@ private[client] object NewHostConnectionPool {
requestOut.setHandler(connection)
}

override def onDownstreamFinish(): Unit = connection.onDownstreamFinish()
override def onDownstreamFinish(cause: Throwable): Unit = connection.onDownstreamFinish(cause)
})
}
def openConnection(slot: Slot): SlotConnection = {
Expand Down Expand Up @@ -622,9 +622,9 @@ private[client] object NewHostConnectionPool {
log.debug("Pool upstream failed with {}", ex)
super.onUpstreamFailure(ex)
}
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
log.debug("Pool downstream cancelled")
super.onDownstreamFinish()
super.onDownstreamFinish(cause)
}
override def postStop(): Unit = {
slots.foreach(_.shutdown())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
/** Network pulls in new frames */
def onPull(): Unit = updateState(_.onPull())

override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
frameOutFinished()
super.onDownstreamFinish()
super.onDownstreamFinish(cause)
}

private var _state: MultiplexerState = Idle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
outlet.setHandler(this)

def onPull(): Unit = incomingStreamPulled(streamId)
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
debug(s"Incoming side of stream [$streamId]: cancelling because downstream finished")
multiplexer.pushControlFrame(RstStreamFrame(streamId, ErrorCode.CANCEL))
// FIXME: go through state machine and don't manipulate vars directly here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[http] object ProtocolSwitch {
new OutHandler {
override def onPull(): Unit = pull(in)

override def onDownstreamFinish(): Unit = cancel(in)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(in)
}

val firstHandler =
Expand Down Expand Up @@ -151,9 +151,9 @@ private[http] object ProtocolSwitch {
val outHandler = new OutHandler {
override def onPull(): Unit = in.pull()

override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
in.cancel()
super.onDownstreamFinish()
super.onDownstreamFinish(cause)
}
}
in.setHandler(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private[http2] object PersistentConnection {
requestOut.setHandler(new OutHandler {
override def onPull(): Unit =
requestOutPulled = true
override def onDownstreamFinish(): Unit = ()
override def onDownstreamFinish(cause: Throwable): Unit = ()
})
responseIn.setHandler(new InHandler {
override def onPush(): Unit = throw new IllegalStateException("no response push expected while connecting")
Expand Down Expand Up @@ -196,7 +196,7 @@ private[http2] object PersistentConnection {
if (!isAvailable(requestIn)) pull(requestIn)
else dispatchRequest(grab(requestIn))

override def onDownstreamFinish(): Unit = onDisconnected()
override def onDownstreamFinish(cause: Throwable): Unit = onDisconnected()
})
responseIn.setHandler(new InHandler {
override def onPush(): Unit = {
Expand Down Expand Up @@ -253,10 +253,10 @@ private[http2] object PersistentConnection {
responseIn.cancel()
failStage(ex)
}
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
requestOut.complete()
responseIn.cancel()
super.onDownstreamFinish()
super.onDownstreamFinish(cause)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private[http] class HttpRequestRendererFactory(
val stream = ctx.sendEntityTrigger match {
case None => headerPart ++ body
case Some(future) =>
val barrier = Source.fromFuture(future).drop(1).asInstanceOf[Source[ByteString, Any]]
val barrier = Source.future(future).drop(1).asInstanceOf[Source[ByteString, Any]]
(headerPart ++ barrier ++ body).recoverWithRetries(-1,
{ case HttpResponseParser.OneHundredContinueError => Source.empty })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private[http] class HttpResponseRendererFactory(
override def onPull(): Unit =
if (!headersSent) sendHeaders()
else sinkIn.pull()
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
completeStage()
stopTransfer()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private[http] object HttpServerBluePrint {
}

// optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
if (entitySource ne null) {
// application layer has cancelled or only partially consumed response entity:
// connection will be closed
Expand Down Expand Up @@ -235,7 +235,7 @@ private[http] object HttpServerBluePrint {
// so can pull downstream then
downstreamPullWaiting = true
}
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
// downstream signalled not wanting any more requests
// we should keep processing the entity stream and then
// when it completes complete the stage
Expand Down Expand Up @@ -320,14 +320,14 @@ private[http] object HttpServerBluePrint {
openTimeouts = openTimeouts.enqueue(access)
push(requestOut, request.addHeader(`Timeout-Access`(access)).withEntity(entity))
}
override def onUpstreamFinish() = complete(requestOut)
override def onUpstreamFailure(ex: Throwable) = fail(requestOut, ex)
override def onUpstreamFinish(): Unit = complete(requestOut)
override def onUpstreamFailure(ex: Throwable): Unit = fail(requestOut, ex)
})
// TODO: provide and use default impl for simply connecting an input and an output port as we do here
setHandler(requestOut,
new OutHandler {
def onPull(): Unit = pull(requestIn)
override def onDownstreamFinish() = cancel(requestIn)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(requestIn)
})
setHandler(responseIn,
new InHandler {
Expand All @@ -336,13 +336,14 @@ private[http] object HttpServerBluePrint {
openTimeouts = openTimeouts.tail
push(responseOut, grab(responseIn))
}
override def onUpstreamFinish() = complete(responseOut)
override def onUpstreamFailure(ex: Throwable) = fail(responseOut, ex)
override def onUpstreamFinish(): Unit = complete(responseOut)
override def onUpstreamFailure(ex: Throwable): Unit = fail(responseOut, ex)
})
setHandler(responseOut,
new OutHandler {
def onPull(): Unit = pull(responseIn)
override def onDownstreamFinish() = cancel(responseIn)
def onPull():
Unit = pull(responseIn)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(responseIn)
})
}
}
Expand Down Expand Up @@ -481,7 +482,7 @@ private[http] object HttpServerBluePrint {
def onPull(): Unit =
if (oneHundredContinueResponsePending) pullSuppressed = true
else if (!hasBeenPulled(requestParsingIn)) pull(requestParsingIn)
override def onDownstreamFinish(): Unit =
override def onDownstreamFinish(cause: Throwable): Unit =
if (openRequests.isEmpty) completeStage()
else failStage(
new IllegalStateException("User handler flow was cancelled with ongoing request") with NoStackTrace)
Expand Down Expand Up @@ -705,7 +706,7 @@ private[http] object HttpServerBluePrint {
setHandler(toNet,
new OutHandler {
override def onPull(): Unit = pull(fromHttp)
override def onDownstreamFinish(): Unit = completeStage()
override def onDownstreamFinish(cause: Throwable): Unit = completeStage()
})

setHandler(fromNet,
Expand All @@ -717,7 +718,7 @@ private[http] object HttpServerBluePrint {
setHandler(toHttp,
new OutHandler {
override def onPull(): Unit = pull(fromNet)
override def onDownstreamFinish(): Unit = cancel(fromNet)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromNet)
})

private var activeTimers = 0
Expand Down Expand Up @@ -753,7 +754,7 @@ private[http] object HttpServerBluePrint {
setHandler(toNet,
new OutHandler {
override def onPull(): Unit = sinkIn.pull()
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
completeStage()
sinkIn.cancel()
}
Expand All @@ -771,7 +772,7 @@ private[http] object HttpServerBluePrint {
setHandler(toNet,
new OutHandler {
override def onPull(): Unit = sinkIn.pull()
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
completeStage()
sinkIn.cancel()
sourceOut.complete()
Expand Down Expand Up @@ -801,10 +802,10 @@ private[http] object HttpServerBluePrint {

sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = if (!hasBeenPulled(fromNet)) pull(fromNet)
override def onDownstreamFinish(): Unit = cancel(fromNet)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromNet)
})
}
override def onDownstreamFinish(): Unit = cancel(fromNet)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromNet)
})

// disable the old handlers, at this point we might still get something due to cancellation delay which we need to ignore
Expand All @@ -814,7 +815,7 @@ private[http] object HttpServerBluePrint {
override def onPull(): Unit = ()
override def onUpstreamFinish(): Unit = ()
override def onUpstreamFailure(ex: Throwable): Unit = ()
override def onDownstreamFinish(): Unit = ()
override def onDownstreamFinish(cause: Throwable): Unit = ()
})

newFlow.runWith(sourceOut.source, sinkIn.sink)(subFusingMaterializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private[http] object One2OneBidiFlow {
override def onPull(): Unit =
if (insideWrappedFlow < maxPending || maxPending == -1) pull(in)
else pullSuppressed = true
override def onDownstreamFinish(): Unit = cancel(in)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(in)
})

setHandler(fromWrapped,
Expand All @@ -117,7 +117,7 @@ private[http] object One2OneBidiFlow {
setHandler(out,
new OutHandler {
override def onPull(): Unit = pull(fromWrapped)
override def onDownstreamFinish(): Unit = cancel(fromWrapped)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromWrapped)
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private[http] object StreamUtils {
object OneTimeValve {
def apply(): OneTimeValve = new OneTimeValve {
val promise = Promise[Unit]()
val _source = Source.fromFuture(promise.future).drop(1) // we are only interested in the completion event
val _source = Source.future(promise.future).drop(1) // we are only interested in the completion event

def source[T]: Source[T, NotUsed] = _source.asInstanceOf[Source[T, NotUsed]] // safe, because source won't generate any elements
def open(): Unit = promise.success(())
Expand All @@ -226,7 +226,7 @@ private[http] object StreamUtils {

var timeout: OptionVal[Cancellable] = OptionVal.None

override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
cancelAfter match {
case finite: FiniteDuration =>
log.debug(s"Delaying cancellation for $finite")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
TextMessage.create("abc"),
TextMessage.create("def"),
TextMessage.create("ghi")))
.concat(Source.fromFuture(delayedCompletion).drop(1));
.concat(Source.future(delayedCompletion).drop(1));

Sink<Message, CompletionStage<List<String>>> echoSink =
Flow.of(Message.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ class HostConnectionPoolSpec extends PekkoSpecWithMaterializer(
super.onUpstreamFailure(ex)
}

override def onDownstreamFinish(): Unit = failStage(new RuntimeException("was cancelled"))
override def onDownstreamFinish(cause: Throwable): Unit = failStage(new RuntimeException("was cancelled"))
}
setHandlers(reqIn, reqOut, new MonitorMessage(reqIn, reqOut))
setHandlers(resIn, resOut, new MonitorMessage(resIn, resOut))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class NewConnectionPoolSpec extends PekkoSpecWithMaterializer("""

val crashingEntity =
Source.fromIterator(() => Iterator.fill(10)(ByteString("abc")))
.concat(Source.fromFuture(errorOnConnection1.future))
.concat(Source.future(errorOnConnection1.future))
.log("response-entity-stream")
.addAttributes(Attributes.logLevels(Logging.InfoLevel, Logging.InfoLevel, Logging.InfoLevel))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ abstract class RequestParserSpec(mode: String, newLine: String) extends AnyFreeS
}
.concatSubstreams
.flatMapConcat { x =>
Source.fromFuture {
Source.future {
x match {
case Right(request) => compactEntity(request.entity).fast.map(x => Right(request.withEntity(x)))
case Left(error) => FastFuture.successful(Left(error))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class HttpEntitySpec extends PekkoSpecWithMaterializer {
"Infinite data stream" in {
val neverCompleted = Promise[ByteString]()
intercept[TimeoutException] {
Await.result(Default(tpe, 42, Source.fromFuture(neverCompleted.future)).toStrict(100.millis), awaitAtMost)
Await.result(Default(tpe, 42, Source.future(neverCompleted.future)).toStrict(100.millis), awaitAtMost)
}.getMessage should be(
"HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class EntityStreamingSpec extends RoutingSpec with ScalaFutures {

// flatten the Future[Source[]] into a Source[]:
val source: Source[Tweet, Future[NotUsed]] =
Source.fromFutureSource(unmarshalled)
Source.futureSource(unmarshalled)

// #json-streaming-client-example
// tests ------------------------------------------------------------
Expand Down

0 comments on commit 8302087

Please sign in to comment.