diff --git a/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp b/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp index d671b68..1374750 100644 --- a/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp +++ b/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp @@ -57,6 +57,11 @@ class MqttClientInterface /// \param qos Quality of service setting for the subscription virtual void subscribe( const std::string& topic, MessageHandler handler, int qos) = 0; + + /// \brief Unsubscribe from a topic + /// + /// \param topic Topic to unsubscribe from + virtual void unsubscribe(const std::string& topic) = 0; }; /// \brief Create a default MQTT client interface diff --git a/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp b/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp index 72a405d..d61ac71 100644 --- a/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp +++ b/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp @@ -112,6 +112,9 @@ class PahoMqttClient : public MqttClientInterface void subscribe( const std::string& topic, MessageHandler handler, int qos) override; + // Documentation inherited from MqttClientInterface + void unsubscribe(const std::string& topic) override; + friend class MqttCallback; private: diff --git a/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp b/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp index f1c58e4..6384326 100644 --- a/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp +++ b/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp @@ -179,6 +179,21 @@ void PahoMqttClient::subscribe( } } +//============================================================================= +void PahoMqttClient::unsubscribe(const std::string& topic) +{ + try + { + client_->unsubscribe(topic)->wait(); + std::lock_guard lock(handler_mutex_); + handlers_.erase(topic); + } + catch (const mqtt::exception& e) + { + VDA5050_ERROR_STREAM("MQTT unsubscription failed: " << e.get_message()); + } +} + //============================================================================= PahoMqttClient::PahoMqttClient( const std::string& broker_address, const std::string& client_id) diff --git a/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp b/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp index d36f8e9..33c66d5 100644 --- a/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp +++ b/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp @@ -56,3 +56,46 @@ TEST(PahoMqttClientTest, PublishSubscribe) ASSERT_NO_THROW(talker->disconnect()); ASSERT_NO_THROW(listener->disconnect()); } + +TEST(PahoMqttClientTest, UnsubscribeStopsMessages) +{ + std::string broker = "tcp://test.mosquitto.org:1883"; + std::string topic = "/test/integration/unsubscribe"; + std::string payload = "hello"; + int qos = 0; + + std::atomic_int message_count{0}; + + auto listener = + vda5050_core::mqtt_client::create_default_client(broker, "unsub_listener"); + ASSERT_NO_THROW(listener->connect()); + ASSERT_NO_THROW(listener->subscribe( + topic, + [&](const std::string& /*topic_*/, const std::string& /*payload_*/) { + message_count++; + }, + qos)); + + auto talker = + vda5050_core::mqtt_client::create_default_client(broker, "unsub_talker"); + ASSERT_NO_THROW(talker->connect()); + + // Publish first message and verify it is received + ASSERT_NO_THROW(talker->publish(topic, payload, qos)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + ASSERT_EQ(message_count.load(), 1); + + // Unsubscribe from the topic + ASSERT_NO_THROW(listener->unsubscribe(topic)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Publish second message after unsubscribe + ASSERT_NO_THROW(talker->publish(topic, payload, qos)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + // Message count should still be 1 (no new messages received) + ASSERT_EQ(message_count.load(), 1); + + ASSERT_NO_THROW(talker->disconnect()); + ASSERT_NO_THROW(listener->disconnect()); +} diff --git a/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp b/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp index 556b8ec..6b84754 100644 --- a/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp +++ b/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp @@ -32,6 +32,7 @@ class MockMqttClient : public vda5050_core::mqtt_client::MqttClientInterface (const std::string&, std::function, int), (override)); + MOCK_METHOD(void, unsubscribe, (const std::string&), (override)); }; TEST(MqttClientInterfaceTest, ConnectCall) @@ -62,3 +63,10 @@ TEST(MqttClientInterfaceTest, SubscribeTopic) EXPECT_CALL(mock, subscribe("topic", testing::_, 0)); mock.subscribe("topic", cb, 0); } + +TEST(MqttClientInterfaceTest, UnsubscribeTopic) +{ + MockMqttClient mock; + EXPECT_CALL(mock, unsubscribe("topic")).Times(1); + mock.unsubscribe("topic"); +}