Skip to content

Commit

Permalink
Improved job status handling
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 17, 2025
1 parent a0dacfe commit e49aaa4
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
Expand Down Expand Up @@ -78,6 +77,10 @@ private class OrderedThreadPoolExecutor {
workQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(workQueue));
}

public <T, R> void registerCallback(final Class<T> interestType, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestType, callback);
}

@SuppressWarnings("unchecked")
public <T, R> void submit(final Class<T> interestType, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.computeIfAbsent(interestType,
Expand All @@ -87,20 +90,16 @@ public <T, R> void submit(final Class<T> interestType, final R event) {
handleScheduling(interestType, event.getClass());
}

public <T, R> void registerCallback(final Class<T> interestType, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestType, callback);
}

public <T, R> void handleScheduling(final Class<T> interestType, final Class<R> eventType) {
AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestType, k -> new AtomicBoolean(false));
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestType, k -> new ReentrantLock());
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestType, k -> new ReentrantLock(true));

if (!jobStatus.get()) {
jobLock.lock();

try {
if (jobStatus.compareAndSet(false, true)) {
processEventQueue(interestType, eventType);
executor.submit(() -> processEventQueue(interestType, eventType));
}
} finally {
jobLock.unlock();
Expand All @@ -123,23 +122,22 @@ public <T, R> void processEventQueue(final Class<T> interestType, final Class<R>
throw new NullPointerException("ThreadPoolExecutor in unexpected state.");
}

R newEvent = eventQueue.poll();
jobLock.lock();

if (newEvent != null) {
boolean hasJobStatusUpdated = jobStatus.compareAndSet(true, !eventQueue.isEmpty());
eventCallback.runCallback(newEvent);
while (!eventQueue.isEmpty()) {
R newEvent = eventQueue.poll();

if (!hasJobStatusUpdated) {
if (newEvent != null) {
eventCallback.runCallback(newEvent);
processEventQueue(interestType, eventType);
} else {
jobStatus.set(false);
}
} else {
jobStatus.set(false);
}

jobStatus.set(false);
jobLock.unlock();
}

}

private static final EventBroker INSTANCE;
Expand Down Expand Up @@ -237,7 +235,7 @@ public void onComplete() {
@SuppressWarnings("unchecked")
private <T> SubmissionPublisher<T> getPublisher(final Class<T> eventType) {
return (SubmissionPublisher<T>) publishers.computeIfAbsent(eventType,
key -> new SubmissionPublisher<>(Executors.newSingleThreadExecutor(), Flow.defaultBufferSize()));
key -> new SubmissionPublisher<>());
}

private Class<?> createUniqueSubscriberClassToken() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ public interface Subscriber<T> {
default Class<T> getEventType() {
Class<?> currentClass = getClass();
while (currentClass != null) {
Type[] interfaces = currentClass.getGenericInterfaces();
for (Type type : interfaces) {
if (type instanceof ParameterizedType) {
ParameterizedType paramType = (ParameterizedType) type;
if (paramType.getRawType() == Subscriber.class) {
return (Class<T>) paramType.getActualTypeArguments()[0];
for (Type type : currentClass.getGenericInterfaces()) {
if (type instanceof ParameterizedType paramType && paramType.getRawType() == Subscriber.class) {
Type typeArg = paramType.getActualTypeArguments()[0];
if (typeArg instanceof Class<?>) {
return (Class<T>) typeArg;
}
throw new IllegalStateException("Generic type parameter is not a Class");
}
}
currentClass = currentClass.getSuperclass();
Expand Down

0 comments on commit e49aaa4

Please sign in to comment.