Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2986120
AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT
jeanouii Nov 17, 2025
199b69b
AMQ-9808: Consistency and better exception handling
jeanouii Nov 18, 2025
48bb204
AMQ-9809: org.apache.activemq.perf.InactiveDurableTopicTest hanging
jeanouii Nov 18, 2025
00cbd87
AMQ-8525: attempt to improve build time
jeanouii Nov 18, 2025
19c3fa8
AMQ-8525: add missing annotation to previous commit
jeanouii Nov 18, 2025
f0339ed
AMQ-8525: activate the parallel profile by default because CI does no…
jeanouii Nov 18, 2025
9f435a4
AMQ-8525: Small adjustment of parallelism
jeanouii Nov 18, 2025
827007d
AMQ-8525: This one needs some tuning so timeouts don't popup during p…
jeanouii Nov 18, 2025
a09e5b7
AMQ-8525: Improve execution time for CI
jeanouii Nov 18, 2025
db10cab
AMQ-8525: improve stomp module performances
jeanouii Nov 18, 2025
f58b250
AMQ-8525: improve mqtt module performances
jeanouii Nov 18, 2025
03f1174
AMQ-8525: fix parallel execution of MQTT tests
jeanouii Nov 19, 2025
17c1537
AMQ-8525: fix parallel execution of unit tests
jeanouii Nov 19, 2025
f6bbb1e
AMQ-8525: adding a bit more to parallel execution
jeanouii Nov 19, 2025
01c2718
Merge branch 'fix/AMQ-8525_mqtt-module' into fix/AMQ-8525_improve-bui…
jeanouii Nov 20, 2025
99005e7
Merge branch 'fix/AMQ-8525_stomp-module' into fix/AMQ-8525_improve-bu…
jeanouii Nov 20, 2025
ac26b7b
Merge branch 'fix/AMQ-8525_unit-tests-module' into fix/AMQ-8525_impro…
jeanouii Nov 20, 2025
adb55bb
Merge branch 'fix/deadlock-jms-scheduler' into fix/AMQ-8525_improve-b…
jeanouii Nov 20, 2025
2c36355
Merge branch 'fix/InactiveDurableTopicTest-hanging' into fix/AMQ-8525…
jeanouii Nov 20, 2025
d7dd6a4
Merge branch 'fix/TransactedStoreUsageSuspendResumeTest' into fix/AMQ…
jeanouii Nov 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pipeline {
sh 'mvn -version'
// all tests is very very long (10 hours on Apache Jenkins)
// sh 'mvn -B -e test -pl activemq-unit-tests -Dactivemq.tests=all'
sh 'mvn -B -e -fae test -Dsurefire.rerunFailingTestsCount=3'
sh 'mvn -B -e -fae test -Dsurefire.rerunFailingTestsCount=3 -Pall-parallel -Dactivemq.tests=parallel'
}
post {
always {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ private void doReschedule(final String jobId, long executionTime, long nextExecu
this.store.store(update);
}

private void doSchedule(final List<Closure> toSchedule) {
for (Closure closure : toSchedule) {
try {
closure.run();
} catch (final Exception e) {
LOG.warn("Failed to schedule job", e);
}
}
}

private void doRemove(final List<Closure> toRemove) throws IOException {
for (Closure closure : toRemove) {
closure.run();
Expand Down Expand Up @@ -727,6 +737,7 @@ protected void mainLoop() {
// needed before firing the job event.
List<Closure> toRemove = new ArrayList<>();
List<Closure> toReschedule = new ArrayList<>();
List<Closure> toSchedule = new ArrayList<>();
try {
this.store.readLockIndex();

Expand Down Expand Up @@ -776,12 +787,18 @@ protected void mainLoop() {
// we have a separate schedule to run at this time
// so the cron job is used to set of a separate schedule
// hence we won't fire the original cron job to the
// listeners but we do need to start a separate schedule
String jobId = ID_GENERATOR.generateId();
ByteSequence payload = getPayload(job.getLocation());
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
this.scheduleTime.setWaitTime(waitTime);
// listeners, but we do need to start a separate schedule
toSchedule.add(() -> {
try {
String jobId = ID_GENERATOR.generateId();
ByteSequence payload = getPayload(job.getLocation());
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
} catch (Exception e) {
LOG.warn("Failed to schedule cron follow-up job", e);
}
});
long wait = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
this.scheduleTime.setWaitTime(wait);
}
} else {
toRemove.add(() -> doRemove(executionTime, job.getJobId()));
Expand All @@ -797,6 +814,10 @@ protected void mainLoop() {
} finally {
this.store.readUnlockIndex();

// deferred execution of all jobs to be scheduled to avoid deadlock with indexLock
doSchedule(toSchedule);

// now reschedule all jobs that need rescheduling
doReschedule(toReschedule);

// now remove all jobs that have not been rescheduled,
Expand All @@ -805,6 +826,7 @@ protected void mainLoop() {
}

this.scheduleTime.pause();

} catch (Exception ioe) {
LOG.error("{} Failed to schedule job", this.name, ioe);
try {
Expand Down
100 changes: 82 additions & 18 deletions activemq-mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<artifactId>activemq-mqtt</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ :: MQTT Protocol</name>

<properties>
<surefire.version>3.5.3</surefire.version>
</properties>

<description>The ActiveMQ MQTT Protocol Implementation</description>

<dependencies>
Expand Down Expand Up @@ -198,24 +203,6 @@
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>${surefire.argLine}</argLine>
<runOrder>alphabetical</runOrder>
<systemPropertyValues>
<org.apache.activemq.default.directory.prefix>target</org.apache.activemq.default.directory.prefix>
</systemPropertyValues>
<!-- includes>
<include>**/*Test.*</include>
</includes -->
<excludes>
<exclude>**/PahoMQTNioTTest.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
Expand Down Expand Up @@ -293,6 +280,83 @@
</plugins>
</build>
</profile>
<profile>
<id>all-parallel</id>
<activation>
<activeByDefault>true</activeByDefault>
<property>
<name>activemq.tests</name>
<value>parallel</value>
</property>
</activation>
<properties>
<parallel.tests.fork.count>2C</parallel.tests.fork.count>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire.version}</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
<executions>
<execution>
<id>parallel</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration combine.self="override">
<skipTests>false</skipTests>
<forkCount>${parallel.tests.fork.count}</forkCount>
<reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<runOrder>balanced</runOrder>
<failIfNoTests>false</failIfNoTests>
<groups>org.apache.activemq.transport.mqtt.ParallelTest</groups>
<systemPropertyVariables>
<java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/parallel-tests-${surefire.forkNumber}/</org.apache.activemq.default.directory.prefix>
<org.apache.activemq.AutoFailTestSupport.disableSystemExit>true</org.apache.activemq.AutoFailTestSupport.disableSystemExit>
<org.apache.activemq.broker.jmx.createConnector>false</org.apache.activemq.broker.jmx.createConnector>
<!-- when running MQTT tests in parallel in the CI (quite slow) we need to bump the wireformat negotiation timeout (5s by default) -->
<org.apache.activemq.transport.wireFormatNegotiationTimeout>20000</org.apache.activemq.transport.wireFormatNegotiationTimeout>
</systemPropertyVariables>
</configuration>
</execution>
<execution>
<id>serial</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration combine.self="override">
<skipTests>false</skipTests>
<runOrder>balanced</runOrder>
<failIfNoTests>false</failIfNoTests>
<excludedGroups>org.apache.activemq.transport.mqtt.ParallelTest</excludedGroups>
<systemPropertyVariables>
<java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/</org.apache.activemq.default.directory.prefix>
<org.apache.activemq.AutoFailTestSupport.disableSystemExit>true</org.apache.activemq.AutoFailTestSupport.disableSystemExit>
<org.apache.activemq.broker.jmx.createConnector>false</org.apache.activemq.broker.jmx.createConnector>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${surefire.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,16 @@
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests various use cases that require authentication or authorization over MQTT
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTAuthTest extends MQTTAuthTestSupport {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests the functionality of the MQTTCodec class.
*/
@Category(ParallelTest.class)
public class MQTTCodecTest {

private static final Logger LOG = LoggerFactory.getLogger(MQTTCodecTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@
import org.apache.activemq.util.ByteSequence;
import org.junit.Test;

import org.junit.experimental.categories.Category;

/**
*
*/
@Category(ParallelTest.class)
public class MQTTCompositeQueueRetainedTest extends MQTTTestSupport {

// configure composite topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.junit.experimental.categories.Category;
/**
* Test that connection attempts that don't send a CONNECT frame will
* get cleaned up by the inactivity monitor.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTConnectTest extends MQTTTestSupport {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.junit.experimental.categories.Category;
/**
* Test that the maxFrameSize configuration value is applied across the transports.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTMaxFrameSizeTest extends MQTTTestSupport {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.activemq.transport.mqtt;

import org.junit.experimental.categories.Category;

/**
* Run the basic tests with the NIO Transport.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;

import org.junit.experimental.categories.Category;

/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTNIOTest extends MQTTTest {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(ParallelTest.class)
public class MQTTOverlapedSubscriptionsTest {

private BrokerService brokerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.junit.experimental.categories.Category;
/**
* Test to show that a PINGRESP will only be sent for a PINGREQ
* packet after a CONNECT packet has been received.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTPingReqTest extends MQTTTestSupport {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/**
* Tests for various usage scenarios of the protocol converter
*/
@Category(ParallelTest.class)
public class MQTTProtocolConverterTest {

private MQTTTransport transport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;

import org.junit.experimental.categories.Category;

/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTSSLTest extends MQTTTest {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.junit.experimental.categories.Category;
/**
* Test that all previous QoS 2 subscriptions are recovered on Broker restart.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
import org.apache.activemq.util.IOHelper;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
Expand All @@ -53,8 +54,6 @@ public class MQTTTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);

public static final String KAHADB_DIRECTORY = "target/activemq-data/";

protected BrokerService brokerService;
protected int port;
protected String jmsUri = "vm://localhost";
Expand Down Expand Up @@ -143,7 +142,7 @@ protected BrokerService createBroker(boolean deleteAllMessages) throws Exception
brokerService.setPersistent(isPersistent());
if (isPersistent()) {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
kaha.setDirectory(new File(IOHelper.getDefaultDataDirectory() + "/" + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setAdvisorySupport(advisorySupport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.fusesource.mqtt.client.Topic;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,6 +56,7 @@
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionsTest.class);

Expand Down
Loading