Commit

Revert "add mutex to protect events_executor current entity collection (ros2#2187)"

This reverts commit 4120b79.
jplapp committed Jan 25, 2024
1 parent 6698620 commit 16aa7a1
Showing 3 changed files with 15 additions and 93 deletions.
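
For context on the change being undone: ros2#2187 guarded the executor's current_entities_collection_ with a std::recursive_mutex (collection_mutex_) so that entity lookups performed while executing events could not race with collection refreshes triggered by adding or removing nodes. The standalone program below is only a rough sketch of that reader/writer locking pattern, written with made-up names (GuardedCollection, Entity) rather than the actual rclcpp types; the real change is shown in the diff that follows. The revert removes that serialization along with the stress test that exercised it.

// Standalone sketch (not rclcpp code) of the pattern introduced by the reverted
// commit: one thread looks entities up in a shared collection while another
// thread swaps the collection's contents, with a recursive mutex serializing
// both sides. All names here are illustrative only.
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Stand-in for an executor entity (subscription, client, service, waitable).
struct Entity
{
  std::string name;
};

// Shared collection guarded by a recursive mutex, mirroring the idea of
// collection_mutex_ protecting current_entities_collection_.
class GuardedCollection
{
public:
  std::shared_ptr<Entity> retrieve(int key)
  {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    auto it = entities_.find(key);
    return it != entities_.end() ? it->second : nullptr;
  }

  void refresh(std::map<int, std::shared_ptr<Entity>> new_entities)
  {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    entities_ = std::move(new_entities);
  }

private:
  std::recursive_mutex mutex_;
  std::map<int, std::shared_ptr<Entity>> entities_;
};

int main()
{
  GuardedCollection collection;

  // One thread repeatedly rebuilds the collection (analogous to refreshing the
  // executor's entity collection when nodes are added or removed) ...
  std::thread writer([&collection]() {
      for (int i = 0; i < 10000; ++i) {
        collection.refresh({{i, std::make_shared<Entity>(Entity{"entity"})}});
      }
    });

  // ... while another thread looks entities up (analogous to executing events).
  std::thread reader([&collection]() {
      for (int i = 0; i < 10000; ++i) {
        auto entity = collection.retrieve(i);
        (void)entity;
      }
    });

  writer.join();
  reader.join();
  return 0;
}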
@@ -274,11 +274,8 @@ class EventsExecutor : public rclcpp::Executor
   rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
 
   std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
-  std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
-
-  /// Mutex to protect the current_entities_collection_
-  std::recursive_mutex collection_mutex_;
   std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
+  std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
 
   /// Flag used to reduce the number of unnecessary waitable events
   std::atomic<bool> notify_waitable_event_pushed_ {false};
@@ -273,13 +273,10 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
   switch (event.type) {
     case ExecutorEventType::CLIENT_EVENT:
     {
-      rclcpp::ClientBase::SharedPtr client;
-      {
-        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-        client = this->retrieve_entity(
-          static_cast<const rcl_client_t *>(event.entity_key),
-          current_entities_collection_->clients);
-      }
+      auto client = this->retrieve_entity(
+        static_cast<const rcl_client_t *>(event.entity_key),
+        current_entities_collection_->clients);
+
       if (client) {
         for (size_t i = 0; i < event.num_events; i++) {
           execute_client(client);
@@ -290,13 +287,9 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
     }
     case ExecutorEventType::SUBSCRIPTION_EVENT:
     {
-      rclcpp::SubscriptionBase::SharedPtr subscription;
-      {
-        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-        subscription = this->retrieve_entity(
-          static_cast<const rcl_subscription_t *>(event.entity_key),
-          current_entities_collection_->subscriptions);
-      }
+      auto subscription = this->retrieve_entity(
+        static_cast<const rcl_subscription_t *>(event.entity_key),
+        current_entities_collection_->subscriptions);
       if (subscription) {
         for (size_t i = 0; i < event.num_events; i++) {
           execute_subscription(subscription);
@@ -306,13 +299,10 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
     }
     case ExecutorEventType::SERVICE_EVENT:
     {
-      rclcpp::ServiceBase::SharedPtr service;
-      {
-        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-        service = this->retrieve_entity(
-          static_cast<const rcl_service_t *>(event.entity_key),
-          current_entities_collection_->services);
-      }
+      auto service = this->retrieve_entity(
+        static_cast<const rcl_service_t *>(event.entity_key),
+        current_entities_collection_->services);
+
       if (service) {
         for (size_t i = 0; i < event.num_events; i++) {
           execute_service(service);
@@ -329,13 +319,9 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
     }
     case ExecutorEventType::WAITABLE_EVENT:
     {
-      rclcpp::Waitable::SharedPtr waitable;
-      {
-        std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-        waitable = this->retrieve_entity(
-          static_cast<const rclcpp::Waitable *>(event.entity_key),
-          current_entities_collection_->waitables);
-      }
+      auto waitable = this->retrieve_entity(
+        static_cast<const rclcpp::Waitable *>(event.entity_key),
+        current_entities_collection_->waitables);
       if (waitable) {
         for (size_t i = 0; i < event.num_events; i++) {
           auto data = waitable->take_data_by_entity_id(event.waitable_data);
@@ -400,7 +386,6 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
 void
 EventsExecutor::refresh_current_collection_from_callback_groups()
 {
-  // Build the new collection
   this->entities_collector_->update_collections();
   auto callback_groups = this->entities_collector_->get_all_callback_groups();
   rclcpp::executors::ExecutorEntitiesCollection new_collection;
@@ -415,9 +400,6 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
   // To do it, we need to add the notify waitable as an entry in both the new and
   // current collections such that it's neither added or removed.
   this->add_notify_waitable_to_collection(new_collection.waitables);
-
-  // Acquire lock before modifying the current collection
-  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
   this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
 
   this->refresh_current_collection(new_collection);
@@ -427,9 +409,6 @@ void
 EventsExecutor::refresh_current_collection(
   const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
 {
-  // Acquire lock before modifying the current collection
-  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-
   current_entities_collection_->timers.update(
     new_collection.timers,
     [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
rclcpp/test/rclcpp/executors/test_executors.cpp (54 changes: 0 additions & 54 deletions)
@@ -796,60 +796,6 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
 }
 }
 
-// This test verifies the thread-safety of adding and removing a node
-// while the executor is spinning and events are ready.
-// This test does not contain expectations, but rather it verifies that
-// we can run a "stressful routine" without crashing.
-TYPED_TEST(TestExecutors, stressAddRemoveNode)
-{
-  using ExecutorType = TypeParam;
-  // rmw_connextdds doesn't support events-executor
-  if (
-    std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
-    std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
-  {
-    GTEST_SKIP();
-  }
-
-  ExecutorType executor;
-
-  // A timer that is "always" ready (the timer callback doesn't do anything)
-  auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});
-
-  // This thread spins the executor until it's cancelled
-  std::thread spinner_thread([&]() {
-      executor.spin();
-    });
-
-  // This thread publishes data in a busy loop (the node has a subscription)
-  std::thread publisher_thread1([&]() {
-      for (size_t i = 0; i < 100000; i++) {
-        this->publisher->publish(test_msgs::msg::Empty());
-      }
-    });
-  std::thread publisher_thread2([&]() {
-      for (size_t i = 0; i < 100000; i++) {
-        this->publisher->publish(test_msgs::msg::Empty());
-      }
-    });
-
-  // This thread adds/remove the node that contains the entities in a busy loop
-  std::thread add_remove_thread([&]() {
-      for (size_t i = 0; i < 100000; i++) {
-        executor.add_node(this->node);
-        executor.remove_node(this->node);
-      }
-    });
-
-  // Wait for the threads that do real work to finish
-  publisher_thread1.join();
-  publisher_thread2.join();
-  add_remove_thread.join();
-
-  executor.cancel();
-  spinner_thread.join();
-}
-
 // Check spin_until_future_complete with node base pointer (instantiates its own executor)
 TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
 {