From 7b9b2a383f76ed7b7ac8dd2a4c17ad227027fa77 Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Tue, 8 Oct 2024 10:17:20 +0900 Subject: [PATCH 1/2] Fixed TLS async_close timeout logic. --- include/async_mqtt/impl/endpoint_close.hpp | 2 +- .../customized_ssl_stream.hpp | 67 +++++++------------ .../util/impl/stream_read_packet.hpp | 1 + .../util/impl/stream_write_packet.hpp | 1 + include/async_mqtt/util/ioc_queue.hpp | 3 + 5 files changed, 30 insertions(+), 44 deletions(-) diff --git a/include/async_mqtt/impl/endpoint_close.hpp b/include/async_mqtt/impl/endpoint_close.hpp index 24bdea909..0b79e42f4 100644 --- a/include/async_mqtt/impl/endpoint_close.hpp +++ b/include/async_mqtt/impl/endpoint_close.hpp @@ -51,7 +51,7 @@ close_op { ASYNC_MQTT_LOG("mqtt_impl", trace) << ASYNC_MQTT_ADD_VALUE(address, &a_ep) << "already close requested"; - a_ep.close_queue_.post( + a_ep.close_queue_.post( force_move(self) ); } break; diff --git a/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp b/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp index 3a98c479f..0be45bd8a 100644 --- a/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp +++ b/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp @@ -68,60 +68,41 @@ struct layer_customize> { stream.get_executor(), shutdown_timeout ); - auto self_sp = std::make_shared(force_move(self)); + auto sig = std::make_shared(); tim->async_wait( - as::consign( - as::append( - std::ref(*self_sp), - std::weak_ptr(tim) - ), - self_sp - ) + [sig, wp = std::weak_ptr(tim)] + (error_code const& ec) { + if (!ec) { + if (auto sp = wp.lock()) { + ASYNC_MQTT_LOG("mqtt_impl", info) + << "TLS async_shutdown timeout"; + sig->emit(as::cancellation_type::terminal); + } + } + } ); - stream.async_shutdown( - as::consign( - std::ref(*self_sp), - self_sp, - tim + auto& a_stream{stream}; + a_stream.async_shutdown( + as::bind_cancellation_slot( + sig->slot(), + as::consign( + force_move(self), + tim, + sig + ) ) ); } - template - void operator()( - Self& self, - error_code const& ec, - std::weak_ptr wp - ) { - if (!ec) { - if (auto sp = wp.lock()) { - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown timeout"; - BOOST_ASSERT(state == shutdown); - state = complete; - self.complete(ec); - return; - } - } - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown timeout doesn't processed. ec:" << ec.message(); - } - template void operator()( Self& self, error_code const& ec ) { - if (state == complete) { - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown already timeout"; - } - else { - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown ec:" << ec.message(); - state = complete; - self.complete(ec); - } + ASYNC_MQTT_LOG("mqtt_impl", info) + << "TLS async_shutdown ec:" << ec.message(); + state = complete; + self.complete(ec); } }; }; diff --git a/include/async_mqtt/util/impl/stream_read_packet.hpp b/include/async_mqtt/util/impl/stream_read_packet.hpp index 865cff310..7950e63a9 100644 --- a/include/async_mqtt/util/impl/stream_read_packet.hpp +++ b/include/async_mqtt/util/impl/stream_read_packet.hpp @@ -47,6 +47,7 @@ struct stream_impl::stream_read_packet_op { a_strm.read_queue_.post( force_move(self) ); + a_strm.read_queue_.try_execute(); } break; case work: { a_strm.read_queue_.start_work(); diff --git a/include/async_mqtt/util/impl/stream_write_packet.hpp b/include/async_mqtt/util/impl/stream_write_packet.hpp index 66fe29d07..70f0ed207 100644 --- a/include/async_mqtt/util/impl/stream_write_packet.hpp +++ b/include/async_mqtt/util/impl/stream_write_packet.hpp @@ -51,6 +51,7 @@ struct stream_impl::stream_write_packet_op { a_strm.write_queue_.post( force_move(self) ); + a_strm.write_queue_.try_execute(); } break; case write: { a_strm.write_queue_.start_work(); diff --git a/include/async_mqtt/util/ioc_queue.hpp b/include/async_mqtt/util/ioc_queue.hpp index 0684eb729..5632fbbe9 100644 --- a/include/async_mqtt/util/ioc_queue.hpp +++ b/include/async_mqtt/util/ioc_queue.hpp @@ -40,6 +40,9 @@ class ioc_queue { queue_, std::forward(token) ); + } + + void try_execute() { if (immediate_executable()) { queue_.restart(); queue_.poll_one(); From 58696ffa11093585a18944606f4c72ebabc3e9ef Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Tue, 8 Oct 2024 10:44:57 +0900 Subject: [PATCH 2/2] Updated documents. --- CHANGELOG.adoc | 1 + doc/CHANGELOG.html | 3 + .../customized__ssl__stream_8hpp_source.html | 101 +++++++----------- doc/api/ioc__queue_8hpp_source.html | 65 +++++------ 4 files changed, 79 insertions(+), 91 deletions(-) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 3c0658c60..bb7d7ae62 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -3,6 +3,7 @@ = History == 9.0.2 +* Fixed TLS timeout logic. #357 * Fixed broker auth file for docker. #356 == 9.0.1 diff --git a/doc/CHANGELOG.html b/doc/CHANGELOG.html index f1fda449b..060192746 100644 --- a/doc/CHANGELOG.html +++ b/doc/CHANGELOG.html @@ -69,6 +69,9 @@

9.0.2

  • +

    Fixed TLS timeout logic. #357

    +
  • +
  • Fixed broker auth file for docker. #356

diff --git a/doc/api/customized__ssl__stream_8hpp_source.html b/doc/api/customized__ssl__stream_8hpp_source.html index c56aad4e2..ce3940057 100644 --- a/doc/api/customized__ssl__stream_8hpp_source.html +++ b/doc/api/customized__ssl__stream_8hpp_source.html @@ -162,68 +162,49 @@
68 stream.get_executor(),
69 shutdown_timeout
70 );
-
71 auto self_sp = std::make_shared<Self>(force_move(self));
+
71 auto sig = std::make_shared<as::cancellation_signal>();
72 tim->async_wait(
-
73 as::consign(
-
74 as::append(
-
75 std::ref(*self_sp),
-
76 std::weak_ptr<as::steady_timer>(tim)
-
77 ),
-
78 self_sp
-
79 )
-
80 );
-
81 stream.async_shutdown(
-
82 as::consign(
-
83 std::ref(*self_sp),
-
84 self_sp,
-
85 tim
-
86 )
-
87 );
-
88 }
-
89
-
90 template <typename Self>
-
91 void operator()(
-
92 Self& self,
-
93 error_code const& ec,
-
94 std::weak_ptr<as::steady_timer> wp
-
95 ) {
-
96 if (!ec) {
-
97 if (auto sp = wp.lock()) {
-
98 ASYNC_MQTT_LOG("mqtt_impl", info)
-
99 << "TLS async_shutdown timeout";
-
100 BOOST_ASSERT(state == shutdown);
-
101 state = complete;
-
102 self.complete(ec);
-
103 return;
-
104 }
-
105 }
-
106 ASYNC_MQTT_LOG("mqtt_impl", info)
-
107 << "TLS async_shutdown timeout doesn't processed. ec:" << ec.message();
-
108 }
-
109
-
110 template <typename Self>
-
111 void operator()(
-
112 Self& self,
-
113 error_code const& ec
-
114 ) {
-
115 if (state == complete) {
-
116 ASYNC_MQTT_LOG("mqtt_impl", info)
-
117 << "TLS async_shutdown already timeout";
-
118 }
-
119 else {
-
120 ASYNC_MQTT_LOG("mqtt_impl", info)
-
121 << "TLS async_shutdown ec:" << ec.message();
-
122 state = complete;
-
123 self.complete(ec);
-
124 }
-
125 }
-
126 };
-
127};
+
73 [sig, wp = std::weak_ptr<as::steady_timer>(tim)]
+
74 (error_code const& ec) {
+
75 if (!ec) {
+
76 if (auto sp = wp.lock()) {
+
77 ASYNC_MQTT_LOG("mqtt_impl", info)
+
78 << "TLS async_shutdown timeout";
+
79 sig->emit(as::cancellation_type::terminal);
+
80 }
+
81 }
+
82 }
+
83 );
+
84 auto& a_stream{stream};
+
85 a_stream.async_shutdown(
+
86 as::bind_cancellation_slot(
+
87 sig->slot(),
+
88 as::consign(
+
89 force_move(self),
+
90 tim,
+
91 sig
+
92 )
+
93 )
+
94 );
+
95 }
+
96
+
97 template <typename Self>
+
98 void operator()(
+
99 Self& self,
+
100 error_code const& ec
+
101 ) {
+
102 ASYNC_MQTT_LOG("mqtt_impl", info)
+
103 << "TLS async_shutdown ec:" << ec.message();
+
104 state = complete;
+
105 self.complete(ec);
+
106 }
+
107 };
+
108};
-
128
-
129} // namespace async_mqtt
-
130
-
131#endif // ASYNC_MQTT_PREDEFINED_LAYER_CUSTOMIZED_SSL_STREAM_HPP
+
109
+
110} // namespace async_mqtt
+
111
+
112#endif // ASYNC_MQTT_PREDEFINED_LAYER_CUSTOMIZED_SSL_STREAM_HPP
sys::error_code error_code
sys is a namespace alias of boost::sytem.
Definition error.hpp:56
@ info
info level api call is output
customization class template for underlying layer In order to adapt your layer to async_mqtt,...
Definition stream_traits.hpp:101
diff --git a/doc/api/ioc__queue_8hpp_source.html b/doc/api/ioc__queue_8hpp_source.html index f57f4816c..0f9dbea65 100644 --- a/doc/api/ioc__queue_8hpp_source.html +++ b/doc/api/ioc__queue_8hpp_source.html @@ -142,37 +142,40 @@
40 queue_,
41 std::forward<CompletionToken>(token)
42 );
-
43 if (immediate_executable()) {
-
44 queue_.restart();
-
45 queue_.poll_one();
-
46 }
-
47 }
-
48
-
49 bool stopped() const {
-
50 return queue_.stopped();
-
51 }
-
52
-
53 std::size_t poll_one() {
-
54 working_ = false;
-
55 if (queue_.stopped()) queue_.restart();
-
56 return queue_.poll_one();
-
57 }
-
58
-
59 std::size_t poll() {
-
60 working_ = false;
-
61 if (queue_.stopped()) queue_.restart();
-
62 return queue_.poll();
-
63 }
-
64
-
65private:
-
66 as::io_context queue_{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE};
-
67 bool working_ = false;
-
68 std::optional<as::executor_work_guard<as::io_context::executor_type>> guard_;
-
69};
-
70
-
71} // namespace async_mqtt
-
72
-
73#endif // ASYNC_MQTT_UTIL_IOC_QUEUE_HPP
+
43 }
+
44
+
45 void try_execute() {
+
46 if (immediate_executable()) {
+
47 queue_.restart();
+
48 queue_.poll_one();
+
49 }
+
50 }
+
51
+
52 bool stopped() const {
+
53 return queue_.stopped();
+
54 }
+
55
+
56 std::size_t poll_one() {
+
57 working_ = false;
+
58 if (queue_.stopped()) queue_.restart();
+
59 return queue_.poll_one();
+
60 }
+
61
+
62 std::size_t poll() {
+
63 working_ = false;
+
64 if (queue_.stopped()) queue_.restart();
+
65 return queue_.poll();
+
66 }
+
67
+
68private:
+
69 as::io_context queue_{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE};
+
70 bool working_ = false;
+
71 std::optional<as::executor_work_guard<as::io_context::executor_type>> guard_;
+
72};
+
73
+
74} // namespace async_mqtt
+
75
+
76#endif // ASYNC_MQTT_UTIL_IOC_QUEUE_HPP