From 8a03c5512591a90116b7c4e26758da582f7bee0e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 11 Jun 2021 13:26:35 -0300 Subject: [PATCH] Use a different mechanism to avoid timers being scheduled multiple times by the MultiThreadedExecutor Signed-off-by: Ivan Santiago Paunovic --- rclcpp/CMakeLists.txt | 1 - .../rclcpp/detail/mutex_two_priorities.hpp | 76 ------------- .../executors/multi_threaded_executor.hpp | 5 +- .../strategies/allocator_memory_strategy.hpp | 7 +- rclcpp/include/rclcpp/timer.hpp | 30 ++++- .../rclcpp/detail/mutex_two_priorities.cpp | 104 ------------------ .../executors/multi_threaded_executor.cpp | 27 +---- .../node_interfaces/test_node_timers.cpp | 1 + 8 files changed, 35 insertions(+), 216 deletions(-) delete mode 100644 rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp delete mode 100644 rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 95dc5311d1..c1cb8bf3e2 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -41,7 +41,6 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/clock.cpp src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp - src/rclcpp/detail/mutex_two_priorities.cpp src/rclcpp/detail/resolve_parameter_overrides.cpp src/rclcpp/detail/rmw_implementation_specific_payload.cpp src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp diff --git a/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp b/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp deleted file mode 100644 index 98118b619f..0000000000 --- a/rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2021 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_ -#define RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_ - -#include -#include - -namespace rclcpp -{ -namespace detail -{ -/// \internal A mutex that has two locking mechanism, one with higher priority than the other. -/** - * After the current mutex owner release the lock, a thread that used the high - * priority mechanism will have priority over threads that used the low priority mechanism. - */ -class MutexTwoPriorities -{ -public: - class HighPriorityLockable - { -public: - explicit HighPriorityLockable(MutexTwoPriorities & parent); - - void lock(); - - void unlock(); - -private: - MutexTwoPriorities & parent_; - }; - - class LowPriorityLockable - { -public: - explicit LowPriorityLockable(MutexTwoPriorities & parent); - - void lock(); - - void unlock(); - -private: - MutexTwoPriorities & parent_; - }; - - HighPriorityLockable - get_high_priority_lockable(); - - LowPriorityLockable - get_low_priority_lockable(); - -private: - std::condition_variable hp_cv_; - std::condition_variable lp_cv_; - std::mutex cv_mutex_; - size_t hp_waiting_count_{0u}; - bool data_taken_{false}; -}; - -} // namespace detail -} // namespace rclcpp - -#endif // RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_ diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 28504620c5..a1befba300 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -22,7 +22,6 @@ #include #include -#include "rclcpp/detail/mutex_two_priorities.hpp" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -82,12 +81,10 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) - detail::MutexTwoPriorities wait_mutex_; + std::mutex wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; - - std::set scheduled_timers_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 76790b1245..43ef71682e 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -388,6 +388,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy ++it; continue; } + if (!timer->call()) { + // timer was cancelled, skip it. + ++it; + continue; + } // Otherwise it is safe to set and return the any_exec any_exec.timer = timer; any_exec.callback_group = group; @@ -395,7 +400,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy timer_handles_.erase(it); return; } - // Else, the service is no longer valid, remove it and continue + // Else, the timer is no longer valid, remove it and continue it = timer_handles_.erase(it); } } diff --git a/rclcpp/include/rclcpp/timer.hpp b/rclcpp/include/rclcpp/timer.hpp index cc8e5c6ccd..52df423d83 100644 --- a/rclcpp/include/rclcpp/timer.hpp +++ b/rclcpp/include/rclcpp/timer.hpp @@ -91,6 +91,17 @@ class TimerBase void reset(); + /// Indicate that we're about to execute the callback. + /** + * The multithreaded executor takes advantage of this to avoid scheduling + * the callback multiple times. + * + * \return `true` if the callback should be executed, `false` if the timer was canceled. + */ + RCLCPP_PUBLIC + virtual bool + call() = 0; + /// Call the callback function when the timer signal is emitted. RCLCPP_PUBLIC virtual void @@ -192,19 +203,28 @@ class GenericTimer : public TimerBase } /** - * \sa rclcpp::TimerBase::execute_callback - * \throws std::runtime_error if it failed to notify timer that callback occurred + * \sa rclcpp::TimerBase::call + * \throws std::runtime_error if it failed to notify timer that callback will occurr */ - void - execute_callback() override + bool + call() override { rcl_ret_t ret = rcl_timer_call(timer_handle_.get()); if (ret == RCL_RET_TIMER_CANCELED) { - return; + return false; } if (ret != RCL_RET_OK) { throw std::runtime_error("Failed to notify timer that callback occurred"); } + return true; + } + + /** + * \sa rclcpp::TimerBase::execute_callback + */ + void + execute_callback() override + { TRACEPOINT(callback_start, static_cast(&callback_), false); execute_callback_delegate<>(); TRACEPOINT(callback_end, static_cast(&callback_)); diff --git a/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp b/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp deleted file mode 100644 index 8deb864f5f..0000000000 --- a/rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2021 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rclcpp/detail/mutex_two_priorities.hpp" - -#include - -namespace rclcpp -{ -namespace detail -{ - -using LowPriorityLockable = MutexTwoPriorities::LowPriorityLockable; -using HighPriorityLockable = MutexTwoPriorities::HighPriorityLockable; - -HighPriorityLockable::HighPriorityLockable(MutexTwoPriorities & parent) -: parent_(parent) -{} - -void -HighPriorityLockable::lock() -{ - std::unique_lock guard{parent_.cv_mutex_}; - if (parent_.data_taken_) { - ++parent_.hp_waiting_count_; - while (parent_.data_taken_) { - parent_.hp_cv_.wait(guard); - } - --parent_.hp_waiting_count_; - } - parent_.data_taken_ = true; -} - -void -HighPriorityLockable::unlock() -{ - bool notify_lp{false}; - { - std::lock_guard guard{parent_.cv_mutex_}; - parent_.data_taken_ = false; - notify_lp = 0u == parent_.hp_waiting_count_; - } - if (notify_lp) { - parent_.lp_cv_.notify_one(); - } else { - parent_.hp_cv_.notify_one(); - } -} - -LowPriorityLockable::LowPriorityLockable(MutexTwoPriorities & parent) -: parent_(parent) -{} - -void -LowPriorityLockable::lock() -{ - std::unique_lock guard{parent_.cv_mutex_}; - while (parent_.data_taken_ || parent_.hp_waiting_count_) { - parent_.lp_cv_.wait(guard); - } - parent_.data_taken_ = true; -} - -void -LowPriorityLockable::unlock() -{ - bool notify_lp{false}; - { - std::lock_guard guard{parent_.cv_mutex_}; - parent_.data_taken_ = false; - notify_lp = 0u == parent_.hp_waiting_count_; - } - if (notify_lp) { - parent_.lp_cv_.notify_one(); - } else { - parent_.hp_cv_.notify_one(); - } -} - -HighPriorityLockable -MutexTwoPriorities::get_high_priority_lockable() -{ - return HighPriorityLockable{*this}; -} - -LowPriorityLockable -MutexTwoPriorities::get_low_priority_lockable() -{ - return LowPriorityLockable{*this}; -} - -} // namespace detail -} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index a9b01b0c34..73a655160f 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -22,7 +22,6 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/scope_exit.hpp" -using rclcpp::detail::MutexTwoPriorities; using rclcpp::executors::MultiThreadedExecutor; MultiThreadedExecutor::MultiThreadedExecutor( @@ -52,8 +51,7 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; { - auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable(); - std::lock_guard wait_lock(low_priority_wait_mutex); + std::lock_guard wait_lock{wait_mutex_}; for (; thread_id < number_of_threads_ - 1; ++thread_id) { auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); threads.emplace_back(func); @@ -78,26 +76,13 @@ MultiThreadedExecutor::run(size_t) while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { - auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable(); - std::lock_guard wait_lock(low_priority_wait_mutex); + std::lock_guard wait_lock{wait_mutex_}; if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } if (!get_next_executable(any_exec, next_exec_timeout_)) { continue; } - if (any_exec.timer) { - // Guard against multiple threads getting the same timer. - if (scheduled_timers_.count(any_exec.timer) != 0) { - // Make sure that any_exec's callback group is reset before - // the lock is released. - if (any_exec.callback_group) { - any_exec.callback_group->can_be_taken_from().store(true); - } - continue; - } - scheduled_timers_.insert(any_exec.timer); - } } if (yield_before_execute_) { std::this_thread::yield(); @@ -105,14 +90,6 @@ MultiThreadedExecutor::run(size_t) execute_any_executable(any_exec); - if (any_exec.timer) { - auto high_priority_wait_mutex = wait_mutex_.get_high_priority_lockable(); - std::lock_guard wait_lock(high_priority_wait_mutex); - auto it = scheduled_timers_.find(any_exec.timer); - if (it != scheduled_timers_.end()) { - scheduled_timers_.erase(it); - } - } // Clear the callback_group to prevent the AnyExecutable destructor from // resetting the callback group `can_be_taken_from` any_exec.callback_group.reset(); diff --git a/rclcpp/test/rclcpp/node_interfaces/test_node_timers.cpp b/rclcpp/test/rclcpp/node_interfaces/test_node_timers.cpp index e206e75b6c..d368e0e875 100644 --- a/rclcpp/test/rclcpp/node_interfaces/test_node_timers.cpp +++ b/rclcpp/test/rclcpp/node_interfaces/test_node_timers.cpp @@ -32,6 +32,7 @@ class TestTimer : public rclcpp::TimerBase : TimerBase(node->get_clock(), std::chrono::nanoseconds(1), node->get_node_base_interface()->get_context()) {} + bool call() override {return true;} void execute_callback() override {} bool is_steady() override {return false;} };