diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 29e9e23ee83..25d086e343c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -268,6 +268,16 @@ private void doReschedule(final String jobId, long executionTime, long nextExecu this.store.store(update); } + private void doSchedule(final List toSchedule) { + for (Closure closure : toSchedule) { + try { + closure.run(); + } catch (final Exception e) { + LOG.warn("Failed to schedule job", e); + } + } + } + private void doRemove(final List toRemove) throws IOException { for (Closure closure : toRemove) { closure.run(); @@ -727,6 +737,7 @@ protected void mainLoop() { // needed before firing the job event. List toRemove = new ArrayList<>(); List toReschedule = new ArrayList<>(); + List toSchedule = new ArrayList<>(); try { this.store.readLockIndex(); @@ -776,12 +787,18 @@ protected void mainLoop() { // we have a separate schedule to run at this time // so the cron job is used to set of a separate schedule // hence we won't fire the original cron job to the - // listeners but we do need to start a separate schedule - String jobId = ID_GENERATOR.generateId(); - ByteSequence payload = getPayload(job.getLocation()); - schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); - waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod(); - this.scheduleTime.setWaitTime(waitTime); + // listeners, but we do need to start a separate schedule + toSchedule.add(() -> { + try { + String jobId = ID_GENERATOR.generateId(); + ByteSequence payload = getPayload(job.getLocation()); + schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); + } catch (Exception e) { + LOG.warn("Failed to schedule cron follow-up job", e); + } + }); + long wait = job.getDelay() != 0 ? job.getDelay() : job.getPeriod(); + this.scheduleTime.setWaitTime(wait); } } else { toRemove.add(() -> doRemove(executionTime, job.getJobId())); @@ -797,6 +814,10 @@ protected void mainLoop() { } finally { this.store.readUnlockIndex(); + // deferred execution of all jobs to be scheduled to avoid deadlock with indexLock + doSchedule(toSchedule); + + // now reschedule all jobs that need rescheduling doReschedule(toReschedule); // now remove all jobs that have not been rescheduled, @@ -805,6 +826,7 @@ protected void mainLoop() { } this.scheduleTime.pause(); + } catch (Exception ioe) { LOG.error("{} Failed to schedule job", this.name, ioe); try { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java index 773713d7d5f..4e1a1ce5c5c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java @@ -16,14 +16,6 @@ */ package org.apache.activemq.broker.scheduler; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import jakarta.jms.Connection; import jakarta.jms.JMSException; import jakarta.jms.Message; @@ -32,7 +24,6 @@ import jakarta.jms.MessageProducer; import jakarta.jms.Session; import jakarta.jms.TextMessage; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ScheduledMessage; import org.apache.activemq.store.kahadb.disk.journal.Location; @@ -48,6 +39,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + public class JmsSchedulerTest extends JobSchedulerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(JmsSchedulerTest.class); @@ -230,6 +229,11 @@ public void append(LogEvent event) { numberOfDiscardedJobs.incrementAndGet(); } } + + @Override + public boolean isStarted() { + return true; // false in DefaultTestAppender so Log4j will discard this appender + } }; registerLogAppender(appender);