Skip to content

Commit

Permalink
Replaced with cached thread pool with a max size of 3
Browse files Browse the repository at this point in the history
  • Loading branch information
amitjoy committed Nov 14, 2024
1 parent 3ff3a3b commit a425c42
Showing 1 changed file with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.osgi.framework.BundleContext;
Expand All @@ -63,12 +68,15 @@
import in.bytehue.messaging.mqtt5.provider.helper.FilterParser;
import in.bytehue.messaging.mqtt5.provider.helper.FilterParser.Expression;
import in.bytehue.messaging.mqtt5.provider.helper.SubscriptionAck;
import in.bytehue.messaging.mqtt5.provider.helper.ThreadFactoryBuilder;

@Component(configurationPid = PID)
@MessagingFeature(name = MESSAGING_ID, protocol = MESSAGING_PROTOCOL)
public final class MessageReplyToWhiteboardProvider {

public static final String PID = "in.bytehue.messaging.whiteboard";
public static final String THREAD_NAME_PREFIX = "reply-to-handler";
public static final String THREAD_NAME_SUFFIX = "-%d";

@interface Config {
boolean storeReplyToChannelInfoIfReceivedInMessage() default true;
Expand All @@ -93,6 +101,7 @@ public final class MessageReplyToWhiteboardProvider {
private ComponentServiceObjects<MessageContextBuilderProvider> mcbFactory;

private Config config;
private ExecutorService executorService;
private final List<ReplyToSubDTO> subscriptions = new CopyOnWriteArrayList<>();

private ServiceTracker<ReplyToSingleSubscriptionHandler, ReplyToSingleSubscriptionHandler> tracker1;
Expand All @@ -102,6 +111,21 @@ public final class MessageReplyToWhiteboardProvider {
@Activate
void activate(final Config config, final BundleContext context) {
this.config = config;
final ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setThreadFactoryName(THREAD_NAME_PREFIX)
.setThreadNameFormat(THREAD_NAME_SUFFIX)
.setDaemon(true)
.build();
executorService = new ThreadPoolExecutor(
0, // core pool size (0 for cached behavior)
3, // maximum pool size
60L, // thread idle time before termination
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactory
);

subscriptions.stream().filter(sub -> !sub.isProcessed()).forEach(sub -> {
switch (sub.type) {
case REPLY_TO_SUB:
Expand All @@ -125,7 +149,7 @@ public synchronized ReplyToSingleSubscriptionHandler addingService(
final ReplyToSubDTO sub = new ReplyToSubDTO(handler, REPLY_TO_SINGLE_SUB, reference);
subscriptions.add(sub);

new Thread(() -> processReplyToSingleSubscriptionHandler(sub)).start();
executorService.submit(() -> processReplyToSingleSubscriptionHandler(sub));
return handler;
}

Expand All @@ -152,7 +176,7 @@ public synchronized ReplyToSubscriptionHandler addingService(
final ReplyToSubDTO sub = new ReplyToSubDTO(handler, REPLY_TO_SUB, reference);
subscriptions.add(sub);

new Thread(() -> processReplyToSubscriptionHandler(sub)).start();
executorService.submit(() -> processReplyToSubscriptionHandler(sub));
return handler;
}

Expand All @@ -179,7 +203,7 @@ public synchronized ReplyToManySubscriptionHandler addingService(
final ReplyToSubDTO sub = new ReplyToSubDTO(handler, REPLY_TO_MANY_SUB, reference);
subscriptions.add(sub);

new Thread(() -> processReplyToManySubscriptionHandler(sub)).start();
executorService.submit(() -> processReplyToManySubscriptionHandler(sub));
return handler;
}

Expand Down Expand Up @@ -210,6 +234,8 @@ void deactivate() {
tracker1.close();
tracker2.close();
tracker3.close();

executorService.shutdownNow();
}

@Modified
Expand Down

0 comments on commit a425c42

Please sign in to comment.