Skip to content

Commit

Permalink
Fix subscription identifier out of bounds of mqtt5 specification
Browse files Browse the repository at this point in the history
Signed-off-by: Gary Tse <[email protected]>
  • Loading branch information
garytse committed Feb 12, 2025
1 parent bd291f1 commit 848559c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.eclipse.paho.mqttv5.client.internal;

import org.junit.Test;

import static org.junit.Assert.*;

public class MqttSessionStateTest {

@Test
public void testClearSessionState() {
MqttSessionState state = new MqttSessionState();
state.clearSessionState();
assertTrue("Clear session state resets subscription identifier", 1 == state.getNextSubscriptionIdentifier());
}

/**
* Test that the subscription identifier is bounded between 1 and 268,435,455
*/
@Test
public void testSubscriptionIdIsBounded() {
MqttSessionState state = new MqttSessionState();
for (int i = 1; i <= 268_435_456; i++) {
assertTrue("Subscription identifier minimum bound", state.getNextSubscriptionIdentifier()>=1);
assertTrue("Subscription identifier maximum bound", state.getNextSubscriptionIdentifier()<=268_435_455);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,34 @@
* <li>Next Subscription Identifier - The next subscription Identifier available
* to use.</li>
* </ul>
*
* Subscription identifier can take values from 1 to 268,435,455 according to MQTTv5 specification
* @see <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117">MQTTv5 Specification 3.8.2.1.2 Subscription Identifier</a>
*/
public class MqttSessionState {

// ******* Session Specific Properties and counters ******//
private AtomicInteger nextSubscriptionIdentifier = new AtomicInteger(1);
private String clientId;
private Integer SUBSCRIPTION_IDENTIFIER_MAX_LIMIT = 268_435_455;

public void clearSessionState() {
nextSubscriptionIdentifier.set(1);
}

public Integer getNextSubscriptionIdentifier() {
Integer nextValue = nextSubscriptionIdentifier.getAndIncrement();
if (nextValue <= SUBSCRIPTION_IDENTIFIER_MAX_LIMIT) {
return nextValue;
}

// nextValue > SUBSCRIPTION_IDENTIFIER_MAX_LIMIT, so we need to restart the identifier from 1
synchronized(nextSubscriptionIdentifier) {
// read again to make sure no other thread has updated the value
if (nextSubscriptionIdentifier.get() > SUBSCRIPTION_IDENTIFIER_MAX_LIMIT) {
clearSessionState();
}
}
return nextSubscriptionIdentifier.getAndIncrement();
}

Expand Down

0 comments on commit 848559c

Please sign in to comment.