Skip to content

Commit

Permalink
Added support of custom executor
Browse files Browse the repository at this point in the history
  • Loading branch information
amitjoy committed Sep 10, 2024
1 parent 024a403 commit 63bfe2d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.MQTT_CONNECTION_READY_SERVICE_PROPERTY;
import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.ConfigurationPid.CLIENT;
import static in.bytehue.messaging.mqtt5.provider.helper.MessageHelper.getOptionalService;
import static in.bytehue.messaging.mqtt5.provider.helper.MessageHelper.getOptionalServiceWithoutType;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.osgi.service.condition.Condition.CONDITION_ID;
import static org.osgi.service.condition.Condition.CONDITION_ID_TRUE;
Expand All @@ -32,6 +33,11 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

import javax.net.ssl.HostnameVerifier;
Expand All @@ -54,6 +60,7 @@
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

import com.hivemq.client.internal.netty.NettyEventLoopProvider;
import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
Expand All @@ -75,6 +82,7 @@
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;

import in.bytehue.messaging.mqtt5.provider.MessageClientProvider.Config;
import in.bytehue.messaging.mqtt5.provider.helper.ThreadFactoryBuilder;

@ProvideMessagingFeature
@Designate(ocd = Config.class)
Expand Down Expand Up @@ -143,6 +151,27 @@ public final class MessageClientProvider {
@AttributeDefinition(name = "Simple Authentication Password", type = PASSWORD)
String password() default "";

@AttributeDefinition(name = "Custom Executor Configuration")
boolean useCustomExecutor() default false;

@AttributeDefinition(name = "Custom Executor Number of Threads")
int numberOfThreads() default 5;

@AttributeDefinition(name = "Custom Executor Prefix of the thread name")
String threadNamePrefix() default "osgi-mqtt";

@AttributeDefinition(name = "Custom Executor Suffix of the thread name (supports only {@code %d} format specifier)")
String threadNameSuffix() default "-%d";

@AttributeDefinition(name = "Flag to set if the threads will be daemon threads")
boolean isDaemon() default true;

@AttributeDefinition(name = "Custom Thread Executor Service Class Name (Note that, the service should be an instance of Java Executor)")
String executorTargetClass() default "";

@AttributeDefinition(name = "Custom Thread Executor Service Target Filter")
String executorTargetFilter() default "";

@AttributeDefinition(name = "SSL Configuration")
boolean useSSL() default false;

Expand Down Expand Up @@ -257,6 +286,7 @@ public final class MessageClientProvider {
private BundleContext bundleContext;

public volatile Config config;
private ScheduledExecutorService customExecutor;
private ServiceRegistration<Object> readyServiceReg;

@Activate
Expand All @@ -281,6 +311,7 @@ public synchronized Config config() {
}

private void init(final Config config) {
logger.info("Performing connection");
this.config = config;
try {
connect();
Expand All @@ -290,7 +321,7 @@ private void init(final Config config) {
}

private void disconnect(final boolean isNormalDisconnection) {
logger.info("Performing disconection");
logger.info("Performing disconnection");
Mqtt5DisconnectReasonCode reasonCode;
String reasonDescription;
if (isNormalDisconnection) {
Expand All @@ -314,6 +345,12 @@ private void disconnect(final boolean isNormalDisconnection) {
disconnectParams.noSessionExpiry();
}
disconnectParams.send();
// shutdown the custom executor if used
if (customExecutor != null) {
customExecutor.shutdownNow();
NettyEventLoopProvider.INSTANCE.releaseEventLoop(customExecutor);
customExecutor = null;
}
}

private void connect() {
Expand Down Expand Up @@ -424,6 +461,36 @@ private void connect() {
.orElse(null))
.applySslConfig();
}
if (config.useCustomExecutor()) {
logger.debug("Applying Custom Executor Configuration");
final String clazz = config.executorTargetClass().trim();
if (clazz.isEmpty()) {
logger.debug("Applying Executor as Non-Service Configuration");
final ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setThreadFactoryName(config.threadNamePrefix())
.setThreadNameFormat(config.threadNameSuffix())
.setDaemon(config.isDaemon())
.build();
customExecutor = Executors.newScheduledThreadPool(config.numberOfThreads(), threadFactory);
((ScheduledThreadPoolExecutor) customExecutor).setRemoveOnCancelPolicy(true);
clientBuilder.executorConfig()
.nettyExecutor(customExecutor)
.applyExecutorConfig();
} else {
logger.debug("Applying Executor as Service Configuration");
String filter = config.executorTargetFilter().trim();
Optional<Object> service =
getOptionalServiceWithoutType(
clazz,
filter,
bundleContext,
logger);
service.ifPresent(executor -> clientBuilder.executorConfig()
.nettyExecutor((Executor) executor)
.applyExecutorConfig());
}
}
if (config.useEnhancedAuthentication()) {
logger.debug("Applying Enhanced Authentication Configuration");
clientBuilder.enhancedAuth(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -80,6 +81,42 @@ public final class MessageHelper {
private MessageHelper() {
throw new IllegalAccessError("Non-instantiable");
}

public static Object getServiceWithoutType(final String clazz, final String filter, final BundleContext context) {
try {
ServiceReference<?>[] references = context.getServiceReferences(clazz, filter);
if (references == null || references.length == 0) {
throw new RuntimeException("'" + clazz + "' service instance cannot be found");
}

final ToLongFunction<ServiceReference<?>> srFunc =
sr -> Optional.ofNullable(((ServiceReference<?>) sr).getProperty(SERVICE_RANKING))
.filter(Number.class::isInstance)
.map(Number.class::cast)
.map(Number::longValue)
.orElse(0L);

return Arrays.stream(references)
.max(comparingLong(srFunc)) // Finds the reference with the highest ranking
.map(context::getService)
.orElseThrow(() -> new RuntimeException("'" + clazz + "' service instance cannot be found"));
} catch (Exception e) {
throw new RuntimeException("Service '" + clazz + "' cannot be retrieved", e);
}
}

public static Optional<Object> getOptionalServiceWithoutType(final String clazz, String filter, final BundleContext context, final Logger logger) {
try {
if (filter.trim().isEmpty()) {
filter = null;
}
final Object service = getServiceWithoutType(clazz, filter, context);
return Optional.ofNullable(service);
} catch (final Exception e) {
logger.warn("Service '{}' cannot be retrieved", clazz);
return Optional.empty();
}
}

public static <T> T getService(final Class<T> clazz, final String filter, final BundleContext context) {
try {
Expand Down

0 comments on commit 63bfe2d

Please sign in to comment.