diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala index b99f1412512..cd8ae8e5ec1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala @@ -20,6 +20,7 @@ import akka.stream.Attributes.Attribute import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.TestSource // Debug loglevel to diagnose https://github.com/akka/akka-core/issues/30469 class FlowFlatMapPrefixSpec extends StreamSpec("akka.loglevel = debug") { @@ -654,13 +655,14 @@ class FlowFlatMapPrefixSpec extends StreamSpec("akka.loglevel = debug") { } "complete when downstream cancels before pulling and upstream does not produce" in { - val fSeq = Source(List.empty[Int]) + val (probe, fSeq) = TestSource[Int]() .flatMapPrefixMat(1) { prefix => Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .to(Sink.cancelled) + }(Keep.both) + .toMat(Sink.cancelled)(Keep.left) .withAttributes(attributes) .run() + probe.sendComplete() if (att.propagateToNestedMaterialization) { fSeq.futureValue should equal(Nil) @@ -670,7 +672,7 @@ class FlowFlatMapPrefixSpec extends StreamSpec("akka.loglevel = debug") { } "complete when downstream cancels before pulling and upstream does not produce, prefix=0" in { - val fSeq = Source(List.empty[Int]) + val fSeq = TestSource[Int]() .flatMapPrefixMat(0) { prefix => Flow[Int].mapMaterializedValue(_ => prefix) }(Keep.right) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 8de6fbced2e..ef59106761e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -84,6 +84,35 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } } + "Source from iterable" must { + "produce optimized source for no elements" in { + val source = Source(Nil) + source should ===(Source.empty) + val result = source.runWith(Sink.seq) + result.futureValue should ===(Seq.empty) + } + + "produce optimized source for one element Vector" in { + val source = Source(Vector(1)) + val result = source.runWith(Sink.seq) + result.futureValue should ===(immutable.Seq(1)) + source.getAttributes.nameLifted should ===(Some("singleSource")) + } + + "produce optimized source for one element List" in { + val source = Source(List(1)) + val result = source.runWith(Sink.seq) + result.futureValue should ===(immutable.Seq(1)) + source.getAttributes.nameLifted should ===(Some("singleSource")) + } + + "produce all elements fed to it" in { + val source = Source(List(1, 2, 3)) + val result = source.runWith(Sink.seq) + result.futureValue should ===(immutable.Seq(1, 2, 3)) + } + } + "Composite Source" must { "merge from many inputs" in { val probes = immutable.Seq.fill(5)(TestPublisher.manualProbe[Int]()) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0d6a79c6be0..016c74a87ec 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -353,7 +353,17 @@ object Source { * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = - fromGraph(new IterableSource[T](iterable)) + iterable match { + case Nil => empty + case head :: Nil => single(head) + case s: IterableOnce[T] => + s.knownSize match { + case 0 => empty + case 1 => single(s.head) + case _ => fromGraph(new IterableSource[T](iterable)) + } + case _ => fromGraph(new IterableSource[T](iterable)) + } /** * Starts a new `Source` from the given `Future`. The stream will consist of