Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a different mechanism to avoid timers being scheduled multiple times by the MultiThreadedExecutor #1692

Merged
merged 1 commit into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/mutex_two_priorities.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
Expand Down
76 changes: 0 additions & 76 deletions rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <thread>
#include <unordered_map>

#include "rclcpp/detail/mutex_two_priorities.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand Down Expand Up @@ -82,12 +81,10 @@ class MultiThreadedExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

detail::MutexTwoPriorities wait_mutex_;
std::mutex wait_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;

std::set<TimerBase::SharedPtr> scheduled_timers_;
};

} // namespace executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,19 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
++it;
continue;
}
if (!timer->call()) {
// timer was cancelled, skip it.
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec.timer = timer;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
timer_handles_.erase(it);
return;
}
// Else, the service is no longer valid, remove it and continue
// Else, the timer is no longer valid, remove it and continue
it = timer_handles_.erase(it);
}
}
Expand Down
30 changes: 25 additions & 5 deletions rclcpp/include/rclcpp/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ class TimerBase
void
reset();

/// Indicate that we're about to execute the callback.
/**
* The multithreaded executor takes advantage of this to avoid scheduling
* the callback multiple times.
*
* \return `true` if the callback should be executed, `false` if the timer was canceled.
*/
RCLCPP_PUBLIC
virtual bool
call() = 0;

/// Call the callback function when the timer signal is emitted.
RCLCPP_PUBLIC
virtual void
Expand Down Expand Up @@ -192,19 +203,28 @@ class GenericTimer : public TimerBase
}

/**
* \sa rclcpp::TimerBase::execute_callback
* \throws std::runtime_error if it failed to notify timer that callback occurred
* \sa rclcpp::TimerBase::call
* \throws std::runtime_error if it failed to notify timer that callback will occurr
*/
void
execute_callback() override
bool
call() override
{
rcl_ret_t ret = rcl_timer_call(timer_handle_.get());
if (ret == RCL_RET_TIMER_CANCELED) {
return;
return false;
}
if (ret != RCL_RET_OK) {
throw std::runtime_error("Failed to notify timer that callback occurred");
}
return true;
}

/**
* \sa rclcpp::TimerBase::execute_callback
*/
void
execute_callback() override
{
TRACEPOINT(callback_start, static_cast<const void *>(&callback_), false);
execute_callback_delegate<>();
TRACEPOINT(callback_end, static_cast<const void *>(&callback_));
Expand Down
104 changes: 0 additions & 104 deletions rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp

This file was deleted.

27 changes: 2 additions & 25 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/scope_exit.hpp"

using rclcpp::detail::MutexTwoPriorities;
using rclcpp::executors::MultiThreadedExecutor;

MultiThreadedExecutor::MultiThreadedExecutor(
Expand Down Expand Up @@ -52,8 +51,7 @@ MultiThreadedExecutor::spin()
std::vector<std::thread> threads;
size_t thread_id = 0;
{
auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable();
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
Expand All @@ -78,41 +76,20 @@ MultiThreadedExecutor::run(size_t)
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable();
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
std::lock_guard wait_lock{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
if (any_exec.timer) {
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_exec.timer);
}
}
if (yield_before_execute_) {
std::this_thread::yield();
}

execute_any_executable(any_exec);

if (any_exec.timer) {
auto high_priority_wait_mutex = wait_mutex_.get_high_priority_lockable();
std::lock_guard<MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
}
}
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
Expand Down
1 change: 1 addition & 0 deletions rclcpp/test/rclcpp/node_interfaces/test_node_timers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class TestTimer : public rclcpp::TimerBase
: TimerBase(node->get_clock(), std::chrono::nanoseconds(1),
node->get_node_base_interface()->get_context()) {}

bool call() override {return true;}
void execute_callback() override {}
bool is_steady() override {return false;}
};
Expand Down