Skip to content

Commit

Permalink
Don't ignore exceptions from tasks
Browse files Browse the repository at this point in the history
Closes #847

Signed-off-by: Alexander Schwartz <[email protected]>
  • Loading branch information
ahus1 committed Jun 13, 2024
1 parent b0a408a commit 6255163
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.keycloak.benchmark.dataset;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -40,7 +40,7 @@ public class ExecutorHelper {
private final ExecutorService executor;
private final KeycloakSessionFactory sessionFactory;
private final DatasetConfig config;
private final Queue<Future> futures = new LinkedList<>();
private final Queue<Future<?>> futures = new LinkedList<>();
protected static final Logger logger = Logger.getLogger(ExecutorHelper.class);

public ExecutorHelper(int threadCount, KeycloakSessionFactory sessionFactory, DatasetConfig config) {
Expand All @@ -50,29 +50,35 @@ public ExecutorHelper(int threadCount, KeycloakSessionFactory sessionFactory, Da
}

public void addTask(Runnable task) {
Future f = executor.submit(task);
Future<?> f = executor.submit(task);
futures.add(f);
}

public void addTaskRunningInTransaction(KeycloakSessionTask sessionTask) {
Future f = executor.submit(() -> KeycloakModelUtils.runJobInTransactionWithTimeout(sessionFactory, sessionTask, config.getTransactionTimeoutInSeconds()));
Future<?> f = executor.submit(() -> KeycloakModelUtils.runJobInTransactionWithTimeout(sessionFactory, sessionTask, config.getTransactionTimeoutInSeconds()));
futures.add(f);
}


public void waitForAllToFinish() throws ExecutionException, InterruptedException {
logger.info("Waiting for tasks to complete");

for (Optional<Future> runningTask = getFirstRunningTask(); runningTask.isPresent(); runningTask = getFirstRunningTask()) {
runningTask.get().get(); // wait for task to complete
public void waitForAllToFinish() {
logger.info("Waiting for tasks to complete successfully");
Collection<Throwable> failures = new ConcurrentLinkedQueue<>();
while (!futures.isEmpty()) {
// some tasks will spawn other tasks, so expect the futures queue to be modified concurrently.
try {
futures.poll().get();
} catch (Throwable e) {
failures.add(e);
}
}
if (!failures.isEmpty()) {
RuntimeException ex = new RuntimeException("Some futures failed");
failures.forEach(ex::addSuppressed);
throw ex;
}
logger.info("All tasks finished");
}

private Optional<Future> getFirstRunningTask() {
return futures.stream().filter(f -> !f.isDone()).findFirst();
}

public void shutDown() {
executor.shutdown();
}
Expand Down

0 comments on commit 6255163

Please sign in to comment.