Skip to content

Commit b874b49

Browse files
morlov-apexaimergify[bot]
authored andcommitted
Fix for possible freeze in Recorder::stop() (#1381)
* Fix for possible freeze in Recorder::stop() - It was possible a freeze in recorder due to the race condition when calling Recorder::stop() while event publisher thread hasn't been fully started yet. Signed-off-by: Michael Orlov <[email protected]> * Move event_publisher_thread_wake_cv_.notify_all() out of the mutex lock Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Michael Orlov <[email protected]> (cherry picked from commit c6cc69a)
1 parent 70525f2 commit b874b49

File tree

1 file changed

+23
-15
lines changed

1 file changed

+23
-15
lines changed

Diff for: rosbag2_transport/src/rosbag2_transport/recorder.cpp

+23-15
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class RecorderImpl
8282
std::shared_ptr<rosbag2_cpp::Writer> writer_;
8383
rosbag2_storage::StorageOptions storage_options_;
8484
rosbag2_transport::RecordOptions record_options_;
85-
std::atomic<bool> stop_discovery_;
85+
std::atomic<bool> stop_discovery_ = false;
8686
std::unordered_map<std::string, std::shared_ptr<rclcpp::SubscriptionBase>> subscriptions_;
8787

8888
private:
@@ -132,14 +132,15 @@ class RecorderImpl
132132
rclcpp::Service<rosbag2_interfaces::srv::SplitBagfile>::SharedPtr srv_split_bagfile_;
133133

134134
std::atomic<bool> paused_ = false;
135+
std::atomic<bool> in_recording_ = false;
135136
std::shared_ptr<KeyboardHandler> keyboard_handler_;
136137
KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ =
137138
KeyboardHandler::invalid_handle;
138139

139140
// Variables for event publishing
140141
rclcpp::Publisher<rosbag2_interfaces::msg::WriteSplitEvent>::SharedPtr split_event_pub_;
141-
bool event_publisher_thread_should_exit_ = false;
142-
bool write_split_has_occurred_ = false;
142+
std::atomic<bool> event_publisher_thread_should_exit_ = false;
143+
std::atomic<bool> write_split_has_occurred_ = false;
143144
rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_;
144145
std::mutex event_publisher_thread_mutex_;
145146
std::condition_variable event_publisher_thread_wake_cv_;
@@ -185,12 +186,17 @@ RecorderImpl::~RecorderImpl()
185186
stop();
186187
}
187188

188-
189189
void RecorderImpl::stop()
190190
{
191191
stop_discovery_ = true;
192192
if (discovery_future_.valid()) {
193-
discovery_future_.wait();
193+
auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval);
194+
if (status != std::future_status::ready) {
195+
RCLCPP_ERROR_STREAM(
196+
node->get_logger(),
197+
"discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() <<
198+
") return status: " << (status == std::future_status::timeout ? "timeout" : "deferred"));
199+
}
194200
}
195201
paused_ = true;
196202
subscriptions_.clear();
@@ -204,10 +210,18 @@ void RecorderImpl::stop()
204210
if (event_publisher_thread_.joinable()) {
205211
event_publisher_thread_.join();
206212
}
213+
in_recording_ = false;
214+
RCLCPP_INFO(node->get_logger(), "Recording stopped");
207215
}
208216

209217
void RecorderImpl::record()
210218
{
219+
if (in_recording_.exchange(true)) {
220+
RCLCPP_WARN_STREAM(
221+
node->get_logger(),
222+
"Called Recorder::record() while already in recording, dismissing request.");
223+
return;
224+
}
211225
paused_ = record_options_.start_paused;
212226
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
213227
if (record_options_.rmw_serialization_format.empty()) {
@@ -274,9 +288,8 @@ void RecorderImpl::record()
274288
// Start the thread that will publish events
275289
event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this);
276290

277-
split_event_pub_ = node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>(
278-
"events/write_split",
279-
1);
291+
split_event_pub_ =
292+
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);
280293
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
281294
callbacks.write_split_callback =
282295
[this](rosbag2_cpp::bag_events::BagSplitInfo & info) {
@@ -297,15 +310,13 @@ void RecorderImpl::record()
297310
discovery_future_ =
298311
std::async(std::launch::async, std::bind(&RecorderImpl::topics_discovery, this));
299312
}
313+
RCLCPP_INFO(node->get_logger(), "Recording...");
300314
}
301315

302316
void RecorderImpl::event_publisher_thread_main()
303317
{
304318
RCLCPP_INFO(node->get_logger(), "Event publisher thread: Starting");
305-
306-
bool should_exit = false;
307-
308-
while (!should_exit) {
319+
while (!event_publisher_thread_should_exit_.load()) {
309320
std::unique_lock<std::mutex> lock(event_publisher_thread_mutex_);
310321
event_publisher_thread_wake_cv_.wait(
311322
lock,
@@ -319,10 +330,7 @@ void RecorderImpl::event_publisher_thread_main()
319330
message.opened_file = bag_split_info_.opened_file;
320331
split_event_pub_->publish(message);
321332
}
322-
323-
should_exit = event_publisher_thread_should_exit_;
324333
}
325-
326334
RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting");
327335
}
328336

0 commit comments

Comments
 (0)