Skip to content

Commit 4dff880

Browse files
authored
Merge pull request mtconnect#374 from mtconnect/mqtt_last_will_and_client_id_fix
Mqtt last will and client id fix
2 parents 294212c + 8732698 commit 4dff880

11 files changed

+193
-81
lines changed

README.md

+8-1
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,13 @@ Sinks {
833833

834834
*Default*: `MTConnect/Current/[device]`
835835

836+
* `MqttLastWillTopic` - The topic used for the last will and testement for an agent
837+
838+
> Note: The value will be `AVAILABLE` when the Agent is publishing and connected and will
839+
> publish `UNAVAILABLE` when the agent disconnects from the broker.
840+
841+
*Default*: `MTConnect/Probe/[device]/Availability"`
842+
836843
* `MqttCurrentInterval` - The frequency to publish currents. Acts like a keyframe in a video stream.
837844

838845
*Default*: 10000ms
@@ -844,7 +851,7 @@ Sinks {
844851
* `MqttSampleCount` - The maxmimum number of observations to publish at one time.
845852

846853
*Default*: 1000
847-
854+
848855
### Adapter Configuration Items ###
849856

850857
* `Adapters` - Adapters begins a list of device blocks. If the Adapters

src/mtconnect/configuration/config_options.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ namespace mtconnect {
9999
DECLARE_CONFIGURATION(MqttUserName);
100100
DECLARE_CONFIGURATION(MqttPassword);
101101
DECLARE_CONFIGURATION(MqttMaxTopicDepth);
102+
DECLARE_CONFIGURATION(MqttLastWillTopic);
102103
///@}
103104

104105
/// @name Adapter Configuration

src/mtconnect/mqtt/mqtt_client.hpp

+10-2
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,14 @@ namespace mtconnect {
4747
/// @param ClientHandler configuration options
4848
/// - ConnectInterval, defaults to 5000
4949

50-
MqttClient(boost::asio::io_context &ioc, std::unique_ptr<ClientHandler> &&handler)
51-
: m_ioContext(ioc), m_handler(std::move(handler)), m_connectInterval(5000)
50+
MqttClient(boost::asio::io_context &ioc, std::unique_ptr<ClientHandler> &&handler,
51+
const std::optional<std::string> willTopic = std::nullopt,
52+
const std::optional<std::string> willPayload = std::nullopt)
53+
: m_ioContext(ioc),
54+
m_handler(std::move(handler)),
55+
m_connectInterval(5000),
56+
m_willTopic(willTopic),
57+
m_willPayload(willPayload)
5258
{}
5359
virtual ~MqttClient() = default;
5460

@@ -101,6 +107,8 @@ namespace mtconnect {
101107
std::string m_identity;
102108
std::unique_ptr<ClientHandler> m_handler;
103109
std::chrono::milliseconds m_connectInterval;
110+
std::optional<std::string> m_willTopic;
111+
std::optional<std::string> m_willPayload;
104112

105113
bool m_running {false};
106114
bool m_connected {false};

src/mtconnect/mqtt/mqtt_client_impl.hpp

+20-6
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
#include <boost/log/trivial.hpp>
2121
#include <boost/uuid/name_generator_sha1.hpp>
2222

23-
#include <inttypes.h>
24-
#include <random>
2523
#include <chrono>
26-
24+
#include <inttypes.h>
2725
#include <mqtt/async_client.hpp>
2826
#include <mqtt/setup_log.hpp>
27+
#include <mqtt/will.hpp>
28+
#include <random>
2929

3030
#include "mqtt_client.hpp"
3131
#include "mtconnect/configuration/config_options.hpp"
@@ -72,8 +72,10 @@ namespace mtconnect {
7272
/// - MqttTls, defaults to false
7373
/// - MqttHost, defaults to LocalHost
7474
MqttClientImpl(boost::asio::io_context &ioContext, const ConfigOptions &options,
75-
std::unique_ptr<ClientHandler> &&handler)
76-
: MqttClient(ioContext, std::move(handler)),
75+
std::unique_ptr<ClientHandler> &&handler,
76+
const std::optional<std::string> willTopic = std::nullopt,
77+
const std::optional<std::string> willPayload = std::nullopt)
78+
: MqttClient(ioContext, std::move(handler), willTopic, willPayload),
7779
m_options(options),
7880
m_host(GetOption<std::string>(options, configuration::MqttHost).value_or("localhost")),
7981
m_port(GetOption<int>(options, configuration::MqttPort).value_or(1883)),
@@ -197,6 +199,19 @@ namespace mtconnect {
197199
}
198200
});
199201

202+
if (m_willTopic && m_willPayload)
203+
{
204+
uint32_t will_expiry_interval = 1;
205+
MQTT_NS::v5::properties ps {
206+
MQTT_NS::v5::property::message_expiry_interval(will_expiry_interval),
207+
};
208+
209+
mqtt::buffer topic(std::string_view(m_willTopic->c_str()));
210+
mqtt::buffer payload(std::string_view(m_willPayload->c_str()));
211+
212+
client->set_will(mqtt::will(topic, payload, mqtt::retain::yes, mqtt::force_move(ps)));
213+
}
214+
200215
m_running = true;
201216
connect();
202217

@@ -383,7 +398,6 @@ namespace mtconnect {
383398
std::uint16_t m_packetId {0};
384399

385400
std::optional<std::string> m_username;
386-
387401
std::optional<std::string> m_password;
388402

389403
boost::asio::steady_timer m_reconnectTimer;

src/mtconnect/mqtt/mqtt_server.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,13 @@ namespace mtconnect {
4444
/// @brief Shutdown the Mqtt server
4545
virtual void stop() = 0;
4646

47+
auto &getWill() { return m_will; }
48+
4749
protected:
4850
boost::asio::io_context &m_ioContext;
4951
std::string m_url;
5052
uint16_t m_port;
53+
std::optional<mqtt::will> m_will;
5154
};
5255
} // namespace mqtt_server
5356
} // namespace mtconnect

src/mtconnect/mqtt/mqtt_server_impl.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ namespace mtconnect {
125125
ep.set_connect_handler([this, wp](MQTT_NS::buffer client_id,
126126
MQTT_NS::optional<MQTT_NS::buffer> username,
127127
MQTT_NS::optional<MQTT_NS::buffer> password,
128-
MQTT_NS::optional<MQTT_NS::will>, bool clean_session,
129-
std::uint16_t keep_alive) {
128+
MQTT_NS::optional<MQTT_NS::will> will,
129+
bool clean_session, std::uint16_t keep_alive) {
130130
using namespace MQTT_NS::literals;
131131
LOG(info) << "Server: Client_id : " << client_id << std::endl;
132132
LOG(info) << "Server: User Name : " << (username ? username.value() : "none"_mb)
@@ -141,6 +141,8 @@ namespace mtconnect {
141141
LOG(error) << "Server: Endpoint has been deleted";
142142
return false;
143143
}
144+
if (will)
145+
m_will = will;
144146
m_connections.insert(sp);
145147
sp->connack(false, MQTT_NS::connect_return_code::accepted);
146148
return true;

src/mtconnect/pipeline/json_mapper.cpp

+11-17
Original file line numberDiff line numberDiff line change
@@ -175,20 +175,14 @@ namespace mtconnect::pipeline {
175175
Forward m_forward;
176176
std::list<pair<DataItemPtr, entity::Properties>> m_queue;
177177
};
178-
178+
179179
/// @brief consume value in case of error
180180
struct ErrorHandler : rj::BaseReaderHandler<rj::UTF8<>, ErrorHandler>
181181
{
182182
ErrorHandler(int depth = 0) : m_depth(depth) {}
183-
184-
bool Default()
185-
{
186-
return true;
187-
}
188-
bool Key(const Ch *str, rj::SizeType length, bool copy)
189-
{
190-
return true;
191-
}
183+
184+
bool Default() { return true; }
185+
bool Key(const Ch *str, rj::SizeType length, bool copy) { return true; }
192186
bool StartObject()
193187
{
194188
m_depth++;
@@ -199,21 +193,21 @@ namespace mtconnect::pipeline {
199193
m_depth--;
200194
return true;
201195
}
202-
bool StartArray()
196+
bool StartArray()
203197
{
204198
m_depth++;
205199
return true;
206200
}
207-
bool EndArray(rj::SizeType elementCount)
201+
bool EndArray(rj::SizeType elementCount)
208202
{
209203
m_depth--;
210204
return true;
211205
}
212-
206+
213207
bool operator()(rj::Reader &reader, rj::StringStream &buff)
214208
{
215209
LOG(warning) << "Consuming value due to error";
216-
210+
217211
if (!reader.IterativeParseNext<rj::kParseNanAndInfFlag>(buff, *this))
218212
return false;
219213

@@ -223,7 +217,7 @@ namespace mtconnect::pipeline {
223217
if (!reader.IterativeParseNext<rj::kParseNanAndInfFlag>(buff, *this))
224218
return false;
225219
}
226-
220+
227221
return true;
228222
}
229223

@@ -618,7 +612,7 @@ namespace mtconnect::pipeline {
618612
bool m_object {false};
619613
std::string m_key;
620614
Expectation m_expectation {Expectation::NONE};
621-
int m_depth{0};
615+
int m_depth {0};
622616
};
623617

624618
struct TimestampHandler : rj::BaseReaderHandler<rj::UTF8<>, TimestampHandler>
@@ -862,7 +856,7 @@ namespace mtconnect::pipeline {
862856
m_expectation = Expectation::KEY;
863857
break;
864858
}
865-
859+
866860
case Expectation::VALUE_ERROR:
867861
{
868862
ErrorHandler handler;

src/mtconnect/sink/mqtt_sink/mqtt2_service.cpp

+40-33
Original file line numberDiff line numberDiff line change
@@ -69,27 +69,19 @@ namespace mtconnect {
6969
{configuration::MqttClientId, string()},
7070
{configuration::MqttUserName, string()},
7171
{configuration::MqttPassword, string()}});
72-
AddDefaultedOptions(config, m_options,
73-
{{configuration::MqttHost, "127.0.0.1"s},
74-
{configuration::DeviceTopic, "MTConnect/Probe/[device]"s},
75-
{configuration::AssetTopic, "MTConnect/Asset/[device]"s},
76-
{configuration::CurrentTopic, "MTConnect/Current/[device]"s},
77-
{configuration::SampleTopic, "MTConnect/Sample/[device]"s},
78-
{configuration::MqttCurrentInterval, 10000ms},
79-
{configuration::MqttSampleInterval, 500ms},
80-
{configuration::MqttSampleCount, 1000},
81-
{configuration::MqttPort, 1883},
82-
{configuration::MqttTls, false}});
83-
84-
auto clientHandler = make_unique<ClientHandler>();
85-
clientHandler->m_connected = [this](shared_ptr<MqttClient> client) {
86-
// Publish latest devices, assets, and observations
87-
auto &circ = m_sinkContract->getCircularBuffer();
88-
std::lock_guard<buffer::CircularBuffer> lock(circ);
89-
client->connectComplete();
90-
91-
pubishInitialContent();
92-
};
72+
AddDefaultedOptions(
73+
config, m_options,
74+
{{configuration::MqttHost, "127.0.0.1"s},
75+
{configuration::DeviceTopic, "MTConnect/Probe/[device]"s},
76+
{configuration::AssetTopic, "MTConnect/Asset/[device]"s},
77+
{configuration::MqttLastWillTopic, "MTConnect/Probe/[device]/Availability"s},
78+
{configuration::CurrentTopic, "MTConnect/Current/[device]"s},
79+
{configuration::SampleTopic, "MTConnect/Sample/[device]"s},
80+
{configuration::MqttCurrentInterval, 10000ms},
81+
{configuration::MqttSampleInterval, 500ms},
82+
{configuration::MqttSampleCount, 1000},
83+
{configuration::MqttPort, 1883},
84+
{configuration::MqttTls, false}});
9385

9486
int maxTopicDepth {GetOption<int>(options, configuration::MqttMaxTopicDepth).value_or(7)};
9587

@@ -103,23 +95,38 @@ namespace mtconnect {
10395
m_sampleInterval = *GetOption<Milliseconds>(m_options, configuration::MqttSampleInterval);
10496

10597
m_sampleCount = *GetOption<int>(m_options, configuration::MqttSampleCount);
106-
107-
if (IsOptionSet(m_options, configuration::MqttTls))
108-
{
109-
m_client = make_shared<MqttTlsClient>(m_context, m_options, std::move(clientHandler));
110-
}
111-
else
112-
{
113-
m_client = make_shared<MqttTcpClient>(m_context, m_options, std::move(clientHandler));
114-
}
11598
}
11699

117100
void Mqtt2Service::start()
118101
{
119-
// mqtt client side not a server side...
120102
if (!m_client)
121-
return;
122-
103+
{
104+
auto clientHandler = make_unique<ClientHandler>();
105+
clientHandler->m_connected = [this](shared_ptr<MqttClient> client) {
106+
// Publish latest devices, assets, and observations
107+
auto &circ = m_sinkContract->getCircularBuffer();
108+
std::lock_guard<buffer::CircularBuffer> lock(circ);
109+
client->connectComplete();
110+
111+
client->publish(m_lastWillTopic, "AVAILABLE");
112+
pubishInitialContent();
113+
};
114+
115+
auto agentDevice = m_sinkContract->getDeviceByName("Agent");
116+
auto lwtTopic = get<string>(m_options[configuration::MqttLastWillTopic]);
117+
m_lastWillTopic = formatTopic(lwtTopic, agentDevice, "Agent");
118+
119+
if (IsOptionSet(m_options, configuration::MqttTls))
120+
{
121+
m_client = make_shared<MqttTlsClient>(m_context, m_options, std::move(clientHandler),
122+
m_lastWillTopic, "UNAVAILABLE"s);
123+
}
124+
else
125+
{
126+
m_client = make_shared<MqttTcpClient>(m_context, m_options, std::move(clientHandler),
127+
m_lastWillTopic, "UNAVAILABLE"s);
128+
}
129+
}
123130
m_client->start();
124131
}
125132

src/mtconnect/sink/mqtt_sink/mqtt2_service.hpp

+8-6
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,13 @@ namespace mtconnect {
132132
return filter->second;
133133
}
134134

135-
std::string formatTopic(const std::string &topic, const DevicePtr device)
135+
std::string formatTopic(const std::string &topic, const DevicePtr device,
136+
const std::string defaultUuid = "Unknown")
136137
{
137138
string uuid;
138139
string formatted {topic};
139140
if (!device)
140-
uuid = "Unknown";
141+
uuid = defaultUuid;
141142
else
142143
{
143144
uuid = *(device->getUuid());
@@ -173,10 +174,11 @@ namespace mtconnect {
173174
}
174175

175176
protected:
176-
std::string m_deviceTopic; //! Device topic prefix
177-
std::string m_assetTopic; //! Asset topic prefix
178-
std::string m_currentTopic; //! Current topic prefix
179-
std::string m_sampleTopic; //! Sample topic prefix
177+
std::string m_deviceTopic; //! Device topic prefix
178+
std::string m_assetTopic; //! Asset topic prefix
179+
std::string m_currentTopic; //! Current topic prefix
180+
std::string m_sampleTopic; //! Sample topic prefix
181+
std::string m_lastWillTopic; //! Topic to publish the last will when disconnected
180182

181183
std::chrono::milliseconds m_currentInterval; //! Interval in ms to update current
182184
std::chrono::milliseconds m_sampleInterval; //! min interval in ms to update sample

test_package/json_mapping_test.cpp

+4-9
Original file line numberDiff line numberDiff line change
@@ -1009,13 +1009,8 @@ TEST_F(JsonMappingTest, should_parse_xml_asset)
10091009
TEST_F(JsonMappingTest, should_skip_erroneous_values)
10101010
{
10111011
auto dev = makeDevice("Device", {{"id", "device"s}, {"name", "device"s}, {"uuid", "device"s}});
1012-
makeDataItem("device", {{"id", "a"s},
1013-
{"type", "EXECUTION"s},
1014-
{"category", "EVENT"s}});
1015-
makeDataItem("device", {{"id", "b"s},
1016-
{"type", "CONTROLLER_MODE"s},
1017-
{"category", "EVENT"s}});
1018-
1012+
makeDataItem("device", {{"id", "a"s}, {"type", "EXECUTION"s}, {"category", "EVENT"s}});
1013+
makeDataItem("device", {{"id", "b"s}, {"type", "CONTROLLER_MODE"s}, {"category", "EVENT"s}});
10191014

10201015
Properties props {{"VALUE", R"(
10211016
{
@@ -1031,7 +1026,7 @@ TEST_F(JsonMappingTest, should_skip_erroneous_values)
10311026
},
10321027
"b": "MANUAL"
10331028
})"s}};
1034-
1029+
10351030
auto jmsg = std::make_shared<JsonMessage>("JsonMessage", props);
10361031
jmsg->m_device = dev;
10371032

@@ -1049,6 +1044,6 @@ TEST_F(JsonMappingTest, should_skip_erroneous_values)
10491044
ASSERT_EQ("b", obs->getDataItem()->getId());
10501045
ASSERT_EQ("MANUAL", obs->getValue<string>());
10511046
}
1052-
1047+
10531048
/// @test verify the json mapper can an asset in json
10541049
TEST_F(JsonMappingTest, should_parse_json_asset) { GTEST_SKIP(); }

0 commit comments

Comments
 (0)