From 024a4037ee66be6650c9b8eae6e9b1b9de524a9d Mon Sep 17 00:00:00 2001 From: Amit Kumar Mondal Date: Tue, 16 Jul 2024 11:17:27 +0200 Subject: [PATCH] Removed use of updating LWT dynamically --- .../mqtt5/provider/MessageClientProvider.java | 58 +++++-------------- .../provider/MessagePublisherProvider.java | 28 --------- 2 files changed, 13 insertions(+), 73 deletions(-) diff --git a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java index 97cc420..65f0021 100644 --- a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java +++ b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java @@ -54,10 +54,8 @@ import org.osgi.service.metatype.annotations.Designate; import org.osgi.service.metatype.annotations.ObjectClassDefinition; -import com.hivemq.client.internal.mqtt.message.publish.MqttWillPublish; import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier; import com.hivemq.client.mqtt.datatypes.MqttQos; -import com.hivemq.client.mqtt.datatypes.MqttUtf8String; import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext; import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener; import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext; @@ -259,7 +257,6 @@ public final class MessageClientProvider { private BundleContext bundleContext; public volatile Config config; - private Mqtt5ClientBuilder clientBuilder; private ServiceRegistration readyServiceReg; @Activate @@ -283,34 +280,8 @@ public synchronized Config config() { return config; } - public void updateLWT(final MqttWillPublish lastWillMessage) { - // disconnect but keep the previous session alive for 10 minutes before reconnection - // previous session is stored to not remove any previous subscriptions - client.disconnectWith() - .reasonCode(NORMAL_DISCONNECTION) - .reasonString("Updated Last will and Testament (LWT) dynamically using publish request message") - .noSessionExpiry() - .send() - .thenAccept(v -> { - initLastWill(lastWillMessage); - connect(); - }); - } - private void init(final Config config) { this.config = config; - final String clientId = getClientID(bundleContext); - - clientBuilder = Mqtt5Client.builder() - .identifier(MqttClientIdentifier.of(clientId)) - .serverHost(config.server()) - .serverPort(config.port()); - - // last will can be configured in two different ways => - // 1. using initial configuration - // 2. client can send a special publish request which will be used as will message - // In case of the second scenario, a reconnection happens - initLastWill(null); try { connect(); } catch (final Exception e) { @@ -346,7 +317,13 @@ private void disconnect(final boolean isNormalDisconnection) { } private void connect() { + final String clientId = getClientID(bundleContext); + final Mqtt5ClientBuilder clientBuilder = Mqtt5Client.builder() + .identifier(MqttClientIdentifier.of(clientId)) + .serverHost(config.server()) + .serverPort(config.port()); final Nested advancedConfig = clientBuilder.advancedConfig(); + initLastWill(clientBuilder); logger.debug( "Adding highest priority connection listeners for (de)/registering MQTT connection ready OSGi service"); @@ -572,7 +549,7 @@ private void connect() { }); } - private void initLastWill(final MqttWillPublish publish) { + private void initLastWill(Mqtt5ClientBuilder clientBuilder) { String topic = null; MqttQos qos = null; byte[] payload = null; @@ -580,21 +557,12 @@ private void initLastWill(final MqttWillPublish publish) { long messageExpiryInterval = 0; long delayInterval = 0; - if (publish == null) { - topic = config.lastWillTopic(); - qos = MqttQos.fromCode(config.lastWillQoS()); - payload = config.lastWillPayLoad().getBytes(); - contentType = config.lastWillContentType(); - messageExpiryInterval = config.lastWillMessageExpiryInterval(); - delayInterval = config.lastWillDelayInterval(); - } else { - topic = publish.getTopic().toString(); - qos = publish.getQos(); - payload = publish.getPayloadAsBytes(); - contentType = publish.getContentType().map(MqttUtf8String::toString).orElse(null); - messageExpiryInterval = publish.getRawMessageExpiryInterval(); - delayInterval = publish.getDelayInterval(); - } + topic = config.lastWillTopic(); + qos = MqttQos.fromCode(config.lastWillQoS()); + payload = config.lastWillPayLoad().getBytes(); + contentType = config.lastWillContentType(); + messageExpiryInterval = config.lastWillMessageExpiryInterval(); + delayInterval = config.lastWillDelayInterval(); if (!topic.isEmpty()) { logger.debug("Applying Last Will and Testament Configuration"); diff --git a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessagePublisherProvider.java b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessagePublisherProvider.java index c9d93d3..cdbc892 100644 --- a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessagePublisherProvider.java +++ b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessagePublisherProvider.java @@ -21,7 +21,6 @@ import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.MESSAGING_ID; import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.MESSAGING_PROTOCOL; import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.ConfigurationPid.PUBLISHER; -import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.LAST_WILL_DELAY_INTERVAL; import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.MESSAGE_EXPIRY_INTERVAL; import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.RETAIN; import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.USER_PROPERTIES; @@ -34,7 +33,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.osgi.service.messaging.Features.EXTENSION_GUARANTEED_DELIVERY; import static org.osgi.service.messaging.Features.EXTENSION_GUARANTEED_ORDERING; -import static org.osgi.service.messaging.Features.EXTENSION_LAST_WILL; import static org.osgi.service.messaging.Features.EXTENSION_QOS; import java.nio.ByteBuffer; @@ -54,7 +52,6 @@ import org.osgi.service.messaging.propertytypes.MessagingFeature; import org.osgi.util.converter.TypeReference; -import com.hivemq.client.internal.mqtt.message.publish.MqttWillPublish; import com.hivemq.client.mqtt.MqttClientState; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; @@ -63,8 +60,6 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder.Send.Complete; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import in.bytehue.messaging.mqtt5.provider.helper.MessageHelper; - //@formatter:off @MessagingFeature( name = MESSAGING_ID, @@ -156,9 +151,6 @@ private void publish(final Message message, MessageContext context, String chann final String contentEncoding = context.getContentEncoding(); - final Object lastWillDelay = extensions.getOrDefault(LAST_WILL_DELAY_INTERVAL, 0L); - final long lastWillDelayInterval = adaptTo(lastWillDelay, long.class, converter); - Mqtt5PayloadFormatIndicator payloadFormat = null; if ("UTF-8".equalsIgnoreCase(contentEncoding)) { payloadFormat = UTF_8; @@ -193,26 +185,6 @@ private void publish(final Message message, MessageContext context, String chann publishRequest.messageExpiryInterval(messageExpiryInterval); } - // check if it is a LWT publish request - final boolean isLwtPublishReq = extensions.containsKey(EXTENSION_LAST_WILL); - if (isLwtPublishReq) { - final MqttWillPublish will = - MessageHelper.toLWT( - channel, - content, - qos, - retain, - messageExpiryInterval, - contentEncoding, - contentType, - replyToChannel, - correlationId, - userProperties, - lastWillDelayInterval); - messagingClient.updateLWT(will); - logger.info("New publish request to udpate LWT has been sent successfully - '{}'", will); - return; - } final CompletableFuture resultFuture = new CompletableFuture<>(); publishRequest.send() .whenComplete((result, throwable) -> {