Skip to content

Commit

Permalink
Merge pull request #357 from redboltz/fix_tls_close
Browse files Browse the repository at this point in the history
Fix tls close
  • Loading branch information
redboltz authored Oct 8, 2024
2 parents 5b029eb + 58696ff commit d0966f1
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 135 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
= History

== 9.0.2
* Fixed TLS timeout logic. #357
* Fixed broker auth file for docker. #356

== 9.0.1
Expand Down
3 changes: 3 additions & 0 deletions doc/CHANGELOG.html
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ <h2 id="_9_0_2">9.0.2</h2>
<div class="ulist">
<ul>
<li>
<p>Fixed TLS timeout logic. #357</p>
</li>
<li>
<p>Fixed broker auth file for docker. #356</p>
</li>
</ul>
Expand Down
101 changes: 41 additions & 60 deletions doc/api/customized__ssl__stream_8hpp_source.html

Large diffs are not rendered by default.

65 changes: 34 additions & 31 deletions doc/api/ioc__queue_8hpp_source.html
Original file line number Diff line number Diff line change
Expand Up @@ -142,37 +142,40 @@
<div class="line"><a id="l00040" name="l00040"></a><span class="lineno"> 40</span> queue_,</div>
<div class="line"><a id="l00041" name="l00041"></a><span class="lineno"> 41</span> std::forward&lt;CompletionToken&gt;(token)</div>
<div class="line"><a id="l00042" name="l00042"></a><span class="lineno"> 42</span> );</div>
<div class="line"><a id="l00043" name="l00043"></a><span class="lineno"> 43</span> <span class="keywordflow">if</span> (immediate_executable()) {</div>
<div class="line"><a id="l00044" name="l00044"></a><span class="lineno"> 44</span> queue_.restart();</div>
<div class="line"><a id="l00045" name="l00045"></a><span class="lineno"> 45</span> queue_.poll_one();</div>
<div class="line"><a id="l00046" name="l00046"></a><span class="lineno"> 46</span> }</div>
<div class="line"><a id="l00047" name="l00047"></a><span class="lineno"> 47</span> }</div>
<div class="line"><a id="l00048" name="l00048"></a><span class="lineno"> 48</span> </div>
<div class="line"><a id="l00049" name="l00049"></a><span class="lineno"> 49</span> <span class="keywordtype">bool</span> stopped()<span class="keyword"> const </span>{</div>
<div class="line"><a id="l00050" name="l00050"></a><span class="lineno"> 50</span> <span class="keywordflow">return</span> queue_.stopped();</div>
<div class="line"><a id="l00051" name="l00051"></a><span class="lineno"> 51</span> }</div>
<div class="line"><a id="l00052" name="l00052"></a><span class="lineno"> 52</span> </div>
<div class="line"><a id="l00053" name="l00053"></a><span class="lineno"> 53</span> std::size_t poll_one() {</div>
<div class="line"><a id="l00054" name="l00054"></a><span class="lineno"> 54</span> working_ = <span class="keyword">false</span>;</div>
<div class="line"><a id="l00055" name="l00055"></a><span class="lineno"> 55</span> <span class="keywordflow">if</span> (queue_.stopped()) queue_.restart();</div>
<div class="line"><a id="l00056" name="l00056"></a><span class="lineno"> 56</span> <span class="keywordflow">return</span> queue_.poll_one();</div>
<div class="line"><a id="l00057" name="l00057"></a><span class="lineno"> 57</span> }</div>
<div class="line"><a id="l00058" name="l00058"></a><span class="lineno"> 58</span> </div>
<div class="line"><a id="l00059" name="l00059"></a><span class="lineno"> 59</span> std::size_t poll() {</div>
<div class="line"><a id="l00060" name="l00060"></a><span class="lineno"> 60</span> working_ = <span class="keyword">false</span>;</div>
<div class="line"><a id="l00061" name="l00061"></a><span class="lineno"> 61</span> <span class="keywordflow">if</span> (queue_.stopped()) queue_.restart();</div>
<div class="line"><a id="l00062" name="l00062"></a><span class="lineno"> 62</span> <span class="keywordflow">return</span> queue_.poll();</div>
<div class="line"><a id="l00063" name="l00063"></a><span class="lineno"> 63</span> }</div>
<div class="line"><a id="l00064" name="l00064"></a><span class="lineno"> 64</span> </div>
<div class="line"><a id="l00065" name="l00065"></a><span class="lineno"> 65</span><span class="keyword">private</span>:</div>
<div class="line"><a id="l00066" name="l00066"></a><span class="lineno"> 66</span> as::io_context queue_{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE};</div>
<div class="line"><a id="l00067" name="l00067"></a><span class="lineno"> 67</span> <span class="keywordtype">bool</span> working_ = <span class="keyword">false</span>;</div>
<div class="line"><a id="l00068" name="l00068"></a><span class="lineno"> 68</span> std::optional&lt;as::executor_work_guard&lt;as::io_context::executor_type&gt;&gt; guard_;</div>
<div class="line"><a id="l00069" name="l00069"></a><span class="lineno"> 69</span>};</div>
<div class="line"><a id="l00070" name="l00070"></a><span class="lineno"> 70</span> </div>
<div class="line"><a id="l00071" name="l00071"></a><span class="lineno"> 71</span>} <span class="comment">// namespace async_mqtt</span></div>
<div class="line"><a id="l00072" name="l00072"></a><span class="lineno"> 72</span> </div>
<div class="line"><a id="l00073" name="l00073"></a><span class="lineno"> 73</span><span class="preprocessor">#endif </span><span class="comment">// ASYNC_MQTT_UTIL_IOC_QUEUE_HPP</span></div>
<div class="line"><a id="l00043" name="l00043"></a><span class="lineno"> 43</span> }</div>
<div class="line"><a id="l00044" name="l00044"></a><span class="lineno"> 44</span> </div>
<div class="line"><a id="l00045" name="l00045"></a><span class="lineno"> 45</span> <span class="keywordtype">void</span> try_execute() {</div>
<div class="line"><a id="l00046" name="l00046"></a><span class="lineno"> 46</span> <span class="keywordflow">if</span> (immediate_executable()) {</div>
<div class="line"><a id="l00047" name="l00047"></a><span class="lineno"> 47</span> queue_.restart();</div>
<div class="line"><a id="l00048" name="l00048"></a><span class="lineno"> 48</span> queue_.poll_one();</div>
<div class="line"><a id="l00049" name="l00049"></a><span class="lineno"> 49</span> }</div>
<div class="line"><a id="l00050" name="l00050"></a><span class="lineno"> 50</span> }</div>
<div class="line"><a id="l00051" name="l00051"></a><span class="lineno"> 51</span> </div>
<div class="line"><a id="l00052" name="l00052"></a><span class="lineno"> 52</span> <span class="keywordtype">bool</span> stopped()<span class="keyword"> const </span>{</div>
<div class="line"><a id="l00053" name="l00053"></a><span class="lineno"> 53</span> <span class="keywordflow">return</span> queue_.stopped();</div>
<div class="line"><a id="l00054" name="l00054"></a><span class="lineno"> 54</span> }</div>
<div class="line"><a id="l00055" name="l00055"></a><span class="lineno"> 55</span> </div>
<div class="line"><a id="l00056" name="l00056"></a><span class="lineno"> 56</span> std::size_t poll_one() {</div>
<div class="line"><a id="l00057" name="l00057"></a><span class="lineno"> 57</span> working_ = <span class="keyword">false</span>;</div>
<div class="line"><a id="l00058" name="l00058"></a><span class="lineno"> 58</span> <span class="keywordflow">if</span> (queue_.stopped()) queue_.restart();</div>
<div class="line"><a id="l00059" name="l00059"></a><span class="lineno"> 59</span> <span class="keywordflow">return</span> queue_.poll_one();</div>
<div class="line"><a id="l00060" name="l00060"></a><span class="lineno"> 60</span> }</div>
<div class="line"><a id="l00061" name="l00061"></a><span class="lineno"> 61</span> </div>
<div class="line"><a id="l00062" name="l00062"></a><span class="lineno"> 62</span> std::size_t poll() {</div>
<div class="line"><a id="l00063" name="l00063"></a><span class="lineno"> 63</span> working_ = <span class="keyword">false</span>;</div>
<div class="line"><a id="l00064" name="l00064"></a><span class="lineno"> 64</span> <span class="keywordflow">if</span> (queue_.stopped()) queue_.restart();</div>
<div class="line"><a id="l00065" name="l00065"></a><span class="lineno"> 65</span> <span class="keywordflow">return</span> queue_.poll();</div>
<div class="line"><a id="l00066" name="l00066"></a><span class="lineno"> 66</span> }</div>
<div class="line"><a id="l00067" name="l00067"></a><span class="lineno"> 67</span> </div>
<div class="line"><a id="l00068" name="l00068"></a><span class="lineno"> 68</span><span class="keyword">private</span>:</div>
<div class="line"><a id="l00069" name="l00069"></a><span class="lineno"> 69</span> as::io_context queue_{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE};</div>
<div class="line"><a id="l00070" name="l00070"></a><span class="lineno"> 70</span> <span class="keywordtype">bool</span> working_ = <span class="keyword">false</span>;</div>
<div class="line"><a id="l00071" name="l00071"></a><span class="lineno"> 71</span> std::optional&lt;as::executor_work_guard&lt;as::io_context::executor_type&gt;&gt; guard_;</div>
<div class="line"><a id="l00072" name="l00072"></a><span class="lineno"> 72</span>};</div>
<div class="line"><a id="l00073" name="l00073"></a><span class="lineno"> 73</span> </div>
<div class="line"><a id="l00074" name="l00074"></a><span class="lineno"> 74</span>} <span class="comment">// namespace async_mqtt</span></div>
<div class="line"><a id="l00075" name="l00075"></a><span class="lineno"> 75</span> </div>
<div class="line"><a id="l00076" name="l00076"></a><span class="lineno"> 76</span><span class="preprocessor">#endif </span><span class="comment">// ASYNC_MQTT_UTIL_IOC_QUEUE_HPP</span></div>
</div><!-- fragment --></div><!-- contents -->
</div><!-- doc-content -->
<!-- start footer part -->
Expand Down
2 changes: 1 addition & 1 deletion include/async_mqtt/impl/endpoint_close.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ close_op {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &a_ep)
<< "already close requested";
a_ep.close_queue_.post(
a_ep.close_queue_.post(
force_move(self)
);
} break;
Expand Down
67 changes: 24 additions & 43 deletions include/async_mqtt/predefined_layer/customized_ssl_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,60 +68,41 @@ struct layer_customize<as::ssl::stream<NextLayer>> {
stream.get_executor(),
shutdown_timeout
);
auto self_sp = std::make_shared<Self>(force_move(self));
auto sig = std::make_shared<as::cancellation_signal>();
tim->async_wait(
as::consign(
as::append(
std::ref(*self_sp),
std::weak_ptr<as::steady_timer>(tim)
),
self_sp
)
[sig, wp = std::weak_ptr<as::steady_timer>(tim)]
(error_code const& ec) {
if (!ec) {
if (auto sp = wp.lock()) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown timeout";
sig->emit(as::cancellation_type::terminal);
}
}
}
);
stream.async_shutdown(
as::consign(
std::ref(*self_sp),
self_sp,
tim
auto& a_stream{stream};
a_stream.async_shutdown(
as::bind_cancellation_slot(
sig->slot(),
as::consign(
force_move(self),
tim,
sig
)
)
);
}

template <typename Self>
void operator()(
Self& self,
error_code const& ec,
std::weak_ptr<as::steady_timer> wp
) {
if (!ec) {
if (auto sp = wp.lock()) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown timeout";
BOOST_ASSERT(state == shutdown);
state = complete;
self.complete(ec);
return;
}
}
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown timeout doesn't processed. ec:" << ec.message();
}

template <typename Self>
void operator()(
Self& self,
error_code const& ec
) {
if (state == complete) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown already timeout";
}
else {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown ec:" << ec.message();
state = complete;
self.complete(ec);
}
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown ec:" << ec.message();
state = complete;
self.complete(ec);
}
};
};
Expand Down
1 change: 1 addition & 0 deletions include/async_mqtt/util/impl/stream_read_packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct stream_impl<NextLayer>::stream_read_packet_op {
a_strm.read_queue_.post(
force_move(self)
);
a_strm.read_queue_.try_execute();
} break;
case work: {
a_strm.read_queue_.start_work();
Expand Down
1 change: 1 addition & 0 deletions include/async_mqtt/util/impl/stream_write_packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct stream_impl<NextLayer>::stream_write_packet_op {
a_strm.write_queue_.post(
force_move(self)
);
a_strm.write_queue_.try_execute();
} break;
case write: {
a_strm.write_queue_.start_work();
Expand Down
3 changes: 3 additions & 0 deletions include/async_mqtt/util/ioc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class ioc_queue {
queue_,
std::forward<CompletionToken>(token)
);
}

void try_execute() {
if (immediate_executable()) {
queue_.restart();
queue_.poll_one();
Expand Down

0 comments on commit d0966f1

Please sign in to comment.