From 8b43291b2f22cf31fb6cd7af83a5459e4a191ea7 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 1 Jun 2024 13:53:41 -0700 Subject: [PATCH 1/7] Add options to disable signal handling in Player and Recorder Python API - Also added test coverage for signals handling in Python API Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 95 +++++++---- rosbag2_py/test/test_transport.py | 194 +++++++++++++++++++++-- 2 files changed, 249 insertions(+), 40 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 9d15e82b1..0cbba5fe3 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -184,7 +184,15 @@ class Player const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options) { - play_impl(storage_options, play_options, false); + play_impl(storage_options, play_options, false, 0, true); + } + + void play_with_signal_option( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + bool enable_signal_handling) + { + play_impl(storage_options, play_options, false, 0, enable_signal_handling); } void burst( @@ -192,7 +200,16 @@ class Player PlayOptions & play_options, size_t num_messages) { - play_impl(storage_options, play_options, true, num_messages); + play_impl(storage_options, play_options, true, num_messages, true); + } + + void burst_with_signal_option( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + size_t num_messages, + bool enable_signal_handling) + { + play_impl(storage_options, play_options, true, num_messages, enable_signal_handling); } protected: @@ -215,13 +232,10 @@ class Player { if (old_sigterm_handler_ != SIG_ERR) { std::signal(SIGTERM, old_sigterm_handler_); - old_sigterm_handler_ = SIG_ERR; } if (old_sigint_handler_ != SIG_ERR) { std::signal(SIGINT, old_sigint_handler_); - old_sigint_handler_ = SIG_ERR; } - deferred_sig_number_ = -1; } static void process_deferred_signal() @@ -243,9 +257,12 @@ class Player const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options, bool burst = false, - size_t burst_num_messages = 0) + size_t burst_num_messages = 0, + bool enable_signal_handling = true) { - install_signal_handlers(); + if (enable_signal_handling) { + install_signal_handlers(); + } try { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); std::shared_ptr keyboard_handler; @@ -297,25 +314,30 @@ class Player } exec.remove_node(player); } catch (...) { - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } throw; } - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } } static std::atomic_bool exit_; + static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; static SignalHandlerType old_sigint_handler_; static SignalHandlerType old_sigterm_handler_; - static int deferred_sig_number_; + static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR}; Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR}; -int Player::deferred_sig_number_{-1}; +volatile std::sig_atomic_t Player::deferred_sig_number_{-1}; std::atomic_bool Player::exit_{false}; std::condition_variable Player::wait_for_exit_cv_{}; @@ -339,7 +361,18 @@ class Recorder RecordOptions & record_options, std::string & node_name) { - install_signal_handlers(); + record_with_signal_option(storage_options, record_options, node_name, true); + } + + void record_with_signal_option( + const rosbag2_storage::StorageOptions & storage_options, + RecordOptions & record_options, + std::string & node_name, + bool enable_signal_handling) + { + if (enable_signal_handling) { + install_signal_handlers(); + } try { exit_ = false; auto exec = std::make_unique(); @@ -385,12 +418,16 @@ class Recorder } exec->remove_node(recorder); } catch (...) { - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } throw; } - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } } static void cancel() @@ -419,13 +456,10 @@ class Recorder { if (old_sigterm_handler_ != SIG_ERR) { std::signal(SIGTERM, old_sigterm_handler_); - old_sigterm_handler_ = SIG_ERR; } if (old_sigint_handler_ != SIG_ERR) { std::signal(SIGINT, old_sigint_handler_); - old_sigint_handler_ = SIG_ERR; } - deferred_sig_number_ = -1; } static void process_deferred_signal() @@ -444,16 +478,17 @@ class Recorder } static std::atomic_bool exit_; + static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; static SignalHandlerType old_sigint_handler_; static SignalHandlerType old_sigterm_handler_; - static int deferred_sig_number_; + static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR}; Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR}; -int Recorder::deferred_sig_number_{-1}; +volatile std::sig_atomic_t Recorder::deferred_sig_number_{-1}; std::atomic_bool Recorder::exit_{false}; std::condition_variable Recorder::wait_for_exit_cv_{}; @@ -592,18 +627,22 @@ PYBIND11_MODULE(_transport, m) { .def(py::init<>()) .def(py::init()) .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) - .def( - "burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), + .def("play", &rosbag2_py::Player::play_with_signal_option, py::arg("storage_options"), + py::arg("play_options"), py::arg("enable_signal_handling")) + .def("burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), py::arg("num_messages")) + .def("burst", &rosbag2_py::Player::burst_with_signal_option, py::arg("storage_options"), + py::arg("play_options"), py::arg("num_messages"), py::arg("enable_signal_handling")) .def_static("cancel", &rosbag2_py::Player::cancel) ; py::class_(m, "Recorder") .def(py::init<>()) .def(py::init()) - .def( - "record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"), - py::arg("node_name") = "rosbag2_recorder") + .def("record", &rosbag2_py::Recorder::record, py::arg("storage_options"), + py::arg("record_options"), py::arg("node_name") = "rosbag2_recorder") + .def("record", &rosbag2_py::Recorder::record_with_signal_option, py::arg("storage_options"), + py::arg("record_options"), py::arg("node_name"), py::arg("enable_signal_handling")) .def_static("cancel", &rosbag2_py::Recorder::cancel) ; diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index dabc96b1f..0700c3ad4 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -15,6 +15,7 @@ import datetime from pathlib import Path import re +import signal import threading from common import get_rosbag_options, wait_for @@ -30,6 +31,23 @@ RESOURCES_PATH = Path(__file__).parent / 'resources' PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING = r'\[rosbag2_player]: Playback until timestamp: -1' +RECORDING_STARTED_REGEX_STRING = r'\[rosbag2_recorder]: Recording...' + + +def check_record_start_output(cumulative_err, capfd): + out, err = capfd.readouterr() + cumulative_err += err + expected_string_regex = re.compile(RECORDING_STARTED_REGEX_STRING) + matches = expected_string_regex.search(cumulative_err) + return matches is not None + + +def check_playback_start_output(cumulative_err, capfd): + out, err = capfd.readouterr() + cumulative_err += err + expected_string_regex = re.compile(PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING) + matches = expected_string_regex.search(cumulative_err) + return matches is not None def test_options_qos_conversion(): @@ -67,6 +85,166 @@ def test_recoder_log_level(): rosbag2_py.Recorder(invalid_log_level) +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_process_sigint_in_recorder(tmp_path, storage_id, capfd): + bag_path = tmp_path / 'test_process_sigint_in_recorder' + storage_options, converter_options = get_rosbag_options(str(bag_path), storage_id) + + recorder = rosbag2_py.Recorder() + + record_options = rosbag2_py.RecordOptions() + record_options.all_topics = True + + record_thread = threading.Thread( + target=recorder.record, + args=(storage_options, record_options), + daemon=True) + record_thread.start() + + captured_err = '' + assert wait_for(lambda: check_record_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + # Expected to call deferred signal handler no later than exit from the record_thread + record_thread.join() + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + if record_thread.is_alive(): + record_thread.join(3) + assert not record_thread.is_alive() + + metadata_io = rosbag2_py.MetadataIo() + assert wait_for(lambda: metadata_io.metadata_file_exists(str(bag_path)), + timeout=rclpy.duration.Duration(seconds=3)) + metadata = metadata_io.read_metadata(str(bag_path)) + assert len(metadata.relative_file_paths) + storage_path = bag_path / metadata.relative_file_paths[0] + assert wait_for(lambda: storage_path.is_file(), + timeout=rclpy.duration.Duration(seconds=3)) + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_record_process_sigint_in_python_handler(tmp_path, storage_id, capfd): + bag_path = tmp_path / 'test_record_sigint_in_python' + storage_options, converter_options = get_rosbag_options(str(bag_path), storage_id) + + recorder = rosbag2_py.Recorder() + + record_options = rosbag2_py.RecordOptions() + record_options.all_topics = True + + record_thread = threading.Thread( + target=recorder.record, + args=(storage_options, record_options, 'rosbag2_recorder', False), + daemon=True) + record_thread.start() + + captured_err = '' + assert wait_for(lambda: check_record_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + # The recorder hasn't been stopped by signal need to call cancel() method to stop it. + recorder.cancel() + if record_thread.is_alive(): + record_thread.join(3) + assert not record_thread.is_alive() + + metadata_io = rosbag2_py.MetadataIo() + assert wait_for(lambda: metadata_io.metadata_file_exists(str(bag_path)), + timeout=rclpy.duration.Duration(seconds=3)) + metadata = metadata_io.read_metadata(str(bag_path)) + assert len(metadata.relative_file_paths) + storage_path = bag_path / metadata.relative_file_paths[0] + assert wait_for(lambda: storage_path.is_file(), + timeout=rclpy.duration.Duration(seconds=3)) + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_process_sigint_in_player(storage_id, capfd): + bag_path = str(RESOURCES_PATH / storage_id / 'talker') + storage_options, converter_options = get_rosbag_options(bag_path, storage_id) + + player = rosbag2_py.Player() + + play_options = rosbag2_py.PlayOptions() + play_options.loop = True + play_options.start_paused = True + + player_thread = threading.Thread( + target=player.play, + args=(storage_options, play_options), + daemon=True) + player_thread.start() + + captured_err = '' + assert wait_for(lambda: check_playback_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + # Expected to call deferred signal handler no later than exit from the player_thread + player_thread.join() + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + if player_thread.is_alive(): + player_thread.join(3) + assert not player_thread.is_alive() + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_play_process_sigint_in_python_handler(storage_id, capfd): + bag_path = str(RESOURCES_PATH / storage_id / 'talker') + storage_options, converter_options = get_rosbag_options(bag_path, storage_id) + + player = rosbag2_py.Player() + + play_options = rosbag2_py.PlayOptions() + play_options.loop = True + play_options.start_paused = True + + player_thread = threading.Thread( + target=player.play, + args=(storage_options, play_options, False), + daemon=True) + player_thread.start() + + captured_err = '' + assert wait_for(lambda: check_playback_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + # The player hasn't been stopped by signal need to call cancel() method to stop it. + player.cancel() + if player_thread.is_alive(): + player_thread.join(3) + assert not player_thread.is_alive() + + @pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) def test_record_cancel(tmp_path, storage_id): bag_path = tmp_path / 'test_record_cancel' @@ -77,7 +255,7 @@ def test_record_cancel(tmp_path, storage_id): record_options = rosbag2_py.RecordOptions() record_options.all_topics = True record_options.is_discovery_disabled = False - record_options.topic_polling_interval = datetime.timedelta(milliseconds=100) + record_options.topic_polling_interval = datetime.timedelta(milliseconds=10) ctx = rclpy.Context() ctx.init() @@ -95,7 +273,8 @@ def test_record_cancel(tmp_path, storage_id): i = 0 msg = String() - while rclpy.ok() and i < 10: + assert rclpy.ok(context=ctx) + while rclpy.ok(context=ctx) and i < 10: msg.data = 'Hello World: {0}'.format(i) i += 1 pub.publish(msg) @@ -131,17 +310,8 @@ def test_play_cancel(storage_id, capfd): daemon=True) player_thread.start() - def check_playback_start_output(cumulative_out, cumulative_err): - out, err = capfd.readouterr() - cumulative_err += err - cumulative_out += out - expected_string_regex = re.compile(PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING) - matches = expected_string_regex.search(cumulative_err) - return matches is not None - - captured_out = '' captured_err = '' - assert wait_for(lambda: check_playback_start_output(captured_out, captured_err), + assert wait_for(lambda: check_playback_start_output(captured_err, capfd), timeout=rclpy.duration.Duration(seconds=3)) player.cancel() From b43820e4004d892ffb5f7ae57106fe0e15fdc746 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 12 Jun 2024 00:03:08 -0700 Subject: [PATCH 2/7] Address old uncrustify style warnings Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 0cbba5fe3..176e491aa 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -627,21 +627,26 @@ PYBIND11_MODULE(_transport, m) { .def(py::init<>()) .def(py::init()) .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) - .def("play", &rosbag2_py::Player::play_with_signal_option, py::arg("storage_options"), - py::arg("play_options"), py::arg("enable_signal_handling")) - .def("burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), + .def( + "play", &rosbag2_py::Player::play_with_signal_option, py::arg("storage_options"), + py::arg("play_options"), py::arg("enable_signal_handling")) + .def( + "burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), py::arg("num_messages")) - .def("burst", &rosbag2_py::Player::burst_with_signal_option, py::arg("storage_options"), - py::arg("play_options"), py::arg("num_messages"), py::arg("enable_signal_handling")) + .def( + "burst", &rosbag2_py::Player::burst_with_signal_option, py::arg("storage_options"), + py::arg("play_options"), py::arg("num_messages"), py::arg("enable_signal_handling")) .def_static("cancel", &rosbag2_py::Player::cancel) ; py::class_(m, "Recorder") .def(py::init<>()) .def(py::init()) - .def("record", &rosbag2_py::Recorder::record, py::arg("storage_options"), + .def( + "record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"), py::arg("node_name") = "rosbag2_recorder") - .def("record", &rosbag2_py::Recorder::record_with_signal_option, py::arg("storage_options"), + .def( + "record", &rosbag2_py::Recorder::record_with_signal_option, py::arg("storage_options"), py::arg("record_options"), py::arg("node_name"), py::arg("enable_signal_handling")) .def_static("cancel", &rosbag2_py::Recorder::cancel) ; From f6267645b01dd93b6e9bd958627ab0f2b660280a Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 17 Aug 2024 11:41:18 -0700 Subject: [PATCH 3/7] Move common signal handling functionality to a separate class - Add DeferredSignalHandler class - Note: Windows support will be added in a follow-up PR Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 244 ++++++++++++----------- 1 file changed, 130 insertions(+), 114 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 176e491aa..e8b78e97b 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -153,6 +153,98 @@ struct OptionsWrapper : public T typedef OptionsWrapper PlayOptions; typedef OptionsWrapper RecordOptions; +/// \brief Defers signals handling until class destruction or by an explicit call to the +/// process_deferred_signals(). +/// Can install an external signal handler, which will be called upon original signals arriving. +/// \note The install_signal_handlers(external_signal_handler) method shall be called to start +/// capturing deferred signals. +class DeferredSignalHandler +{ +public: + using SignalHandlerType = void (*)(int); + using SignalType = int; + + explicit DeferredSignalHandler(const std::initializer_list & signals) + { + if (sigemptyset(&DeferredSignalHandler::deferred_signals_set_) != 0) { + throw std::runtime_error(std::string("Error calling sigemptyset: ") + std::strerror(errno)); + } + for (auto signal : signals) { + signals_.push_back(signal); + } + } + + ~DeferredSignalHandler() noexcept + { + process_deferred_signals(); + uninstall_signal_handlers(); + } + + void install_signal_handlers(const SignalHandlerType & external_signal_handler) + { + external_signal_handler_ = external_signal_handler; + for (auto sig_num : signals_) { + old_signal_handlers_.emplace_back( + sig_num, std::signal(sig_num, &DeferredSignalHandler::on_signal)); + } + } + + void uninstall_signal_handlers() + { + external_signal_handler_ = SIG_ERR; + // Restore old signal handlers + for (auto [sig_num, old_signal_handler] : old_signal_handlers_) { + if (old_signal_handler != SIG_ERR) { + std::signal(sig_num, old_signal_handler); + old_signal_handler = SIG_ERR; + } + } + old_signal_handlers_.clear(); + } + + void process_deferred_signals() + { + auto call_signal_handler = [](const SignalHandlerType & signal_handler, SignalType signal) { + if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { + signal_handler(signal); + } + }; + + for (auto [deferred_sig_num, old_signal_handler] : old_signal_handlers_) { + auto ret = sigismember(&deferred_signals_set_, deferred_sig_num); + if (ret == -1) { + throw std::runtime_error("Error. Invalid signal: " + std::to_string(deferred_sig_num)); + } else if (ret == 1 && old_signal_handler != SIG_ERR) { + call_signal_handler(old_signal_handler, deferred_sig_num); + sigdelset(&deferred_signals_set_, deferred_sig_num); + } + } + } + +protected: + static void on_signal(SignalType sig_num) + { + if (external_signal_handler_ != SIG_ERR) { + external_signal_handler_(sig_num); + } + if (sigaddset(&DeferredSignalHandler::deferred_signals_set_, sig_num) != 0) { + char error_msg[128]; + snprintf(error_msg, sizeof(error_msg), "Failed to add deferred signal = %d\n", sig_num); + // write is signal-safe + auto written = write(STDERR_FILENO, error_msg, sizeof(error_msg) - 1); + (void)written; + } + } + + static sigset_t deferred_signals_set_; + static SignalHandlerType external_signal_handler_; + std::vector> old_signal_handlers_; + std::vector signals_; +}; + +sigset_t DeferredSignalHandler::deferred_signals_set_{}; +DeferredSignalHandler::SignalHandlerType DeferredSignalHandler::external_signal_handler_{SIG_ERR}; + } // namespace namespace rosbag2_py @@ -161,8 +253,6 @@ namespace rosbag2_py class Player { public: - using SignalHandlerType = void (*)(int); - explicit Player(const std::string & log_level = "info") { Arguments arguments({"--ros-args", "--log-level", log_level}); @@ -213,46 +303,6 @@ class Player } protected: - static void signal_handler(int sig_num) - { - if (sig_num == SIGINT || sig_num == SIGTERM) { - deferred_sig_number_ = sig_num; - rosbag2_py::Player::cancel(); - } - } - - static void install_signal_handlers() - { - deferred_sig_number_ = -1; - old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Player::signal_handler); - old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Player::signal_handler); - } - - static void uninstall_signal_handlers() - { - if (old_sigterm_handler_ != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler_); - } - if (old_sigint_handler_ != SIG_ERR) { - std::signal(SIGINT, old_sigint_handler_); - } - } - - static void process_deferred_signal() - { - auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { - if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { - signal_handler(sig_num); - } - }; - - if (deferred_sig_number_ == SIGINT) { - call_signal_handler(old_sigint_handler_, deferred_sig_number_); - } else if (deferred_sig_number_ == SIGTERM) { - call_signal_handler(old_sigterm_handler_, deferred_sig_number_); - } - } - void play_impl( const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options, @@ -260,8 +310,14 @@ class Player size_t burst_num_messages = 0, bool enable_signal_handling = true) { + exit_.store(false); + DeferredSignalHandler deferred_signal_handler({SIGINT, SIGTERM}); if (enable_signal_handling) { - install_signal_handlers(); + deferred_signal_handler.install_signal_handlers( + [](int /*signum*/) { + // Note: Using condition_variable from signal handler is not safe and shall not be used. + rosbag2_py::Player::exit_.store(true); + }); } try { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); @@ -289,8 +345,13 @@ class Player auto wait_for_exit_thread = std::thread( [&]() { - std::unique_lock lock(wait_for_exit_mutex_); - wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();}); + while (!exit_.load()) { + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait_for( + lock, + std::chrono::milliseconds(30), + [] {return rosbag2_py::Player::exit_.load();}); + } player->stop(); }); { @@ -315,36 +376,29 @@ class Player exec.remove_node(player); } catch (...) { if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); + // Process deferred signals before re-throwing exception, because we may never return to + // the deferred_signal_handler destructor after exception. + deferred_signal_handler.process_deferred_signals(); + deferred_signal_handler.uninstall_signal_handlers(); } throw; } - if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); - } } static std::atomic_bool exit_; static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; - static SignalHandlerType old_sigint_handler_; - static SignalHandlerType old_sigterm_handler_; - static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; -Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR}; -Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR}; -volatile std::sig_atomic_t Player::deferred_sig_number_{-1}; std::atomic_bool Player::exit_{false}; +// exit_ shall be lock free to be able to use it from signal handler +static_assert(std::atomic_bool::is_always_lock_free); std::condition_variable Player::wait_for_exit_cv_{}; class Recorder { public: - using SignalHandlerType = void (*)(int); explicit Recorder(const std::string & log_level = "info") { Arguments arguments({"--ros-args", "--log-level", log_level}); @@ -370,11 +424,16 @@ class Recorder std::string & node_name, bool enable_signal_handling) { + exit_.store(false); + DeferredSignalHandler deferred_signal_handler({SIGINT, SIGTERM}); if (enable_signal_handling) { - install_signal_handlers(); + deferred_signal_handler.install_signal_handlers( + [](int /*signum*/) { + // Note: Using condition_variable from signal handler is not safe and shall not be used. + rosbag2_py::Recorder::exit_.store(true); + }); } try { - exit_ = false; auto exec = std::make_unique(); if (record_options.rmw_serialization_format.empty()) { record_options.rmw_serialization_format = std::string(rmw_get_serialization_format()); @@ -408,8 +467,13 @@ class Recorder // Release the GIL for long-running record, so that calling Python code // can use other threads py::gil_scoped_release release; - std::unique_lock lock(wait_for_exit_mutex_); - wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + while (!exit_.load()) { + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait_for( + lock, + std::chrono::milliseconds(30), + [] {return rosbag2_py::Recorder::exit_.load();}); + } recorder->stop(); } exec->cancel(); @@ -419,15 +483,13 @@ class Recorder exec->remove_node(recorder); } catch (...) { if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); + // Process deferred signals before re-throwing exception, because we can never return to + // the deferred_signal_handler destructor after exception. + deferred_signal_handler.process_deferred_signals(); + deferred_signal_handler.uninstall_signal_handlers(); } throw; } - if (enable_signal_handling) { - uninstall_signal_handlers(); - process_deferred_signal(); - } } static void cancel() @@ -437,58 +499,12 @@ class Recorder } protected: - static void signal_handler(int sig_num) - { - if (sig_num == SIGINT || sig_num == SIGTERM) { - deferred_sig_number_ = sig_num; - rosbag2_py::Recorder::cancel(); - } - } - - static void install_signal_handlers() - { - deferred_sig_number_ = -1; - old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Recorder::signal_handler); - old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Recorder::signal_handler); - } - - static void uninstall_signal_handlers() - { - if (old_sigterm_handler_ != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler_); - } - if (old_sigint_handler_ != SIG_ERR) { - std::signal(SIGINT, old_sigint_handler_); - } - } - - static void process_deferred_signal() - { - auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { - if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { - signal_handler(sig_num); - } - }; - - if (deferred_sig_number_ == SIGINT) { - call_signal_handler(old_sigint_handler_, deferred_sig_number_); - } else if (deferred_sig_number_ == SIGTERM) { - call_signal_handler(old_sigterm_handler_, deferred_sig_number_); - } - } - static std::atomic_bool exit_; static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; - static SignalHandlerType old_sigint_handler_; - static SignalHandlerType old_sigterm_handler_; - static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; -Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR}; -Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR}; -volatile std::sig_atomic_t Recorder::deferred_sig_number_{-1}; std::atomic_bool Recorder::exit_{false}; std::condition_variable Recorder::wait_for_exit_cv_{}; From 299ed19dcec8ff27bff00871cd0f99cec1fa1cf9 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 17 Aug 2024 11:53:45 -0700 Subject: [PATCH 4/7] Fixup for uncrustify Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index e8b78e97b..893b3c6bb 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -317,7 +317,7 @@ class Player [](int /*signum*/) { // Note: Using condition_variable from signal handler is not safe and shall not be used. rosbag2_py::Player::exit_.store(true); - }); + }); } try { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); @@ -431,7 +431,7 @@ class Recorder [](int /*signum*/) { // Note: Using condition_variable from signal handler is not safe and shall not be used. rosbag2_py::Recorder::exit_.store(true); - }); + }); } try { auto exec = std::make_unique(); From 6d94fda943101baf334df7e94f37c2dc07e33cd8 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 17 Aug 2024 11:44:38 -0700 Subject: [PATCH 5/7] Regenerate Python stub files (.pyi) Signed-off-by: Michael Orlov --- rosbag2_py/rosbag2_py/_transport.pyi | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index fdd8f0780..6189f3ce2 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -36,10 +36,16 @@ class Player: def __init__(self) -> None: ... @overload def __init__(self, arg0: str) -> None: ... + @overload def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int) -> None: ... + @overload + def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int, enable_signal_handling: bool) -> None: ... @staticmethod def cancel() -> None: ... + @overload def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions) -> None: ... + @overload + def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, enable_signal_handling: bool) -> None: ... class RecordOptions: all_services: bool @@ -77,7 +83,10 @@ class Recorder: def __init__(self, arg0: str) -> None: ... @staticmethod def cancel() -> None: ... + @overload def record(self, storage_options: rosbag2_py._storage.StorageOptions, record_options: RecordOptions, node_name: str = ...) -> None: ... + @overload + def record(self, storage_options: rosbag2_py._storage.StorageOptions, record_options: RecordOptions, node_name: str, enable_signal_handling: bool) -> None: ... class ServiceRequestsSource: __members__: ClassVar[dict] = ... # read-only From 01d5d65f17c37ec7f0ae6ba05b662c23987de64c Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 4 Sep 2024 15:01:26 -0700 Subject: [PATCH 6/7] Address Windows build failure - Add missing macros to work with signal set Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 893b3c6bb..9a970fac3 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -16,6 +16,7 @@ #include #include #include +#include // For _fileno on Windows #include #include #include @@ -153,6 +154,28 @@ struct OptionsWrapper : public T typedef OptionsWrapper PlayOptions; typedef OptionsWrapper RecordOptions; +#ifdef _WIN32 +#ifndef sigset_t +typedef uint32_t sigset_t; +#endif +#ifndef sigaddset +#define sigaddset(what, sig) (*(what) |= (1 << (sig)), 0) +#endif +#ifndef sigdelset +#define sigdelset(what, sig) (*(what) &= ~(1 << (sig)), 0) +#endif +#ifndef sigemptyset +#define sigemptyset(what) (*(what) = 0, 0) +#endif +#ifndef sigismember +#define sigismember(what, sig) (((*(what)) & (1 << (sig))) != 0) +#endif + +#ifndef STDERR_FILENO +#define STDERR_FILENO _fileno(stderr) +#endif +#endif // #ifdef _WIN32 + /// \brief Defers signals handling until class destruction or by an explicit call to the /// process_deferred_signals(). /// Can install an external signal handler, which will be called upon original signals arriving. From 12bdf3d4fce044b3437526c0852ee85c72888313 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 4 Sep 2024 18:05:54 -0700 Subject: [PATCH 7/7] Disable tests that check signal handling on Windows - Rationale: The signal handling on Windows hasn't been implemented yet. Signed-off-by: Michael Orlov --- rosbag2_py/test/test_transport.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index 0700c3ad4..bf6681d4c 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -16,6 +16,7 @@ from pathlib import Path import re import signal +import sys import threading from common import get_rosbag_options, wait_for @@ -87,6 +88,9 @@ def test_recoder_log_level(): @pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) def test_process_sigint_in_recorder(tmp_path, storage_id, capfd): + if sys.platform.startswith('win'): + pytest.skip('Signal handling is not implemented on Windows yet.') + bag_path = tmp_path / 'test_process_sigint_in_recorder' storage_options, converter_options = get_rosbag_options(str(bag_path), storage_id) @@ -131,6 +135,9 @@ def test_process_sigint_in_recorder(tmp_path, storage_id, capfd): @pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) def test_record_process_sigint_in_python_handler(tmp_path, storage_id, capfd): + if sys.platform.startswith('win'): + pytest.skip('Signal handling is not implemented on Windows yet.') + bag_path = tmp_path / 'test_record_sigint_in_python' storage_options, converter_options = get_rosbag_options(str(bag_path), storage_id) @@ -175,6 +182,9 @@ def test_record_process_sigint_in_python_handler(tmp_path, storage_id, capfd): @pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) def test_process_sigint_in_player(storage_id, capfd): + if sys.platform.startswith('win'): + pytest.skip('Signal handling is not implemented on Windows yet.') + bag_path = str(RESOURCES_PATH / storage_id / 'talker') storage_options, converter_options = get_rosbag_options(bag_path, storage_id) @@ -211,6 +221,9 @@ def test_process_sigint_in_player(storage_id, capfd): @pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) def test_play_process_sigint_in_python_handler(storage_id, capfd): + if sys.platform.startswith('win'): + pytest.skip('Signal handling is not implemented on Windows yet.') + bag_path = str(RESOURCES_PATH / storage_id / 'talker') storage_options, converter_options = get_rosbag_options(bag_path, storage_id)