From 9bb4d90df2de6f0c818221381d515b9d2953918e Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Sat, 23 Nov 2024 19:15:27 +0100 Subject: [PATCH] Fix/resolve flaky error in tests (#872) Fixes some flaky test acting on 3 things: - drop from custom low level client the callback mechanism and the queue, but resort just in the queued messages an wait blocking on it. - updates the Publish collect based on Paho client to release the latch only after having copied the data, to avoid nulls due to read ordering From ```java latch.countDown(); receivedTopic = topic; receivedMessage = message; ``` to ```java receivedTopic = topic; receivedMessage = message; latch.countDown(); ``` - in Hive client usages, register the listener to subscribe before the verification and not during the verification. This makes the code less linear, but it's the intended use of the API. --- ...ServerLowlevelMessagesIntegrationTest.java | 4 +- .../mqtt5/AbstractServerIntegrationTest.java | 4 +- ...ServerIntegrationWithoutClientFixture.java | 46 +++-- .../integration/mqtt5/ConnectAckTest.java | 2 +- .../integration/mqtt5/ConnectTest.java | 76 +++---- .../integration/mqtt5/ContentTypeTest.java | 23 ++- .../integration/mqtt5/FlowControlTest.java | 10 +- .../mqtt5/MessageExpirationTest.java | 20 +- .../mqtt5/PayloadFormatIndicatorTest.java | 29 +-- .../mqtt5/RequestResponseTest.java | 67 +++--- .../mqtt5/SharedSubscriptionTest.java | 192 +++++++++--------- .../mqtt5/SubscriptionOptionsTest.java | 2 +- .../mqtt5/SubscriptionWithIdentifierTest.java | 2 +- .../java/io/moquette/testclient/Client.java | 156 ++++++-------- 14 files changed, 312 insertions(+), 321 deletions(-) diff --git a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java index 08b07d7e5..dd038c998 100644 --- a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java @@ -145,9 +145,7 @@ public void testWillMessageIsFiredOnClientKeepAliveExpiry() throws Exception { @Test public void testRejectConnectWithEmptyClientID() throws InterruptedException { LOG.info("*** testRejectConnectWithEmptyClientID ***"); - m_client.clientId("").connect(); - - this.receivedMsg = this.m_client.lastReceivedMessage(); + this.receivedMsg = m_client.clientId("").connect(); assertTrue(receivedMsg instanceof MqttConnAckMessage); MqttConnAckMessage connAck = (MqttConnAckMessage) receivedMsg; diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index 8dd989d01..13da64aa0 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -46,12 +46,12 @@ public void tearDown() throws Exception { super.tearDown(); } - void connectLowLevel() { + void connectLowLevel() throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(); assertConnectionAccepted(connAck, "Connection must be accepted"); } - void connectLowLevel(int keepAliveSecs) { + void connectLowLevel(int keepAliveSecs) throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE); assertConnectionAccepted(connAck, "Connection must be accepted"); } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java index e8d23526a..0d48821fa 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java @@ -148,35 +148,39 @@ protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer action, MqttQos expectedQos, + protected static void verifyNoPublish(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer action, Duration timeout, String message) throws InterruptedException { + action.accept(null); + Optional publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS); + + // verify no published will in 10 seconds + assertFalse(publishedMessage.isPresent(), message); + } + + protected static void verifyPublishedMessage(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer action, MqttQos expectedQos, String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception { - try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) { - action.accept(null); - Optional publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS); - if (!publishMessage.isPresent()) { - fail("Expected to receive a publish message"); - return; - } - Mqtt5Publish msgPub = publishMessage.get(); - final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals(expectedPayload, payload, errorMessage); - assertEquals(expectedQos, msgPub.getQos()); + action.accept(null); + Optional publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS); + if (!publishMessage.isPresent()) { + fail("Expected to receive a publish message"); + return; } + Mqtt5Publish msgPub = publishMessage.get(); + final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals(expectedPayload, payload, errorMessage); + assertEquals(expectedQos, msgPub.getQos()); } static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) { assertEquals(mqttMessageType, received.fixedHeader().messageType()); } - static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer assertion) throws InterruptedException { - try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - Optional publishMessage = publishes.receive(1, TimeUnit.SECONDS); - if (!publishMessage.isPresent()) { - fail("Expected to receive a publish message"); - return; - } - Mqtt5Publish msgPub = publishMessage.get(); - assertion.accept(msgPub); + static void verifyPublishMessage(Mqtt5BlockingClient.Mqtt5Publishes publishListener, Consumer assertion) throws InterruptedException { + Optional publishMessage = publishListener.receive(1, TimeUnit.SECONDS); + if (!publishMessage.isPresent()) { + fail("Expected to receive a publish message"); + return; } + Mqtt5Publish msgPub = publishMessage.get(); + assertion.accept(msgPub); } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java index 12b3479c1..961948dfe 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java @@ -57,7 +57,7 @@ public void testAckResponseProperties() { } @Test - public void testAssignedClientIdentifier() { + public void testAssignedClientIdentifier() throws InterruptedException { Client unnamedClient = new Client("localhost").clientId(""); connAck = unnamedClient.connectV5(); assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected"); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java index d65d2cc39..2b7633d19 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java @@ -71,7 +71,7 @@ public void simpleConnect() { } @Test - public void sendConnectOnDisconnectedConnection() { + public void sendConnectOnDisconnectedConnection() throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(); TestUtils.assertConnectionAccepted(connAck, "Connection must be accepted"); lowLevelClient.disconnect(); @@ -85,7 +85,7 @@ public void sendConnectOnDisconnectedConnection() { } @Test - public void receiveInflightPublishesAfterAReconnect() { + public void receiveInflightPublishesAfterAReconnect() throws InterruptedException { final Mqtt5BlockingClient publisher = MqttClient.builder() .useMqttVersion5() .identifier("publisher") @@ -180,30 +180,32 @@ public void avoidToFirePreviouslyScheduledWillWhenSameClientIDReconnects() throw final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId); final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament(); + try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // client trigger a will message, disconnecting with bad reason code - final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder() - .reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET) - .build(); - clientWithWill.disconnect(malformedPacketReason); + // client trigger a will message, disconnecting with bad reason code + final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder() + .reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET) + .build(); + clientWithWill.disconnect(malformedPacketReason); - // wait no will is published - verifyNoTestamentIsPublished(testamentSubscriber, unused -> { - // reconnect another client with same clientId - final Mqtt5BlockingClient client = MqttClient.builder() - .useMqttVersion5() - .identifier(clientId) - .serverHost("localhost") - .serverPort(1883) - .buildBlocking(); - Mqtt5ConnAck connectAck = client.connect(); - assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected"); - - }, Duration.ofSeconds(10)); + // wait no will is published + verifyNoTestamentIsPublished(testamentListener, unused -> { + // reconnect another client with same clientId + final Mqtt5BlockingClient client = MqttClient.builder() + .useMqttVersion5() + .identifier(clientId) + .serverHost("localhost") + .serverPort(1883) + .buildBlocking(); + Mqtt5ConnAck connectAck = client.connect(); + assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected"); + + }, Duration.ofSeconds(10)); + } } - private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient testamentSubscriber, Consumer action, Duration timeout) throws InterruptedException { - verifyNoPublish(testamentSubscriber, action, timeout, "No will message should be published"); + private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient.Mqtt5Publishes testamentListener, Consumer action, Duration timeout) throws InterruptedException { + verifyNoPublish(testamentListener, action, timeout, "No will message should be published"); } @Test @@ -230,12 +232,14 @@ public void noWillMessageIsFiredOnNormalDisconnection() throws InterruptedExcept final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60); final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament(); + try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // wait no will is published - verifyNoTestamentIsPublished(testamentSubscriber, unused -> { - // normal session disconnection - clientWithWill.disconnect(Mqtt5Disconnect.builder().build()); - }, Duration.ofSeconds(10)); + // wait no will is published + verifyNoTestamentIsPublished(testamentListener, unused -> { + // normal session disconnection + clientWithWill.disconnect(Mqtt5Disconnect.builder().build()); + }, Duration.ofSeconds(10)); + } } @Test @@ -245,14 +249,16 @@ public void givenClientWithWillThatCleanlyDisconnectsWithWillShouldTriggerTheTes final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60); final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament(); - - // wait no will is published - verifyNoTestamentIsPublished(testamentSubscriber, unused -> { - // normal session disconnection with will - clientWithWill.disconnect(Mqtt5Disconnect.builder() - .reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE) - .build()); - }, Duration.ofSeconds(10)); + try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) { + + // wait no will is published + verifyNoTestamentIsPublished(testamentListener, unused -> { + // normal session disconnection with will + clientWithWill.disconnect(Mqtt5Disconnect.builder() + .reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE) + .build()); + }, Duration.ofSeconds(10)); + } } @Test diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java index 267bd7e96..c08da23c4 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; @@ -65,17 +66,19 @@ public void givenAPublishWithContentTypeWhenForwardedToSubscriberThenIsPresent() .send(); Mqtt5BlockingClient publisher = createPublisherClient(); - publisher.publishWith() - .topic("temperature/living") - .payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8)) - .contentType("application/json") - .qos(MqttQos.AT_MOST_ONCE) - .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + publisher.publishWith() + .topic("temperature/living") + .payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8)) + .contentType("application/json") + .qos(MqttQos.AT_MOST_ONCE) + .send(); - verifyPublishMessage(subscriber, msgPub -> { - assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present"); - assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched"); - }); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present"); + assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched"); + }); + } } @Test diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java index 7df84f543..eaafb8525 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.concurrent.TimeUnit; import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -73,7 +72,7 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect // sixth should exceed quota and the client should get a disconnect sendQoS2Publish(); - MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage(); + MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500)); assertEquals(MqttMessageType.DISCONNECT, receivedMsg.fixedHeader().messageType(), "On sixth in flight message the send quota is exhausted and response should be DISCONNECT"); MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMsg.variableHeader(); @@ -83,8 +82,8 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect assertTrue(lowLevelClient.isConnectionLost(), "Connection MUST be closed by the server"); } - private void verifyReceived(MqttMessageType expectedMessageType) { - MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage(); + private void verifyReceived(MqttMessageType expectedMessageType) throws InterruptedException { + MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500)); assertEquals(expectedMessageType, receivedMsg.fixedHeader().messageType()); } @@ -94,7 +93,7 @@ private void sendQoS2Publish() { MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, MqttProperties.NO_PROPERTIES); ByteBuf payload = Unpooled.wrappedBuffer("18°C".getBytes(StandardCharsets.UTF_8)); MqttPublishMessage publishQoS2 = new MqttPublishMessage(fixedHeader, variableHeader, payload); - lowLevelClient.publish(publishQoS2, 500, TimeUnit.MILLISECONDS); + lowLevelClient.publish(publishQoS2); } @Override @@ -159,6 +158,7 @@ public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectl // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", MqttQoS.AT_LEAST_ONCE, 123); + verifyOfType(received, MqttMessageType.SUBACK); //lowlevel client doesn't ACK any pub, so the in flight window fills up diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java index c65bdde50..108e6fcf9 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; @@ -68,13 +69,16 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimePassedThenRetainedIs // subscribe to same topic and verify no message Mqtt5BlockingClient subscriber = createSubscriberClient(); - subscriber.subscribeWith() - .topicFilter("temperature/living") - .qos(MqttQos.AT_MOST_ONCE) - .send(); - - verifyNoPublish(subscriber, v -> {}, Duration.ofSeconds(2), - "Subscriber must not receive any retained message"); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + subscriber.subscribeWith() + .topicFilter("temperature/living") + .qos(MqttQos.AT_MOST_ONCE) + .send(); + + verifyNoPublish(publishes, v -> { + }, Duration.ofSeconds(2), + "Subscriber must not receive any retained message"); + } } // TODO verify the elapsed @@ -153,7 +157,7 @@ public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThe // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", - MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS); + MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(500)); verifyOfType(received, MqttMessageType.SUBACK); //lowlevel client doesn't ACK any pub, so the in flight window fills up diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java index ff0b7b405..0169249b5 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; @@ -45,6 +46,7 @@ import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.concurrent.TimeUnit; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; @@ -69,18 +71,19 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen .topicFilter("temperature/living") .qos(MqttQos.AT_MOST_ONCE) .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5BlockingClient publisher = createPublisherClient(); + publisher.publishWith() + .topic("temperature/living") + .payload("18".getBytes(StandardCharsets.UTF_8)) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.AT_MOST_ONCE) + .send(); - Mqtt5BlockingClient publisher = createPublisherClient(); - publisher.publishWith() - .topic("temperature/living") - .payload("18".getBytes(StandardCharsets.UTF_8)) - .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) - .qos(MqttQos.AT_MOST_ONCE) - .send(); - - verifyPublishMessage(subscriber, msgPub -> { - assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); - }); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); + }); + } } @Test @@ -168,10 +171,10 @@ public void givenNotValidUTF8StringInPublishQoS0WhenPayloadFormatIndicatorIsSetT MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, props); MqttPublishMessage publishQoS0 = new MqttPublishMessage(fixedHeader, variableHeader, Unpooled.wrappedBuffer(INVALID_UTF_8_BYTES)); // in a reasonable amount of time (say 500 ms) it should receive a DISCONNECT - lowLevelClient.publish(publishQoS0, 500, TimeUnit.MILLISECONDS); + lowLevelClient.publish(publishQoS0); // Verify a DISCONNECT is received with PAYLOAD_FORMAT_INVALID reason code and connection is closed - final MqttMessage receivedMessage = lowLevelClient.lastReceivedMessage(); + final MqttMessage receivedMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(500)); assertEquals(MqttMessageType.DISCONNECT, receivedMessage.fixedHeader().messageType()); MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMessage.variableHeader(); assertEquals(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID.byteValue(), disconnectHeader.reasonCode(), diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index 161a1d3a5..f56a7296d 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; @@ -52,13 +53,15 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReply( final Mqtt5BlockingClient responder = createHiveBlockingClient("responder"); - responderRepliesToRequesterPublish(responder, requester, responseTopic); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) { + responderRepliesToRequesterPublish(responder, requester, responseTopic); - verifyPublishMessage(requester, msgPub -> { - assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); - String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals("OK", payload); - }); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + }); + } } private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient responder, Mqtt5BlockingClient requester, String responseTopic) { @@ -126,24 +129,26 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW }); waitForSubAck(subackFuture); - Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() - .topic("requester/door/open") - .responseTopic(responseTopic) - .correlationData("req-open-door".getBytes(StandardCharsets.UTF_8)) - .payload("Please open the door".getBytes(StandardCharsets.UTF_8)) - .qos(MqttQos.AT_LEAST_ONCE) - .send(); - assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(), - "Open door request cannot be published "); - - verifyPublishMessage(requester, msgPub -> { - assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); - String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals("OK", payload); - assertTrue(msgPub.getCorrelationData().isPresent(), "Request correlation data MUST defined in response publish"); - final byte[] correlationData = asByteArray(msgPub.getCorrelationData().get()); - assertEquals("req-open-door", new String(correlationData, StandardCharsets.UTF_8)); - }); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() + .topic("requester/door/open") + .responseTopic(responseTopic) + .correlationData("req-open-door".getBytes(StandardCharsets.UTF_8)) + .payload("Please open the door".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(), + "Open door request cannot be published "); + + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + assertTrue(msgPub.getCorrelationData().isPresent(), "Request correlation data MUST defined in response publish"); + final byte[] correlationData = asByteArray(msgPub.getCorrelationData().get()); + assertEquals("req-open-door", new String(correlationData, StandardCharsets.UTF_8)); + }); + } } private static void waitForSubAck(CompletableFuture<@NotNull Mqtt5SubAck> subackFuture) { @@ -174,12 +179,14 @@ public void givenRequestResponseProtocolAndClientIsConnectedWhenRequestIsIssueTh final Mqtt5BlockingClient responder = createHiveBlockingClient("responder"); - responderRepliesToRequesterPublish(responder, requester, responseTopic); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) { + responderRepliesToRequesterPublish(responder, requester, responseTopic); - verifyPublishMessage(requester, msgPub -> { - assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); - String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals("OK", payload); - }); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + }); + } } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java index e0997816c..47016d496 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java @@ -1,5 +1,6 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; @@ -19,8 +20,6 @@ import io.netty.handler.codec.mqtt.MqttReasonCodes; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -33,19 +32,16 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; public class SharedSubscriptionTest extends AbstractSubscriptionIntegrationTest { - private static final Logger LOG = LoggerFactory.getLogger(SharedSubscriptionTest.class); - @Override public String clientName() { return "subscriber"; } @Test - public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() { + public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisconnected() throws InterruptedException { connectLowLevel(); MqttMessage received = lowLevelClient.subscribeWithError("$share/+/measures/temp", MqttQoS.AT_LEAST_ONCE); @@ -57,7 +53,7 @@ public void givenAClientSendingBadlyFormattedSharedSubscriptionNameThenItIsDisco } @Test - public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() { + public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptionACK() throws InterruptedException { connectLowLevel(); MqttMessage received = lowLevelClient.subscribeWithError("$share/metrics/measures/temp", MqttQoS.AT_LEAST_ONCE); @@ -70,7 +66,7 @@ public void givenClientSubscribingToSharedTopicThenReceiveTheExpectedSubscriptio } @Test - public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException { + public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWithNegativeResponse() throws IOException, InterruptedException { // stop already started broker instance stopServer(); @@ -93,7 +89,7 @@ public void givenATopicNotReadableWhenAClientSubscribeSharedThenReceiveSubackWit @Test - public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException { + public void givenClientSubscribingToSharedAndNonSharedWhenTheSharedIsNotReadableReceivesPositiveAckOnlyForNonShared() throws IOException, InterruptedException { // stop already started broker instance stopServer(); @@ -135,13 +131,14 @@ public void givenASharedSubscriptionClientReceivesANotification() throws Excepti subscriberClient.subscribeWith() .topicFilter("$share/collectors/metric/temperature/#") .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriberClient.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5BlockingClient publisherClient = createPublisherClient(); - Mqtt5BlockingClient publisherClient = createPublisherClient(); - - verifyPublishedMessage(subscriberClient, unused -> publisherClient.publishWith() - .topic("metric/temperature/living") - .payload("18".getBytes(StandardCharsets.UTF_8)) - .send(), MqttQos.AT_MOST_ONCE, "18", "Shared message must be received", 10); + verifyPublishedMessage(publishes, unused -> publisherClient.publishWith() + .topic("metric/temperature/living") + .payload("18".getBytes(StandardCharsets.UTF_8)) + .send(), MqttQos.AT_MOST_ONCE, "18", "Shared message must be received", 10); + } } @Test @@ -216,21 +213,22 @@ public void whenAClientSubscribeToASharedTopicThenDoesntReceiveAnyRetainedMessag public void givenSharedSubscriptionWithCertainQoSWhenSameClientWithSameShareSubscribeToSameTopicFilterThenQoSUpdates() throws Exception { final Mqtt5BlockingClient subscriberClient = createSubscriberClient(); subscribe(subscriberClient, "$share/collectors/metric/temperature/living", MqttQos.AT_MOST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriberClient.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5BlockingClient publisherClient = createPublisherClient(); - Mqtt5BlockingClient publisherClient = createPublisherClient(); + verifyPublishedMessage(publishes, + unused -> publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE), + MqttQos.AT_MOST_ONCE, "18", "QoS0 publish message is expected by the subscriber when subscribed with AT_MOST_ONCE", 1); - verifyPublishedMessage(subscriberClient, - unused -> publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE), - MqttQos.AT_MOST_ONCE, "18", "QoS0 publish message is expected by the subscriber when subscribed with AT_MOST_ONCE", 1); + // update QoS for shared subscription + subscribe(subscriberClient, "$share/collectors/metric/temperature/living", MqttQos.AT_LEAST_ONCE); - // update QoS for shared subscription - subscribe(subscriberClient, "$share/collectors/metric/temperature/living", MqttQos.AT_LEAST_ONCE); - - // This time the publish reaches the subscription - verifyPublishedMessage(subscriberClient, v -> { - // publish the message again and verify the captured message - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 30); + // This time the publish reaches the subscription + verifyPublishedMessage(publishes, v -> { + // publish the message again and verify the captured message + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 30); + } } private static void publish(Mqtt5BlockingClient publisherClient, String topicName, MqttQos mqttQos) { @@ -259,25 +257,27 @@ public void givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeT // subscribe second client to shared subscription final Mqtt5BlockingClient subscriber2 = createHiveBlockingClient("subscriber2"); subscribe(subscriber2, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); - - // unsubscribe successfully the first subscriber - Mqtt5UnsubAck result = subscriber1.unsubscribeWith() - .topicFilter(fullSharedSubscriptionTopicFilter) - .send(); - assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), - "Unsubscribe of shared subscription must be successful"); - - - // verify it's received from the survivor subscriber2 - Mqtt5BlockingClient publisherClient = createPublisherClient(); - // try 4 times we should hit all the 4 times the subscriber2 - // if the other shared subscription remains active we have 50% of possibility - // to hit the not removed subscriber, so 4 iterations should be enough. - for (int i = 0; i < 4; i++) { - verifyPublishedMessage(subscriber2, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + try (Mqtt5BlockingClient.Mqtt5Publishes subscriber2Listener = subscriber2.publishes(MqttGlobalPublishFilter.ALL)) { + + // unsubscribe successfully the first subscriber + Mqtt5UnsubAck result = subscriber1.unsubscribeWith() + .topicFilter(fullSharedSubscriptionTopicFilter) + .send(); + assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), + "Unsubscribe of shared subscription must be successful"); + + + // verify it's received from the survivor subscriber2 + Mqtt5BlockingClient publisherClient = createPublisherClient(); + // try 4 times we should hit all the 4 times the subscriber2 + // if the other shared subscription remains active we have 50% of possibility + // to hit the not removed subscriber, so 4 iterations should be enough. + for (int i = 0; i < 4; i++) { + verifyPublishedMessage(subscriber2Listener, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + } } } @@ -288,27 +288,29 @@ public void givenASharedSubscriptionWhenLastSubscribedClientUnsubscribeThenTheSh // subscribe client to shared subscription final Mqtt5BlockingClient subscriber = createHiveBlockingClient("subscriber1"); subscribe(subscriber, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // verify subscribed to the shared receives a message - Mqtt5BlockingClient publisherClient = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); - - // unsubscribe the only shared subscription client - Mqtt5UnsubAck result = subscriber.unsubscribeWith() - .topicFilter(fullSharedSubscriptionTopicFilter) - .send(); - assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), - "Unsubscribe of shared subscription must be successful"); - - // verify no publish is propagated by shared subscription - verifyNoPublish(subscriber, v -> { + // verify subscribed to the shared receives a message + Mqtt5BlockingClient publisherClient = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { // push a message to the shared subscription publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, Duration.ofSeconds(2), - "Subscriber must not receive any message from the left shared subscription"); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + + // unsubscribe the only shared subscription client + Mqtt5UnsubAck result = subscriber.unsubscribeWith() + .topicFilter(fullSharedSubscriptionTopicFilter) + .send(); + assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), + "Unsubscribe of shared subscription must be successful"); + + // verify no publish is propagated by shared subscription + verifyNoPublish(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, Duration.ofSeconds(2), + "Subscriber must not receive any message from the left shared subscription"); + } } @Test @@ -318,22 +320,24 @@ public void givenASharedSubscriptionWhenLastSubscribedClientSessionTerminatesThe // subscribe client to shared subscription final Mqtt5BlockingClient subscriber = createCleanStartClient("subscriber1"); subscribe(subscriber, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // verify subscribed to the shared receives a message - Mqtt5BlockingClient publisherClient = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); - - // disconnect the subscriber, so becuase it's clean, wipe all shared subscriptions - subscriber.disconnect(); - - // verify that a publish on shared topic doesn't have any side effect - verifyNoPublish(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, Duration.ofSeconds(2), "Shared message must be received"); + // verify subscribed to the shared receives a message + Mqtt5BlockingClient publisherClient = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + + // disconnect the subscriber, so becuase it's clean, wipe all shared subscriptions + subscriber.disconnect(); + + // verify that a publish on shared topic doesn't have any side effect + verifyNoPublish(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, Duration.ofSeconds(2), "Shared message must be received"); + } } @Test @@ -343,25 +347,27 @@ public void givenASharedSubscriptionWhenBrokerRestartsAndClientReconnectsThenSha // subscribe client to shared subscription Mqtt5BlockingClient subscriber = createNonCleanStartClient("subscriber"); subscribe(subscriber, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // verify subscribed to the shared receives a message - final Mqtt5BlockingClient publisherClient = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); - + // verify subscribed to the shared receives a message + final Mqtt5BlockingClient publisherClient = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + } // restart the broker restartServerWithSuspension(Duration.ofSeconds(2)); // reconnect subscriber subscriber = createNonCleanStartClient("subscriber"); - - // verify after restart the shared subscription becomes again active - final Mqtt5BlockingClient publisherClientReconnected = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClientReconnected, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + // verify after restart the shared subscription becomes again active + final Mqtt5BlockingClient publisherClientReconnected = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { + // push a message to the shared subscription + publish(publisherClientReconnected, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + } } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java index cdca14864..4c36f34d2 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java @@ -54,9 +54,9 @@ static class PublishCollector implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - latch.countDown(); receivedTopic = topic; receivedMessage = message; + latch.countDown(); } public String receivedPayload() { diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java index 6176df14d..88e59749b 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java @@ -29,7 +29,7 @@ public void givenNonSharedSubscriptionWithIdentifierWhenPublishMatchedThenReceiv // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("/metrics/measures/temp", - MqttQoS.AT_LEAST_ONCE, 123, 400, TimeUnit.MILLISECONDS); + MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(400)); verifyOfType(received, MqttMessageType.SUBACK); Mqtt5BlockingClient publisher = createPublisherClient(); diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index e9bba018e..dc03f4899 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -16,12 +16,32 @@ package io.moquette.testclient; import io.moquette.BrokerConstants; +import io.moquette.broker.metrics.MQTTMessageLogger; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttVersion; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +50,6 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -51,13 +70,12 @@ public interface ICallback { private static final Logger LOG = LoggerFactory.getLogger(Client.class); - private static final Duration TIMEOUT_DURATION = Duration.ofMillis(500); + private static final Duration TIMEOUT_DURATION = Duration.ofMillis(1000); final ClientNettyMQTTHandler handler = new ClientNettyMQTTHandler(); EventLoopGroup workerGroup; Channel m_channel; private boolean m_connectionLost; - private ICallback callback; private String clientId; private AtomicReference receivedMsg = new AtomicReference<>(); private final BlockingQueue receivedMessages = new LinkedBlockingQueue<>(); @@ -82,6 +100,7 @@ public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("rawcli_decoder", new MqttDecoder()); pipeline.addLast("rawcli_encoder", MqttEncoder.INSTANCE); + pipeline.addLast("messageLogger", new MQTTMessageLogger()); pipeline.addLast("rawcli_handler", handler); } }); @@ -99,7 +118,7 @@ public Client clientId(String clientId) { return this; } - public void connect(String willTestamentTopic, String willTestamentMsg) { + public void connect(String willTestamentTopic, String willTestamentMsg) throws InterruptedException { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader( MqttMessageType.CONNECT, false, @@ -130,24 +149,24 @@ public void connect(String willTestamentTopic, String willTestamentMsg) { doConnect(connectMessage); } - public void connect() { + public MqttConnAckMessage connect() throws InterruptedException { MqttConnectMessage connectMessage = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_3_1_1) .clientId("").keepAlive(2) // secs .willFlag(false).willQoS(MqttQoS.AT_MOST_ONCE).build(); - doConnect(connectMessage); + return doConnect(connectMessage); } - public MqttConnAckMessage connectV5() { + public MqttConnAckMessage connectV5() throws InterruptedException { return connectV5(2, BrokerConstants.INFLIGHT_WINDOW_SIZE); } - public MqttConnAckMessage connectV5WithReceiveMaximum(int receiveMaximumInflight) { + public MqttConnAckMessage connectV5WithReceiveMaximum(int receiveMaximumInflight) throws InterruptedException { return connectV5(2, receiveMaximumInflight); } @NotNull - public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInflight) { + public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInflight) throws InterruptedException { final MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_5); if (clientId != null) { builder.clientId(clientId); @@ -169,31 +188,14 @@ public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInfligh return doConnect(connectMessage); } - private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) { - final CountDownLatch latch = new CountDownLatch(1); - this.setCallback(msg -> { - receivedMsg.getAndSet(msg); - LOG.info("Connect callback invocation, received message {}", msg.fixedHeader().messageType()); - latch.countDown(); - - // clear the callback - setCallback(null); - }); + private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) throws InterruptedException { this.sendMessage(connectMessage); - boolean waitElapsed; - try { - waitElapsed = !latch.await(2_000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting", e); - } - - if (waitElapsed) { + final MqttMessage connAckMessage = this.receiveNextMessage(Duration.ofMillis(2_000)); + if (connAckMessage == null) { throw new RuntimeException("Cannot receive ConnAck in 2 s"); } - - final MqttMessage connAckMessage = this.receivedMsg.get(); if (!(connAckMessage instanceof MqttConnAckMessage)) { MqttMessageType messageType = connAckMessage.fixedHeader().messageType(); throw new RuntimeException("Expected a CONN_ACK message but received " + messageType); @@ -201,32 +203,32 @@ private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) { return (MqttConnAckMessage) connAckMessage; } - public MqttSubAckMessage subscribe(String topic1, MqttQoS qos1, String topic2, MqttQoS qos2) { + public MqttSubAckMessage subscribe(String topic1, MqttQoS qos1, String topic2, MqttQoS qos2) throws InterruptedException { final MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe() .messageId(1) .addSubscription(qos1, topic1) .addSubscription(qos2, topic2) .build(); - return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); + return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION); } - public MqttSubAckMessage subscribe(String topic, MqttQoS qos) { + public MqttSubAckMessage subscribe(String topic, MqttQoS qos) throws InterruptedException { final MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe() .messageId(1) .addSubscription(qos, topic) .build(); - return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); + return doSubscribeWithAckCasting(subscribeMessage, TIMEOUT_DURATION); } - public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier) { - return subscribeWithIdentifier(topic, qos, subscriptionIdentifier, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); + public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier) throws InterruptedException { + return subscribeWithIdentifier(topic, qos, subscriptionIdentifier, TIMEOUT_DURATION); } @NotNull public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier, - long timeout, TimeUnit timeUnit) { + Duration timeout) throws InterruptedException { MqttProperties subProps = new MqttProperties(); subProps.add(new MqttProperties.IntegerProperty( MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(), @@ -238,14 +240,14 @@ public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int .properties(subProps) .build(); - return doSubscribeWithAckCasting(subscribeMessage, timeout, timeUnit); + return doSubscribeWithAckCasting(subscribeMessage, timeout); } @NotNull - private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscribeMessage, long timeout, TimeUnit timeUnit) { - doSubscribe(subscribeMessage, timeout, timeUnit); + private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscribeMessage, Duration timeout) throws InterruptedException { + doSubscribe(subscribeMessage); - final MqttMessage subAckMessage = this.receivedMsg.get(); + final MqttMessage subAckMessage = this.receiveNextMessage(timeout); if (!(subAckMessage instanceof MqttSubAckMessage)) { MqttMessageType messageType = subAckMessage.fixedHeader().messageType(); throw new RuntimeException("Expected a SUB_ACK message but received " + messageType); @@ -253,58 +255,16 @@ private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscri return (MqttSubAckMessage) subAckMessage; } - private void doSubscribe(MqttSubscribeMessage subscribeMessage, long timeout, TimeUnit timeUnit) { - final CountDownLatch subscribeAckLatch = new CountDownLatch(1); - this.setCallback(msg -> { - receivedMsg.getAndSet(msg); - LOG.debug("Subscribe callback invocation, received message {}", msg.fixedHeader().messageType()); - subscribeAckLatch.countDown(); - - // clear the callback - setCallback(null); - }); - + private void doSubscribe(MqttSubscribeMessage subscribeMessage) { LOG.debug("Sending SUBSCRIBE message"); sendMessage(subscribeMessage); LOG.debug("Sent SUBSCRIBE message"); - - boolean waitElapsed; - try { - waitElapsed = !subscribeAckLatch.await(timeout, timeUnit); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting", e); - } - - if (waitElapsed) { - throw new RuntimeException("Cannot receive SubscribeAck in " + timeout + " " + timeUnit); - } } - public void publish(MqttPublishMessage publishMessage, int timeout, TimeUnit timeUnit) { - final CountDownLatch publishResponseLatch = new CountDownLatch(1); - this.setCallback(msg -> { - receivedMsg.getAndSet(msg); - LOG.debug("Publish callback invocation, received message {}", msg.fixedHeader().messageType()); - publishResponseLatch.countDown(); - - // clear the callback - setCallback(null); - }); - + public void publish(MqttPublishMessage publishMessage) { LOG.debug("Sending PUBLISH message"); sendMessage(publishMessage); LOG.debug("Sent PUBLISH message"); - - boolean notExpired; - try { - notExpired = publishResponseLatch.await(timeout, timeUnit); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting", e); - } - - if (! notExpired) { - throw new RuntimeException("Cannot receive any message after PUBLISH in " + timeout + " " + timeUnit); - } } public MqttMessage subscribeWithError(String topic, MqttQoS qos) { @@ -313,8 +273,16 @@ public MqttMessage subscribeWithError(String topic, MqttQoS qos) { .addSubscription(qos, topic) .build(); - doSubscribe(subscribeMessage, TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); - return this.receivedMsg.get(); + doSubscribe(subscribeMessage); + try { + MqttMessage mqttMessage = this.receiveNextMessage(TIMEOUT_DURATION); + if (mqttMessage == null) { + throw new RuntimeException("Cannot receive SubscribeAck in " + TIMEOUT_DURATION); + } + return mqttMessage; + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting", e); + } } public void disconnect() { @@ -326,10 +294,6 @@ public void shutdownConnection() throws InterruptedException { this.workerGroup.shutdownGracefully().sync(); } - public void setCallback(ICallback callback) { - this.callback = callback; - } - public void sendMessage(MqttMessage msg) { m_channel.writeAndFlush(msg).addListener(FIRE_EXCEPTION_ON_FAILURE); } @@ -339,12 +303,8 @@ public MqttMessage lastReceivedMessage() { } void messageReceived(MqttMessage msg) { - LOG.info("Received message {}", msg); - if (this.callback != null) { - this.callback.call(msg); - } else { - receivedMessages.add(msg); - } + LOG.debug("Received message {}", msg); + receivedMessages.add(msg); } public boolean hasReceivedMessages() {