diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 5e6ba8c6231..59919b85816 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -6,12 +6,12 @@ package akka.stream.scaladsl import scala.concurrent.Promise import scala.concurrent.duration._ - import akka.actor.{ Actor, ActorRef, Props } import akka.stream.Attributes.inputBuffer import akka.stream.Materializer import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ +import akka.testkit.EventFilter import akka.testkit.TestProbe object ActorRefBackpressureSinkSpec { @@ -49,7 +49,7 @@ object ActorRefBackpressureSinkSpec { } -class ActorRefBackpressureSinkSpec extends StreamSpec { +class ActorRefBackpressureSinkSpec extends StreamSpec("akka.loglevel=INFO") { import ActorRefBackpressureSinkSpec._ def createActor[T](c: Class[T]) = @@ -247,6 +247,41 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { probe.reply(ackMessage) probe.expectMsg(completeMessage) } + + "stay around until final ack is sent" in { + val probe = TestProbe() + + EventFilter.info(pattern = ".*was not delivered.*", occurrences = 0).intercept { + val sourceProbe = TestSource[String]() + .toMat( + Sink.actorRefWithBackpressure( + probe.ref, + initMessage, + ackMessage, + completeMessage, + (_: Throwable) => failMessage))(Keep.left) + .run() + sourceProbe.ensureSubscription() + + probe.expectMsg(initMessage) + probe.reply(ackMessage) + + sourceProbe.sendNext("one") + probe.expectMsg("one") + probe.reply(ackMessage) + + sourceProbe.sendNext("two") + probe.expectMsg("two") + // buffer empty when complete is seen + sourceProbe.sendComplete() + Thread.sleep(100) + probe.reply(ackMessage) + probe.expectMsg(completeMessage) + + // logging takes a while to arrive + Thread.sleep(100) + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index 018ac303c4d..559a3e393ef 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -84,7 +84,7 @@ import akka.stream.stage._ } override def onUpstreamFinish(): Unit = { - if (buffer.isEmpty) finish() + if (buffer.isEmpty && acknowledgementReceived) finish() else completeReceived = true }