Skip to content

Commit

Permalink
Removed use of updating LWT dynamically
Browse files Browse the repository at this point in the history
  • Loading branch information
amitjoy committed Jul 16, 2024
1 parent dab924b commit 024a403
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

import com.hivemq.client.internal.mqtt.message.publish.MqttWillPublish;
import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.datatypes.MqttUtf8String;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
Expand Down Expand Up @@ -259,7 +257,6 @@ public final class MessageClientProvider {
private BundleContext bundleContext;

public volatile Config config;
private Mqtt5ClientBuilder clientBuilder;
private ServiceRegistration<Object> readyServiceReg;

@Activate
Expand All @@ -283,34 +280,8 @@ public synchronized Config config() {
return config;
}

public void updateLWT(final MqttWillPublish lastWillMessage) {
// disconnect but keep the previous session alive for 10 minutes before reconnection
// previous session is stored to not remove any previous subscriptions
client.disconnectWith()
.reasonCode(NORMAL_DISCONNECTION)
.reasonString("Updated Last will and Testament (LWT) dynamically using publish request message")
.noSessionExpiry()
.send()
.thenAccept(v -> {
initLastWill(lastWillMessage);
connect();
});
}

private void init(final Config config) {
this.config = config;
final String clientId = getClientID(bundleContext);

clientBuilder = Mqtt5Client.builder()
.identifier(MqttClientIdentifier.of(clientId))
.serverHost(config.server())
.serverPort(config.port());

// last will can be configured in two different ways =>
// 1. using initial configuration
// 2. client can send a special publish request which will be used as will message
// In case of the second scenario, a reconnection happens
initLastWill(null);
try {
connect();
} catch (final Exception e) {
Expand Down Expand Up @@ -346,7 +317,13 @@ private void disconnect(final boolean isNormalDisconnection) {
}

private void connect() {
final String clientId = getClientID(bundleContext);
final Mqtt5ClientBuilder clientBuilder = Mqtt5Client.builder()
.identifier(MqttClientIdentifier.of(clientId))
.serverHost(config.server())
.serverPort(config.port());
final Nested<? extends Mqtt5ClientBuilder> advancedConfig = clientBuilder.advancedConfig();
initLastWill(clientBuilder);

logger.debug(
"Adding highest priority connection listeners for (de)/registering MQTT connection ready OSGi service");
Expand Down Expand Up @@ -572,29 +549,20 @@ private void connect() {
});
}

private void initLastWill(final MqttWillPublish publish) {
private void initLastWill(Mqtt5ClientBuilder clientBuilder) {
String topic = null;
MqttQos qos = null;
byte[] payload = null;
String contentType = null;
long messageExpiryInterval = 0;
long delayInterval = 0;

if (publish == null) {
topic = config.lastWillTopic();
qos = MqttQos.fromCode(config.lastWillQoS());
payload = config.lastWillPayLoad().getBytes();
contentType = config.lastWillContentType();
messageExpiryInterval = config.lastWillMessageExpiryInterval();
delayInterval = config.lastWillDelayInterval();
} else {
topic = publish.getTopic().toString();
qos = publish.getQos();
payload = publish.getPayloadAsBytes();
contentType = publish.getContentType().map(MqttUtf8String::toString).orElse(null);
messageExpiryInterval = publish.getRawMessageExpiryInterval();
delayInterval = publish.getDelayInterval();
}
topic = config.lastWillTopic();
qos = MqttQos.fromCode(config.lastWillQoS());
payload = config.lastWillPayLoad().getBytes();
contentType = config.lastWillContentType();
messageExpiryInterval = config.lastWillMessageExpiryInterval();
delayInterval = config.lastWillDelayInterval();

if (!topic.isEmpty()) {
logger.debug("Applying Last Will and Testament Configuration");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.MESSAGING_ID;
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.MESSAGING_PROTOCOL;
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.ConfigurationPid.PUBLISHER;
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.LAST_WILL_DELAY_INTERVAL;
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.MESSAGE_EXPIRY_INTERVAL;
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.RETAIN;
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.Extension.USER_PROPERTIES;
Expand All @@ -34,7 +33,6 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.osgi.service.messaging.Features.EXTENSION_GUARANTEED_DELIVERY;
import static org.osgi.service.messaging.Features.EXTENSION_GUARANTEED_ORDERING;
import static org.osgi.service.messaging.Features.EXTENSION_LAST_WILL;
import static org.osgi.service.messaging.Features.EXTENSION_QOS;

import java.nio.ByteBuffer;
Expand All @@ -54,7 +52,6 @@
import org.osgi.service.messaging.propertytypes.MessagingFeature;
import org.osgi.util.converter.TypeReference;

import com.hivemq.client.internal.mqtt.message.publish.MqttWillPublish;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
Expand All @@ -63,8 +60,6 @@
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder.Send.Complete;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;

import in.bytehue.messaging.mqtt5.provider.helper.MessageHelper;

//@formatter:off
@MessagingFeature(
name = MESSAGING_ID,
Expand Down Expand Up @@ -156,9 +151,6 @@ private void publish(final Message message, MessageContext context, String chann

final String contentEncoding = context.getContentEncoding();

final Object lastWillDelay = extensions.getOrDefault(LAST_WILL_DELAY_INTERVAL, 0L);
final long lastWillDelayInterval = adaptTo(lastWillDelay, long.class, converter);

Mqtt5PayloadFormatIndicator payloadFormat = null;
if ("UTF-8".equalsIgnoreCase(contentEncoding)) {
payloadFormat = UTF_8;
Expand Down Expand Up @@ -193,26 +185,6 @@ private void publish(final Message message, MessageContext context, String chann
publishRequest.messageExpiryInterval(messageExpiryInterval);
}

// check if it is a LWT publish request
final boolean isLwtPublishReq = extensions.containsKey(EXTENSION_LAST_WILL);
if (isLwtPublishReq) {
final MqttWillPublish will =
MessageHelper.toLWT(
channel,
content,
qos,
retain,
messageExpiryInterval,
contentEncoding,
contentType,
replyToChannel,
correlationId,
userProperties,
lastWillDelayInterval);
messagingClient.updateLWT(will);
logger.info("New publish request to udpate LWT has been sent successfully - '{}'", will);
return;
}
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
publishRequest.send()
.whenComplete((result, throwable) -> {
Expand Down

0 comments on commit 024a403

Please sign in to comment.