diff --git a/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp b/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp index 1374750..121c624 100644 --- a/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp +++ b/vda5050_core/include/vda5050_core/mqtt_client/mqtt_client_interface.hpp @@ -39,13 +39,17 @@ class MqttClientInterface /// \brief Disconnect from the the MQTT broker virtual void disconnect() = 0; + /// \brief Check MQTT connection + virtual bool connected() = 0; + /// \brief Publish a message to the MQTT broker /// /// \param topic Topic for publish /// \param message Raw message string /// \param qos Quality of service setting for the publish virtual void publish( - const std::string& topic, const std::string& message, int qos) = 0; + const std::string& topic, const std::string& message, int qos, + bool retain = false) = 0; using MessageHandler = std::function; @@ -62,6 +66,14 @@ class MqttClientInterface /// /// \param topic Topic to unsubscribe from virtual void unsubscribe(const std::string& topic) = 0; + + /// \brief Set a will message for when the client disconnects abruptly + /// + /// \param topic Topic to publish will message + /// \param message Raw message string + /// \param Quality of service setting for the publish + virtual void set_will( + const std::string& topic, const std::string& message, int qos) = 0; }; /// \brief Create a default MQTT client interface diff --git a/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp b/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp index d61ac71..54f1d1d 100644 --- a/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp +++ b/vda5050_core/include/vda5050_core/mqtt_client/paho_mqtt_client.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -104,9 +105,13 @@ class PahoMqttClient : public MqttClientInterface // Documentation inherited from MqttClientInterface void disconnect() override; + // Documentation inherited from MqttClientInterface + bool connected() override; + // Documentation inherited from MqttClientInterface void publish( - const std::string& topic, const std::string& message, int qos) override; + const std::string& topic, const std::string& message, int qos, + bool retain = false) override; // Documentation inherited from MqttClientInterface void subscribe( @@ -115,6 +120,10 @@ class PahoMqttClient : public MqttClientInterface // Documentation inherited from MqttClientInterface void unsubscribe(const std::string& topic) override; + // Documentation inherited from MqttClientInterface + void set_will( + const std::string& topic, const std::string& message, int qos) override; + friend class MqttCallback; private: @@ -139,6 +148,9 @@ class PahoMqttClient : public MqttClientInterface /// \brief Mutex protecting list of message handlers std::mutex handler_mutex_; + + /// \brief MQTT connection options + mqtt::connect_options conn_options_; }; } // namespace mqtt_client diff --git a/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp b/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp index 6384326..d69465e 100644 --- a/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp +++ b/vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp @@ -109,15 +109,7 @@ void PahoMqttClient::connect() try { - mqtt::connect_options conn_options; - conn_options.set_mqtt_version(4); - conn_options.set_clean_session(false); - conn_options.set_user_name(""); - conn_options.set_password(""); - conn_options.set_automatic_reconnect(true); - conn_options.set_automatic_reconnect(2, 32); - - client_->connect(conn_options, nullptr, action_listener_)->wait(); + client_->connect(conn_options_, nullptr, action_listener_)->wait(); } catch (const mqtt::exception& e) { @@ -144,9 +136,15 @@ void PahoMqttClient::disconnect() } } +//============================================================================= +bool PahoMqttClient::connected() +{ + return client_->is_connected(); +} + //============================================================================= void PahoMqttClient::publish( - const std::string& topic, const std::string& message, int qos) + const std::string& topic, const std::string& message, int qos, bool retain) { try { @@ -154,6 +152,7 @@ void PahoMqttClient::publish( msg->set_topic(topic); msg->set_payload(message); msg->set_qos(qos); + msg->set_retained(retain); client_->publish(msg)->wait(); } @@ -194,6 +193,19 @@ void PahoMqttClient::unsubscribe(const std::string& topic) } } +//============================================================================= +void PahoMqttClient::set_will( + const std::string& topic, const std::string& message, int qos) +{ + mqtt::will_options will; + will.set_topic(topic); + will.set_retained(true); + will.set_qos(qos); + will.set_payload(message); + + conn_options_.set_will(will); +} + //============================================================================= PahoMqttClient::PahoMqttClient( const std::string& broker_address, const std::string& client_id) @@ -202,6 +214,13 @@ PahoMqttClient::PahoMqttClient( callback_(MqttCallback(*this)) { client_->set_callback(callback_); + + conn_options_.set_mqtt_version(4); + conn_options_.set_clean_session(false); + conn_options_.set_user_name(""); + conn_options_.set_password(""); + conn_options_.set_automatic_reconnect(true); + conn_options_.set_automatic_reconnect(2, 32); } } // namespace mqtt_client diff --git a/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp b/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp index 33c66d5..5ae4892 100644 --- a/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp +++ b/vda5050_core/test/integration/mqtt_client/test_paho_mqtt_client.cpp @@ -35,6 +35,7 @@ TEST(PahoMqttClientTest, PublishSubscribe) auto listener = vda5050_core::mqtt_client::create_default_client(broker, "listener"); ASSERT_NO_THROW(listener->connect()); + ASSERT_TRUE(listener->connected()); ASSERT_NO_THROW(listener->subscribe( topic, [&](const std::string& topic_, const std::string& payload_) { diff --git a/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp b/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp index 6b84754..a45ed77 100644 --- a/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp +++ b/vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp @@ -25,14 +25,18 @@ class MockMqttClient : public vda5050_core::mqtt_client::MqttClientInterface public: MOCK_METHOD(void, connect, (), (override)); MOCK_METHOD(void, disconnect, (), (override)); + MOCK_METHOD(bool, connected, (), (override)); MOCK_METHOD( - void, publish, (const std::string&, const std::string&, int), (override)); + void, publish, (const std::string&, const std::string&, int, bool), + (override)); MOCK_METHOD( void, subscribe, (const std::string&, std::function, int), (override)); MOCK_METHOD(void, unsubscribe, (const std::string&), (override)); + MOCK_METHOD( + void, set_will, (const std::string&, const std::string&, int), (override)); }; TEST(MqttClientInterfaceTest, ConnectCall) @@ -49,11 +53,18 @@ TEST(MqttClientInterfaceTest, DisconnectCall) mock.disconnect(); } +TEST(MqttClientInterfaceTest, CheckConnection) +{ + MockMqttClient mock; + EXPECT_CALL(mock, connected()).Times(1); + mock.connected(); +} + TEST(MqttClientInterfaceTest, PublishMessage) { MockMqttClient mock; - EXPECT_CALL(mock, publish("topic", "{payload: 'data'}", 0)).Times(1); - mock.publish("topic", "{payload: 'data'}", 0); + EXPECT_CALL(mock, publish("topic", "{payload: 'data'}", 0, false)).Times(1); + mock.publish("topic", "{payload: 'data'}", 0, false); } TEST(MqttClientInterfaceTest, SubscribeTopic) @@ -70,3 +81,10 @@ TEST(MqttClientInterfaceTest, UnsubscribeTopic) EXPECT_CALL(mock, unsubscribe("topic")).Times(1); mock.unsubscribe("topic"); } + +TEST(MqttClientInterfaceTest, SetWill) +{ + MockMqttClient mock; + EXPECT_CALL(mock, set_will("topic", "{payload: 'data'}", 1)).Times(1); + mock.set_will("topic", "{payload: 'data'}", 1); +}