Skip to content

Commit

Permalink
[humble] Fix for possible freeze in Recorder::stop() (backport #1381) (
Browse files Browse the repository at this point in the history
…#1388)

* Fix for possible freeze in Recorder::stop() (#1381)

* Fix for possible freeze in Recorder::stop()

- It was possible a freeze in recorder due to the race condition when
calling Recorder::stop() while event publisher thread hasn't been fully
started yet.

Signed-off-by: Michael Orlov <[email protected]>

* Move event_publisher_thread_wake_cv_.notify_all() out of the mutex lock

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit c6cc69a)

# Conflicts:
#	rosbag2_transport/src/rosbag2_transport/recorder.cpp

* Resolve merge conflicts

Signed-off-by: Michael Orlov <[email protected]>

* Remove `in_recording_` variable for ABI compatibility

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
mergify[bot] and MichaelOrlov authored Jun 28, 2023
1 parent 07fd903 commit 6ceed91
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
5 changes: 3 additions & 2 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class Recorder : public rclcpp::Node
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
std::atomic<bool> paused_ = false;

// Keyboard handler
std::shared_ptr<KeyboardHandler> keyboard_handler_;
// Toogle paused key callback handle
Expand All @@ -172,8 +173,8 @@ class Recorder : public rclcpp::Node

// Variables for event publishing
rclcpp::Publisher<rosbag2_interfaces::msg::WriteSplitEvent>::SharedPtr split_event_pub_;
bool event_publisher_thread_should_exit_ = false;
bool write_split_has_occurred_ = false;
std::atomic<bool> event_publisher_thread_should_exit_ = false;
std::atomic<bool> write_split_has_occurred_ = false;
rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_;
std::mutex event_publisher_thread_mutex_;
std::condition_variable event_publisher_thread_wake_cv_;
Expand Down
18 changes: 10 additions & 8 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ void Recorder::stop()
{
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval);
if (status != std::future_status::ready) {
RCLCPP_ERROR_STREAM(
get_logger(),
"discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() <<
") return status: " << (status == std::future_status::timeout ? "timeout" : "deferred"));
}
}
paused_ = true;
subscriptions_.clear();
Expand All @@ -135,6 +141,7 @@ void Recorder::stop()
if (event_publisher_thread_.joinable()) {
event_publisher_thread_.join();
}
RCLCPP_INFO(get_logger(), "Recording stopped");
}

void Recorder::record()
Expand Down Expand Up @@ -190,15 +197,13 @@ void Recorder::record()
discovery_future_ =
std::async(std::launch::async, std::bind(&Recorder::topics_discovery, this));
}
RCLCPP_INFO(get_logger(), "Recording...");
}

void Recorder::event_publisher_thread_main()
{
RCLCPP_INFO(get_logger(), "Event publisher thread: Starting");

bool should_exit = false;

while (!should_exit) {
while (!event_publisher_thread_should_exit_.load()) {
std::unique_lock<std::mutex> lock(event_publisher_thread_mutex_);
event_publisher_thread_wake_cv_.wait(
lock,
Expand All @@ -222,10 +227,7 @@ void Recorder::event_publisher_thread_main()
"Failed to publish message on '/events/write_split' topic.");
}
}

should_exit = event_publisher_thread_should_exit_;
}

RCLCPP_INFO(get_logger(), "Event publisher thread: Exiting");
}

Expand Down

0 comments on commit 6ceed91

Please sign in to comment.