From e97cf9e01980645db853a4160a1613eddf870f16 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 3 Jul 2023 23:36:29 -0700 Subject: [PATCH] [humble] Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (backport #1301) (#1395) * Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301) * Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder - Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown Signed-off-by: Michael Orlov * Use singleton for static executor in the rosbag2_py::Recorder Signed-off-by: Michael Orlov * Rollback to the non-static executor and don't call executor->cancel() - In case when signal will arrive we will trigger our internal exit_ variable and wait while current exec_->spin_all(10msec) will exit. Signed-off-by: Michael Orlov * Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov * Address race condition in rosbag2_py.test_record_cancel - Add `record_thread.join()` before trying to parse metadata. Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov (cherry picked from commit 46a23e9c0a6f4ab4e41b2b2d8471b0a25ab1be79) # Conflicts: # rosbag2_py/src/rosbag2_py/_transport.cpp # rosbag2_py/test/test_transport.py * Address merge conflicts Signed-off-by: Michael Orlov * Workaround for segmentation fault in rclcpp::SignalHandler::uninstall() Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov Co-authored-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 56 ++++++++++++++++++------ rosbag2_py/test/test_transport.py | 1 + 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index bbfa21b73a..52ef47c28c 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -141,22 +141,27 @@ class Player class Recorder { -private: - std::unique_ptr exec_; - public: Recorder() { - rclcpp::init(0, nullptr); - exec_ = std::make_unique(); + auto init_options = rclcpp::InitOptions(); + init_options.shutdown_on_signal = false; + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::SigInt); + rclcpp::uninstall_signal_handlers(); + std::signal( SIGTERM, [](int /* signal */) { - rclcpp::shutdown(); + rosbag2_py::Recorder::cancel(); + }); + std::signal( + SIGINT, [](int /* signal */) { + rosbag2_py::Recorder::cancel(); }); } virtual ~Recorder() { +// rclcpp::install_signal_handlers(rclcpp::SignalHandlerOptions::SigTerm); rclcpp::shutdown(); } @@ -164,6 +169,8 @@ class Recorder const rosbag2_storage::StorageOptions & storage_options, RecordOptions & record_options) { + exit_ = false; + auto exec = std::make_unique(); if (record_options.rmw_serialization_format.empty()) { record_options.rmw_serialization_format = std::string(rmw_get_serialization_format()); } @@ -173,20 +180,42 @@ class Recorder std::move(writer), storage_options, record_options); recorder->record(); - exec_->add_node(recorder); - // Release the GIL for long-running record, so that calling Python code can use other threads + exec->add_node(recorder); + // Run exec->spin() in a separate thread, because we need to call exec->cancel() after + // recorder->stop() to be able to send notifications about bag split and close. + auto spin_thread = std::thread( + [&exec]() { + exec->spin(); + }); { + // Release the GIL for long-running record, so that calling Python code can use other threads py::gil_scoped_release release; - exec_->spin(); + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + recorder->stop(); + } + exec->cancel(); + if (spin_thread.joinable()) { + spin_thread.join(); } + exec->remove_node(recorder); } - void cancel() + static void cancel() { - exec_->cancel(); + exit_ = true; + wait_for_exit_cv_.notify_all(); } + +protected: + static std::atomic_bool exit_; + static std::condition_variable wait_for_exit_cv_; + std::mutex wait_for_exit_mutex_; }; +std::atomic_bool Recorder::exit_{false}; +std::condition_variable Recorder::wait_for_exit_cv_{}; + // Simple wrapper to read the output config YAML into structs void bag_rewrite( const std::vector & input_options, @@ -283,8 +312,9 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Recorder") .def(py::init()) - .def("record", &rosbag2_py::Recorder::record) - .def("cancel", &rosbag2_py::Recorder::cancel) + .def( + "record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options")) + .def_static("cancel", &rosbag2_py::Recorder::cancel) ; m.def( diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index 176d7e9fe2..d5d3f01a2d 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -85,3 +85,4 @@ def test_record_cancel(tmp_path): db3_path = Path(bag_path) / 'test_record_cancel_0.db3' assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(), timeout=rclpy.duration.Duration(seconds=3)) + record_thread.join()