Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Thread configuration prototype #2205

Open
wants to merge 5 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
#include <chrono>
#include <memory>
#include <mutex>
#include <vector>
#include <set>
#include <thread>
#include <unordered_map>

#include "rcutils/thread_attr.h"
#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand All @@ -39,7 +40,7 @@ class MultiThreadedExecutor : public rclcpp::Executor

/// Constructor for MultiThreadedExecutor.
/**
* For the yield_before_execute option, when true std::this_thread::yield()
* For the yield_before_execute option, when true rcpputils::this_thread::yield()
* will be called after acquiring work (as an AnyExecutable) and
* releasing the spinning lock, but before executing the work.
* This is useful for reproducing some bugs related to taking work more than
Expand All @@ -48,7 +49,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
* \param options common options for all executors
* \param number_of_threads number of threads to have in the thread pool,
* the default 0 will use the number of cpu cores found (minimum of 2)
* \param yield_before_execute if true std::this_thread::yield() is called
* \param yield_before_execute if true rcpputils::this_thread::yield() is called
* \param timeout maximum time to wait
*/
RCLCPP_PUBLIC
Expand Down Expand Up @@ -85,6 +86,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;
rcutils_thread_attrs_t * thread_attributes_;
};

} // namespace executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <vector>

#include "rcutils/thread_attr.h"
#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand Down Expand Up @@ -65,8 +66,14 @@ class SingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

protected:
RCLCPP_PUBLIC
void
run();

private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
rcutils_thread_attrs_t * thread_attributes_;
};

} // namespace executors
Expand Down
62 changes: 51 additions & 11 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>

#include "rcpputils/scope_exit.hpp"
#include "rcpputils/threads.hpp"

#include "rclcpp/logging.hpp"
#include "rclcpp/utilities.hpp"
Expand All @@ -33,13 +34,30 @@ MultiThreadedExecutor::MultiThreadedExecutor(
std::chrono::nanoseconds next_exec_timeout)
: rclcpp::Executor(options),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout)
next_exec_timeout_(next_exec_timeout),
thread_attributes_(nullptr)
{
bool has_number_of_threads_arg = number_of_threads > 0;

number_of_threads_ = number_of_threads > 0 ?
number_of_threads :
std::max(std::thread::hardware_concurrency(), 2U);
std::max(rcpputils::Thread::hardware_concurrency(), 2U);

if (number_of_threads_ == 1) {
if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs(
options.context->get_rcl_context().get()))
{
thread_attributes_ = attrs;
}

if (has_number_of_threads_arg && thread_attributes_ &&
thread_attributes_->num_attributes != number_of_threads)
{
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"The number of threads argument passed to the MultiThreadedExecutor"
" is different from the number of thread attributes.\n"
"The executor runs using the thread attributes and ignores the former.");
} else if (number_of_threads_ == 1) {
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"MultiThreadedExecutor is used with a single thread.\n"
Expand All @@ -56,17 +74,35 @@ MultiThreadedExecutor::spin()
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
std::vector<std::thread> threads;
std::vector<rcpputils::Thread> threads;
size_t thread_id = 0;
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);

if (thread_attributes_) {
rcpputils::Thread::Attribute thread_attr;
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) {
thread_attr.set_thread_attribute(
thread_attributes_->attributes[thread_id]);
Copy link

@JanStaschulat JanStaschulat Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Multi-Threaded Executor has multiple threads as a thread-pool to process the callbacks of new incoming messages (timers, services, etc.) in parallel. However, the Executor does not differentiate between different messages or timers. At run-time, a callback could be processed by any thread. There is no mapping of e.g. important callbacks to high priority threads possible. So, in my opinion, it would only makes sense to configure all threads with the same configuration (e.g. priority, core-affinity, ...). If at all.

What is your intention to assign different thread parameters (e.g. priority, core-affinity) to different threads?
Could you give an example how you intend to use the Multi-Threaded Executor with different thread priorities in your use-case?

Copy link

@JanStaschulat JanStaschulat Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your great effort to add thread configuration to ROS2. This is very important piece of work regarding real-time support. I think we need a proper way to control the thread configuration of the thread, which is processing a callback. I am not so sure, that assigning different thread-priorities to to threads of the Multi-Threaded Executor solves the problem. It is also necessary to define a mapping of callbacks (of subscriptions, timers, services, etc.) to a particular thread configuration (e.g. priority).

Alternative approaches:

  • create a thread outside ROS 2 and then let the Single-Threaded Executor run inside it.

  • the user specifies the priority per subscription. Then the Executor changes the thread priority just before the callback is called (this concept works for the Single-Threaded as well as Multi-Threaded Executor. This concept will be used in an experimental real-time executor at the "Real-time programming with ROS 2" Workshop at ROSCon. The source code will be made available.

  • extend Events-Executor to control real-time behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other approaches are to create a thread outside ROS 2 and then let e.g. the single threaded Executor run.

I agree with this.
We should not have executors modify the attributes of the thread that they are running into. If users want to run a single-threaded executor in a thread with specific attributes, they can easily do it themselves, rather than having the executor change things under their feet.

A different scenario is whenever the rclcpp library internally creates threads (e.g. the events-executor, the clock subscriptions, the graph updates etc) all these can take advantage of the new tools to configure threads (rather than just blindly inheriting the parent thread configurations).

Besides that, I'm not convinced by the use of

rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs(options.context->get_rcl_context().get()

We shouldn't restrict to a single thread attribute per ROS context and the two concepts shouldn't be tied together.
I would recommend to have all classes that deal with threads to take a rcutils_thread_attrs_t as input in the constructor

auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(rcpputils::Thread(thread_attr, func));
}
}
thread_attr.set_thread_attribute(
thread_attributes_->attributes[thread_id]);
rcpputils::this_thread::run_with_thread_attribute(
thread_attr, &MultiThreadedExecutor::run, this, thread_id);
} else {
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
}
}
run(thread_id);
}

run(thread_id);
for (auto & thread : threads) {
thread.join();
}
Expand All @@ -75,7 +111,11 @@ MultiThreadedExecutor::spin()
size_t
MultiThreadedExecutor::get_number_of_threads()
{
return number_of_threads_;
if (thread_attributes_) {
return thread_attributes_->num_attributes;
} else {
return number_of_threads_;
}
}

void
Expand Down
32 changes: 31 additions & 1 deletion rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,49 @@
// limitations under the License.

#include "rcpputils/scope_exit.hpp"
#include "rcpputils/threads.hpp"

#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/any_executable.hpp"

using rclcpp::executors::SingleThreadedExecutor;

SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options)
: rclcpp::Executor(options) {}
: rclcpp::Executor(options),
thread_attributes_(nullptr)
{
if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs(
options.context->get_rcl_context().get()))
{
thread_attributes_ = attrs;
}

if (thread_attributes_ && thread_attributes_->num_attributes != 1) {
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Specified thread attributes contains multiple configurations.\n"
"The executor runs only using first configuration.");
}
}

SingleThreadedExecutor::~SingleThreadedExecutor() {}

void
SingleThreadedExecutor::spin()
{
if (thread_attributes_) {
rcpputils::Thread::Attribute thread_attr;
thread_attr.set_thread_attribute(
thread_attributes_->attributes[0]);
rcpputils::this_thread::run_with_thread_attribute(
thread_attr, &SingleThreadedExecutor::run, this);
} else {
run();
}
}

void
SingleThreadedExecutor::run()
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand Down