diff --git a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala index 29413b942f..e225cff506 100644 --- a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala +++ b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala @@ -55,6 +55,31 @@ abstract class MqttSession { */ def ![A](cp: Command[A]): Unit + /** + * Ask the session to perform a command regardless of the state it is + * in. This is important for sending Publish messages in particular, + * as a connection may not have been established with a session. + * @param cp The command to perform + * @tparam A The type of any carry for the command. + * @return A future indicating when the command has completed. Completion + * is defined as when it has been acknowledged by the recipient + * endpoint. + */ + final def ask[A](cp: Command[A]): Future[A] = + this ? cp + + /** + * Ask the session to perform a command regardless of the state it is + * in. This is important for sending Publish messages in particular, + * as a connection may not have been established with a session. + * @param cp The command to perform + * @tparam A The type of any carry for the command. + * @return A future indicating when the command has completed. Completion + * is defined as when it has been acknowledged by the recipient + * endpoint. + */ + def ?[A](cp: Command[A]): Future[A] + /** * Shutdown the session gracefully */ @@ -171,6 +196,9 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat: case c: Command[A] => throw new IllegalStateException(c + " is not a client command that can be sent directly") } + override def ?[A](cp: Command[A]): Future[A] = + ??? + override def shutdown(): Unit = { system.stop(clientConnector.toUntyped) system.stop(consumerPacketRouter.toUntyped) @@ -513,6 +541,9 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat: case c: Command[A] => throw new IllegalStateException(c + " is not a server command that can be sent directly") } + override def ?[A](cp: Command[A]): Future[A] = + ??? + override def shutdown(): Unit = { system.stop(serverConnector.toUntyped) system.stop(consumerPacketRouter.toUntyped) diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala index 90cbbb8957..39dcca78b6 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala @@ -51,43 +51,33 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit val connection = Tcp().outgoingConnection("localhost", 1883) - val mqttFlow: Flow[Command[Promise[Done]], Either[MqttCodec.DecodeError, Event[Promise[Done]]], NotUsed] = + val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] = Mqtt .clientSessionFlow(session, ByteString("1")) .join(connection) //#create-streaming-flow //#run-streaming-flow - val publishDone = Promise[Done] - val (commands: SourceQueueWithComplete[Command[Promise[Done]]], events: Future[Publish]) = + val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) = Source .queue(2, OverflowStrategy.fail) .via(mqttFlow) - .scan((Option.empty[Publish], false)) { - case ((maybePublish, false), Right(Event(_: PubAck, carry))) => - carry.foreach(_.success(Done)) - (maybePublish, true) - case ((None, publishAcked), Right(Event(p: Publish, _))) => - (Some(p), publishAcked) - case ((maybePublish, publishAcked), _) => - (maybePublish, publishAcked) - } .collect { - case (Some(p: Publish), true) => p + case Right(Event(p: Publish, _)) => p } .toMat(Sink.head)(Keep.both) .run() commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession))) commands.offer(Command(Subscribe(topic))) - session ! Command( - Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")), - publishDone - ) + val publishDone = session ? Command( + Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")), + Done + ) //#run-streaming-flow - publishDone.future.futureValue shouldBe Done + publishDone.futureValue shouldBe Done events.futureValue match { case Publish(_, `topic`, _, bytes) => bytes shouldBe ByteString("ohi") case e => fail("Unexpected event: " + e)