Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ class MqttClientInterface
/// \brief Disconnect from the the MQTT broker
virtual void disconnect() = 0;

/// \brief Check MQTT connection
virtual bool connected() = 0;

/// \brief Publish a message to the MQTT broker
///
/// \param topic Topic for publish
/// \param message Raw message string
/// \param qos Quality of service setting for the publish
virtual void publish(
const std::string& topic, const std::string& message, int qos) = 0;
const std::string& topic, const std::string& message, int qos,
bool retain = false) = 0;

using MessageHandler =
std::function<void(const std::string&, const std::string&)>;
Expand All @@ -62,6 +66,14 @@ class MqttClientInterface
///
/// \param topic Topic to unsubscribe from
virtual void unsubscribe(const std::string& topic) = 0;

/// \brief Set a will message for when the client disconnects abruptly
///
/// \param topic Topic to publish will message
/// \param message Raw message string
/// \param Quality of service setting for the publish
virtual void set_will(
const std::string& topic, const std::string& message, int qos) = 0;
};

/// \brief Create a default MQTT client interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mqtt/callback.h>
#include <mqtt/connect_options.h>
#include <mqtt/iaction_listener.h>
#include <mqtt/will_options.h>

#include <memory>
#include <mutex>
Expand Down Expand Up @@ -104,9 +105,13 @@ class PahoMqttClient : public MqttClientInterface
// Documentation inherited from MqttClientInterface
void disconnect() override;

// Documentation inherited from MqttClientInterface
bool connected() override;

// Documentation inherited from MqttClientInterface
void publish(
const std::string& topic, const std::string& message, int qos) override;
const std::string& topic, const std::string& message, int qos,
bool retain = false) override;

// Documentation inherited from MqttClientInterface
void subscribe(
Expand All @@ -115,6 +120,10 @@ class PahoMqttClient : public MqttClientInterface
// Documentation inherited from MqttClientInterface
void unsubscribe(const std::string& topic) override;

// Documentation inherited from MqttClientInterface
void set_will(
const std::string& topic, const std::string& message, int qos) override;

friend class MqttCallback;

private:
Expand All @@ -139,6 +148,9 @@ class PahoMqttClient : public MqttClientInterface

/// \brief Mutex protecting list of message handlers
std::mutex handler_mutex_;

/// \brief MQTT connection options
mqtt::connect_options conn_options_;
};

} // namespace mqtt_client
Expand Down
39 changes: 29 additions & 10 deletions vda5050_core/src/vda5050_core/mqtt_client/paho_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,7 @@ void PahoMqttClient::connect()

try
{
mqtt::connect_options conn_options;
conn_options.set_mqtt_version(4);
conn_options.set_clean_session(false);
conn_options.set_user_name("");
conn_options.set_password("");
conn_options.set_automatic_reconnect(true);
conn_options.set_automatic_reconnect(2, 32);

client_->connect(conn_options, nullptr, action_listener_)->wait();
client_->connect(conn_options_, nullptr, action_listener_)->wait();
}
catch (const mqtt::exception& e)
{
Expand All @@ -144,16 +136,23 @@ void PahoMqttClient::disconnect()
}
}

//=============================================================================
bool PahoMqttClient::connected()
{
return client_->is_connected();
}

//=============================================================================
void PahoMqttClient::publish(
const std::string& topic, const std::string& message, int qos)
const std::string& topic, const std::string& message, int qos, bool retain)
{
try
{
auto msg = std::make_shared<mqtt::message>();
msg->set_topic(topic);
msg->set_payload(message);
msg->set_qos(qos);
msg->set_retained(retain);

client_->publish(msg)->wait();
}
Expand Down Expand Up @@ -194,6 +193,19 @@ void PahoMqttClient::unsubscribe(const std::string& topic)
}
}

//=============================================================================
void PahoMqttClient::set_will(
const std::string& topic, const std::string& message, int qos)
{
mqtt::will_options will;
will.set_topic(topic);
will.set_retained(true);
will.set_qos(qos);
will.set_payload(message);

conn_options_.set_will(will);
}

//=============================================================================
PahoMqttClient::PahoMqttClient(
const std::string& broker_address, const std::string& client_id)
Expand All @@ -202,6 +214,13 @@ PahoMqttClient::PahoMqttClient(
callback_(MqttCallback(*this))
{
client_->set_callback(callback_);

conn_options_.set_mqtt_version(4);
conn_options_.set_clean_session(false);
conn_options_.set_user_name("");
conn_options_.set_password("");
conn_options_.set_automatic_reconnect(true);
conn_options_.set_automatic_reconnect(2, 32);
}

} // namespace mqtt_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ TEST(PahoMqttClientTest, PublishSubscribe)
auto listener =
vda5050_core::mqtt_client::create_default_client(broker, "listener");
ASSERT_NO_THROW(listener->connect());
ASSERT_TRUE(listener->connected());
ASSERT_NO_THROW(listener->subscribe(
topic,
[&](const std::string& topic_, const std::string& payload_) {
Expand Down
24 changes: 21 additions & 3 deletions vda5050_core/test/unit/mqtt_client/test_mqtt_client_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ class MockMqttClient : public vda5050_core::mqtt_client::MqttClientInterface
public:
MOCK_METHOD(void, connect, (), (override));
MOCK_METHOD(void, disconnect, (), (override));
MOCK_METHOD(bool, connected, (), (override));
MOCK_METHOD(
void, publish, (const std::string&, const std::string&, int), (override));
void, publish, (const std::string&, const std::string&, int, bool),
(override));
MOCK_METHOD(
void, subscribe,
(const std::string&,
std::function<void(const std::string&, const std::string&)>, int),
(override));
MOCK_METHOD(void, unsubscribe, (const std::string&), (override));
MOCK_METHOD(
void, set_will, (const std::string&, const std::string&, int), (override));
};

TEST(MqttClientInterfaceTest, ConnectCall)
Expand All @@ -49,11 +53,18 @@ TEST(MqttClientInterfaceTest, DisconnectCall)
mock.disconnect();
}

TEST(MqttClientInterfaceTest, CheckConnection)
{
MockMqttClient mock;
EXPECT_CALL(mock, connected()).Times(1);
mock.connected();
}

TEST(MqttClientInterfaceTest, PublishMessage)
{
MockMqttClient mock;
EXPECT_CALL(mock, publish("topic", "{payload: 'data'}", 0)).Times(1);
mock.publish("topic", "{payload: 'data'}", 0);
EXPECT_CALL(mock, publish("topic", "{payload: 'data'}", 0, false)).Times(1);
mock.publish("topic", "{payload: 'data'}", 0, false);
}

TEST(MqttClientInterfaceTest, SubscribeTopic)
Expand All @@ -70,3 +81,10 @@ TEST(MqttClientInterfaceTest, UnsubscribeTopic)
EXPECT_CALL(mock, unsubscribe("topic")).Times(1);
mock.unsubscribe("topic");
}

TEST(MqttClientInterfaceTest, SetWill)
{
MockMqttClient mock;
EXPECT_CALL(mock, set_will("topic", "{payload: 'data'}", 1)).Times(1);
mock.set_will("topic", "{payload: 'data'}", 1);
}