Skip to content

Commit 90511de

Browse files
committed
When listening for messages on a specific topic, now comparing based on topic filter including wildcards
1 parent 8e7b63f commit 90511de

File tree

4 files changed

+58
-9
lines changed

4 files changed

+58
-9
lines changed

Sources/MQTTNIO/MQTTClient+Concurrency.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,10 @@ extension MQTTClient {
247247
}
248248

249249
/// An async sequence for iterating over received messages from the broker to a specific topic.
250-
/// - Parameter topic: The topic to receive messages for.
251-
public func messages(forTopic topic: String) -> AsyncFilterSequence<AsyncStream<MQTTMessage>> {
250+
/// - Parameter topicFilter: The topic filter to receive messages for.
251+
public func messages(forTopic topicFilter: String) -> AsyncFilterSequence<AsyncStream<MQTTMessage>> {
252252
return messages.filter {
253-
$0.topic == topic
253+
$0.topic.matchesMqttTopicFilter(topicFilter)
254254
}
255255
}
256256

Sources/MQTTNIO/MQTTClient.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -555,13 +555,13 @@ public class MQTTClient: MQTTConnectionDelegate, MQTTSubscriptionsHandlerDelegat
555555

556556
/// Adds an observer callback which will be called when the client has received an `MQTTMessage` for a specific topic.
557557
/// - Parameters:
558-
/// - topic: The topic to receive messages for.
558+
/// - topicFilter: The topic filter to receive messages for.
559559
/// - callback: The observer callback to add which will be called with the received message.
560560
/// - Returns: An `MQTTCancellable` which can be used to cancel the observer callback.
561561
@discardableResult
562-
public func whenMessage(forTopic topic: String, _ callback: @escaping (_ message: MQTTMessage) -> Void) -> MQTTCancellable {
562+
public func whenMessage(forTopic topicFilter: String, _ callback: @escaping (_ message: MQTTMessage) -> Void) -> MQTTCancellable {
563563
return messageCallbacks.append { message in
564-
guard message.topic == topic else {
564+
guard message.topic.matchesMqttTopicFilter(topicFilter) else {
565565
return
566566
}
567567
callback(message)
@@ -628,13 +628,13 @@ public class MQTTClient: MQTTConnectionDelegate, MQTTSubscriptionsHandlerDelegat
628628
}
629629

630630
/// Returns a publisher for receiving MQTT messages to a specific topic.
631-
/// - Parameter topic: The topic to receive messages for.
631+
/// - Parameter topicFilter: The topic filter to receive messages for.
632632
///
633633
/// This is only available on platforms where `Combine` is available.
634634
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
635-
public func messagePublisher(forTopic topic: String) -> AnyPublisher<MQTTMessage, Never> {
635+
public func messagePublisher(forTopic topicFilter: String) -> AnyPublisher<MQTTMessage, Never> {
636636
return messageSubject
637-
.filter { $0.topic == topic }
637+
.filter { $0.topic.matchesMqttTopicFilter(topicFilter) }
638638
.eraseToAnyPublisher()
639639
}
640640

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
extension String {
2+
3+
/// Indicates whether this string matches a given topic filter.
4+
///
5+
/// This function takes into account wildcards (+ and #) in the topic filter when determining if the string matches.
6+
/// - Parameter topicFilter: The topic filter to match this string with.
7+
/// - Returns: `true` if the string matches the topic filter, `false` otherwise.
8+
public func matchesMqttTopicFilter(_ topicFilter: String) -> Bool {
9+
var filterParts = topicFilter.split(separator: "/", omittingEmptySubsequences: false)
10+
var topicParts = split(separator: "/", omittingEmptySubsequences: false)
11+
12+
while !filterParts.isEmpty && !topicParts.isEmpty {
13+
guard filterParts[0] == topicParts[0] || filterParts[0] == "+" else {
14+
return filterParts.count == 1 && filterParts[0] == "#"
15+
}
16+
17+
filterParts.removeFirst()
18+
topicParts.removeFirst()
19+
}
20+
21+
return filterParts.isEmpty && topicParts.isEmpty
22+
}
23+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
@testable import MQTTNIO
2+
import XCTest
3+
4+
final class TopicFilterMatchTests: XCTestCase {
5+
func testTopicFilterMatches() {
6+
XCTAssertTrue("one/two/three".matchesMqttTopicFilter("one/two/three"))
7+
XCTAssertTrue("one/two/three".matchesMqttTopicFilter("one/+/three"))
8+
XCTAssertTrue("one/two/three".matchesMqttTopicFilter("one/#"))
9+
10+
XCTAssertFalse("/one/two/three".matchesMqttTopicFilter("one/two/three"))
11+
XCTAssertTrue("/one/two/three".matchesMqttTopicFilter("/one/two/three"))
12+
13+
XCTAssertTrue("one/two/three/four/five/six".matchesMqttTopicFilter("one/two/#"))
14+
XCTAssertTrue("one/two/three/four/five/six".matchesMqttTopicFilter("one/+/three/#"))
15+
16+
XCTAssertTrue("one/two/three/four/five/six".matchesMqttTopicFilter("one/+/three/#"))
17+
XCTAssertTrue("one/two/three/four/five/six".matchesMqttTopicFilter("one/+/three/#"))
18+
XCTAssertTrue("one/two/three/four/five/six".matchesMqttTopicFilter("one/two/+/four/+/six"))
19+
20+
XCTAssertFalse("one/two/three".matchesMqttTopicFilter("one/two"))
21+
XCTAssertFalse("one/two/three".matchesMqttTopicFilter("one/+"))
22+
XCTAssertFalse("one/two/three".matchesMqttTopicFilter("one/two/three/four"))
23+
XCTAssertFalse("one/two/three".matchesMqttTopicFilter("one/two/three/#"))
24+
XCTAssertFalse("one/two/three".matchesMqttTopicFilter("one/two/three/+"))
25+
}
26+
}

0 commit comments

Comments
 (0)