Skip to content

Commit 632ffd7

Browse files
committed
Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301)
* Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder - Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown Signed-off-by: Michael Orlov <[email protected]> * Use singleton for static executor in the rosbag2_py::Recorder Signed-off-by: Michael Orlov <[email protected]> * Rollback to the non-static executor and don't call executor->cancel() - In case when signal will arrive we will trigger our internal exit_ variable and wait while current exec_->spin_all(10msec) will exit. Signed-off-by: Michael Orlov <[email protected]> * Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov <[email protected]> * Address race condition in rosbag2_py.test_record_cancel - Add `record_thread.join()` before trying to parse metadata. Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Michael Orlov <[email protected]> (cherry picked from commit 46a23e9) # Conflicts: # rosbag2_py/src/rosbag2_py/_transport.cpp # rosbag2_py/test/test_transport.py
1 parent e4604c5 commit 632ffd7

File tree

2 files changed

+70
-11
lines changed

2 files changed

+70
-11
lines changed

Diff for: rosbag2_py/src/rosbag2_py/_transport.cpp

+58-11
Original file line numberDiff line numberDiff line change
@@ -141,17 +141,20 @@ class Player
141141

142142
class Recorder
143143
{
144-
private:
145-
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;
146-
147144
public:
148145
Recorder()
149146
{
150-
rclcpp::init(0, nullptr);
151-
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
147+
auto init_options = rclcpp::InitOptions();
148+
init_options.shutdown_on_signal = false;
149+
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);
150+
152151
std::signal(
153152
SIGTERM, [](int /* signal */) {
154-
rclcpp::shutdown();
153+
rosbag2_py::Recorder::cancel();
154+
});
155+
std::signal(
156+
SIGINT, [](int /* signal */) {
157+
rosbag2_py::Recorder::cancel();
155158
});
156159
}
157160

@@ -164,6 +167,8 @@ class Recorder
164167
const rosbag2_storage::StorageOptions & storage_options,
165168
RecordOptions & record_options)
166169
{
170+
exit_ = false;
171+
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
167172
if (record_options.rmw_serialization_format.empty()) {
168173
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
169174
}
@@ -173,20 +178,55 @@ class Recorder
173178
std::move(writer), storage_options, record_options);
174179
recorder->record();
175180

176-
exec_->add_node(recorder);
177-
// Release the GIL for long-running record, so that calling Python code can use other threads
181+
exec->add_node(recorder);
182+
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
183+
// recorder->stop() to be able to send notifications about bag split and close.
184+
auto spin_thread = std::thread(
185+
[&exec]() {
186+
exec->spin();
187+
});
178188
{
189+
// Release the GIL for long-running record, so that calling Python code can use other threads
179190
py::gil_scoped_release release;
180-
exec_->spin();
191+
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
192+
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
193+
recorder->stop();
194+
}
195+
exec->cancel();
196+
if (spin_thread.joinable()) {
197+
spin_thread.join();
181198
}
199+
exec->remove_node(recorder);
182200
}
183201

184-
void cancel()
202+
static void cancel()
185203
{
186-
exec_->cancel();
204+
exit_ = true;
205+
wait_for_exit_cv_.notify_all();
187206
}
207+
208+
protected:
209+
static std::atomic_bool exit_;
210+
static std::condition_variable wait_for_exit_cv_;
211+
std::mutex wait_for_exit_mutex_;
188212
};
189213

214+
<<<<<<< HEAD
215+
=======
216+
std::atomic_bool Recorder::exit_{false};
217+
std::condition_variable Recorder::wait_for_exit_cv_{};
218+
219+
// Return a RecordOptions struct with defaults set for rewriting bags.
220+
rosbag2_transport::RecordOptions bag_rewrite_default_record_options()
221+
{
222+
rosbag2_transport::RecordOptions options{};
223+
// We never want to drop messages when converting bags, so set the compression queue size to 0
224+
// (unbounded).
225+
options.compression_queue_size = 0;
226+
return options;
227+
}
228+
229+
>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301))
190230
// Simple wrapper to read the output config YAML into structs
191231
void bag_rewrite(
192232
const std::vector<rosbag2_storage::StorageOptions> & input_options,
@@ -283,8 +323,15 @@ PYBIND11_MODULE(_transport, m) {
283323

284324
py::class_<rosbag2_py::Recorder>(m, "Recorder")
285325
.def(py::init())
326+
<<<<<<< HEAD
286327
.def("record", &rosbag2_py::Recorder::record)
287328
.def("cancel", &rosbag2_py::Recorder::cancel)
329+
=======
330+
.def(
331+
"record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"),
332+
py::arg("node_name") = "rosbag2_recorder")
333+
.def_static("cancel", &rosbag2_py::Recorder::cancel)
334+
>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301))
288335
;
289336

290337
m.def(

Diff for: rosbag2_py/test/test_transport.py

+12
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,19 @@ def test_record_cancel(tmp_path):
8181

8282
recorder.cancel()
8383

84+
<<<<<<< HEAD
8485
metadata_path = Path(bag_path) / 'metadata.yaml'
8586
db3_path = Path(bag_path) / 'test_record_cancel_0.db3'
8687
assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(),
88+
=======
89+
metadata_io = rosbag2_py.MetadataIo()
90+
assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path),
91+
timeout=rclpy.duration.Duration(seconds=3))
92+
record_thread.join()
93+
94+
metadata = metadata_io.read_metadata(bag_path)
95+
assert(len(metadata.relative_file_paths))
96+
storage_path = Path(metadata.relative_file_paths[0])
97+
assert wait_for(lambda: storage_path.is_file(),
98+
>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301))
8799
timeout=rclpy.duration.Duration(seconds=3))

0 commit comments

Comments
 (0)