Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class MqttClientInterface
/// \param qos Quality of service setting for the subscription
virtual void subscribe(
const std::string& topic, MessageHandler handler, int qos) = 0;

/// \brief Unsubscribe from a topic
///
/// \param topic Topic to unsubscribe from
virtual void unsubscribe(const std::string& topic) = 0;
};

/// \brief Create a default MQTT client interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class PahoMqttClient : public MqttClientInterface
void subscribe(
const std::string& topic, MessageHandler handler, int qos) override;

// Documentation inherited from MqttClientInterface
void unsubscribe(const std::string& topic) override;

friend class MqttCallback;

private:
Expand Down
15 changes: 15 additions & 0 deletions vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,21 @@ void PahoMqttClient::subscribe(
}
}

//=============================================================================
void PahoMqttClient::unsubscribe(const std::string& topic)
{
try
{
client_->unsubscribe(topic)->wait();
std::lock_guard<std::mutex> lock(handler_mutex_);
handlers_.erase(topic);
}
catch (const mqtt::exception& e)
{
VDA5050_ERROR_STREAM("MQTT unsubscription failed: " << e.get_message());
}
}

//=============================================================================
PahoMqttClient::PahoMqttClient(
const std::string& broker_address, const std::string& client_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,46 @@ TEST(PahoMqttClientTest, PublishSubscribe)
ASSERT_NO_THROW(talker->disconnect());
ASSERT_NO_THROW(listener->disconnect());
}

TEST(PahoMqttClientTest, UnsubscribeStopsMessages)
{
std::string broker = "tcp://test.mosquitto.org:1883";
std::string topic = "/test/integration/unsubscribe";
std::string payload = "hello";
int qos = 0;

std::atomic_int message_count{0};

auto listener =
vda5050_core::mqtt_client::create_default_client(broker, "unsub_listener");
ASSERT_NO_THROW(listener->connect());
ASSERT_NO_THROW(listener->subscribe(
topic,
[&](const std::string& /*topic_*/, const std::string& /*payload_*/) {
message_count++;
},
qos));

auto talker =
vda5050_core::mqtt_client::create_default_client(broker, "unsub_talker");
ASSERT_NO_THROW(talker->connect());

// Publish first message and verify it is received
ASSERT_NO_THROW(talker->publish(topic, payload, qos));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
ASSERT_EQ(message_count.load(), 1);

// Unsubscribe from the topic
ASSERT_NO_THROW(listener->unsubscribe(topic));
std::this_thread::sleep_for(std::chrono::milliseconds(500));

// Publish second message after unsubscribe
ASSERT_NO_THROW(talker->publish(topic, payload, qos));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));

// Message count should still be 1 (no new messages received)
ASSERT_EQ(message_count.load(), 1);

ASSERT_NO_THROW(talker->disconnect());
ASSERT_NO_THROW(listener->disconnect());
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class MockMqttClient : public vda5050_core::mqtt_client::MqttClientInterface
(const std::string&,
std::function<void(const std::string&, const std::string&)>, int),
(override));
MOCK_METHOD(void, unsubscribe, (const std::string&), (override));
};

TEST(MqttClientInterfaceTest, ConnectCall)
Expand Down Expand Up @@ -62,3 +63,10 @@ TEST(MqttClientInterfaceTest, SubscribeTopic)
EXPECT_CALL(mock, subscribe("topic", testing::_, 0));
mock.subscribe("topic", cb, 0);
}

TEST(MqttClientInterfaceTest, UnsubscribeTopic)
{
MockMqttClient mock;
EXPECT_CALL(mock, unsubscribe("topic")).Times(1);
mock.unsubscribe("topic");
}