diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index f7a8bf40924..843dd0d01fd 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -690,7 +691,7 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant( return nullptr; } -BaseReader* RTPSDomainImpl::find_local_reader( +std::shared_ptr RTPSDomainImpl::find_local_reader( const GUID_t& reader_guid) { auto instance = get_instance(); @@ -704,7 +705,7 @@ BaseReader* RTPSDomainImpl::find_local_reader( } } - return nullptr; + return std::shared_ptr(nullptr); } BaseWriter* RTPSDomainImpl::find_local_writer( diff --git a/src/cpp/rtps/RTPSDomainImpl.hpp b/src/cpp/rtps/RTPSDomainImpl.hpp index 605d2ed1461..0a5fe87b650 100644 --- a/src/cpp/rtps/RTPSDomainImpl.hpp +++ b/src/cpp/rtps/RTPSDomainImpl.hpp @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -175,7 +176,7 @@ class RTPSDomainImpl * * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ - static BaseReader* find_local_reader( + static std::shared_ptr find_local_reader( const GUID_t& reader_guid); /** diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index cce72851d46..f96385200a8 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1350,7 +1350,7 @@ bool RTPSParticipantImpl::createReader( return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback); } -BaseReader* RTPSParticipantImpl::find_local_reader( +std::shared_ptr RTPSParticipantImpl::find_local_reader( const GUID_t& reader_guid) { shared_lock _(endpoints_list_mutex); @@ -1359,11 +1359,11 @@ BaseReader* RTPSParticipantImpl::find_local_reader( { if (reader->getGuid() == reader_guid) { - return reader; + return reader->get_local_pointer(); } } - return nullptr; + return std::shared_ptr(); } BaseWriter* RTPSParticipantImpl::find_local_writer( @@ -1960,6 +1960,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( bool found = false, found_in_users = false; Endpoint* p_endpoint = nullptr; + BaseReader* reader = nullptr; if (endpoint.entityId.is_writer()) { @@ -1994,6 +1995,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it { + reader = *rit; m_userReaderList.erase(rit); found_in_users = true; break; @@ -2004,6 +2006,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it { + reader = *rit; p_endpoint = *rit; m_allReaderList.erase(rit); found = true; @@ -2062,6 +2065,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint( #endif // if HAVE_SECURITY } + if (reader) + { + reader->local_actions_on_reader_removed(); + } delete(p_endpoint); return true; } @@ -2149,6 +2156,11 @@ void RTPSParticipantImpl::deleteAllUserEndpoints() } #endif // if HAVE_SECURITY + if (kind == READER) + { + static_cast(endpoint)->local_actions_on_reader_removed(); + } + // remove the endpoints delete(endpoint); } @@ -2874,8 +2886,11 @@ bool RTPSParticipantImpl::register_in_reader( } else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId)) { - BaseReader* reader = find_local_reader(reader_guid); - res = reader->add_statistics_listener(listener); + LocalReaderPointer::Instance local_reader(find_local_reader(reader_guid)); + if (local_reader) + { + res = local_reader->add_statistics_listener(listener); + } } return res; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp index 736c7de36f5..5205f971cb4 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp @@ -56,6 +56,7 @@ #include #include #include +#include #include #include #include @@ -477,7 +478,7 @@ class RTPSParticipantImpl /*** * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ - BaseReader* find_local_reader( + std::shared_ptr find_local_reader( const GUID_t& reader_guid); /*** diff --git a/src/cpp/rtps/reader/BaseReader.cpp b/src/cpp/rtps/reader/BaseReader.cpp index 77332e4e7b2..4beb9f2e807 100644 --- a/src/cpp/rtps/reader/BaseReader.cpp +++ b/src/cpp/rtps/reader/BaseReader.cpp @@ -107,6 +107,11 @@ BaseReader::BaseReader( setup_datasharing(att); } +void BaseReader::local_actions_on_reader_removed() +{ + local_ptr_->deactivate(); +} + BaseReader::~BaseReader() { EPROSIMA_LOG_INFO(RTPS_READER, "Removing reader " << this->getGuid().entityId); @@ -272,6 +277,11 @@ void BaseReader::allow_unknown_writers() accept_messages_from_unkown_writers_ = true; } +std::shared_ptr BaseReader::get_local_pointer() +{ + return local_ptr_; +} + bool BaseReader::reserve_cache( uint32_t cdr_payload_size, fastdds::rtps::CacheChange_t*& change) @@ -501,6 +511,8 @@ void BaseReader::init( fixed_payload_size_ = history_->m_att.payloadMaxSize; } + local_ptr_ = std::make_shared(this); + EPROSIMA_LOG_INFO(RTPS_READER, "RTPSReader created correctly"); } diff --git a/src/cpp/rtps/reader/BaseReader.hpp b/src/cpp/rtps/reader/BaseReader.hpp index 3a9c382167d..596e335ce53 100644 --- a/src/cpp/rtps/reader/BaseReader.hpp +++ b/src/cpp/rtps/reader/BaseReader.hpp @@ -42,6 +42,8 @@ #include #include +#include + namespace eprosima { namespace fastdds { @@ -163,6 +165,14 @@ class BaseReader return datasharing_listener_; } + /** + * @brief Retrieves the local pointer to this reader + * to be used by other local entities. + * + * @return Local pointer to this reader. + */ + std::shared_ptr get_local_pointer(); + /** * @brief Reserve a CacheChange_t. * @@ -296,6 +306,11 @@ class BaseReader const fastdds::rtps::SequenceNumberSet_t& gapList, VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0; + /** + * @brief Waits for not being referenced/used by any other entity. + */ + virtual void local_actions_on_reader_removed(); + #ifdef FASTDDS_STATISTICS bool add_statistics_listener( @@ -455,6 +470,9 @@ class BaseReader /// Trusted writer (for Builtin) fastdds::rtps::EntityId_t trusted_writer_entity_id_; + /// RefCountedPointer of this instance. + std::shared_ptr local_ptr_; + private: /** diff --git a/src/cpp/rtps/reader/LocalReaderPointer.hpp b/src/cpp/rtps/reader/LocalReaderPointer.hpp new file mode 100644 index 00000000000..7e16177907d --- /dev/null +++ b/src/cpp/rtps/reader/LocalReaderPointer.hpp @@ -0,0 +1,36 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file LocalReaderPointer.hpp + */ + +#ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP +#define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +class BaseReader; + +using LocalReaderPointer = RefCountedPointer; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP diff --git a/src/cpp/rtps/writer/ReaderLocator.cpp b/src/cpp/rtps/writer/ReaderLocator.cpp index 94e23c0dd99..29b9e4f2882 100644 --- a/src/cpp/rtps/writer/ReaderLocator.cpp +++ b/src/cpp/rtps/writer/ReaderLocator.cpp @@ -45,7 +45,7 @@ ReaderLocator::ReaderLocator( , async_locator_info_(max_unicast_locators, max_multicast_locators) , expects_inline_qos_(false) , is_local_reader_(false) - , local_reader_(nullptr) + , local_reader_() , guid_prefix_as_vector_(1u) , guid_as_vector_(1u) , datasharing_notifier_(nullptr) @@ -84,7 +84,7 @@ bool ReaderLocator::start( is_local_reader_ = RTPSDomainImpl::should_intraprocess_between(owner_->getGuid(), remote_guid); is_datasharing &= !is_local_reader_; - local_reader_ = nullptr; + local_reader_.reset(); if (!is_local_reader_ && !is_datasharing) { @@ -177,7 +177,7 @@ void ReaderLocator::stop() guid_prefix_as_vector_.at(0) = c_GuidPrefix_Unknown; expects_inline_qos_ = false; is_local_reader_ = false; - local_reader_ = nullptr; + local_reader_.reset(); } bool ReaderLocator::send( @@ -206,13 +206,13 @@ bool ReaderLocator::send( return true; } -BaseReader* ReaderLocator::local_reader() +LocalReaderPointer::Instance ReaderLocator::local_reader() { if (!local_reader_) { local_reader_ = RTPSDomainImpl::find_local_reader(general_locator_info_.remote_guid); } - return local_reader_; + return LocalReaderPointer::Instance(local_reader_); } bool ReaderLocator::is_datasharing_reader() const @@ -222,15 +222,13 @@ bool ReaderLocator::is_datasharing_reader() const void ReaderLocator::datasharing_notify() { - RTPSReader* reader = nullptr; if (is_local_reader()) { - reader = local_reader(); - } - - if (reader) - { - BaseReader::downcast(reader)->datasharing_listener()->notify(true); + LocalReaderPointer::Instance reader = local_reader(); + if (reader) + { + reader->datasharing_listener()->notify(true); + } } else { diff --git a/src/cpp/rtps/writer/ReaderLocator.hpp b/src/cpp/rtps/writer/ReaderLocator.hpp index 291617c0f0c..7e344f75ea4 100644 --- a/src/cpp/rtps/writer/ReaderLocator.hpp +++ b/src/cpp/rtps/writer/ReaderLocator.hpp @@ -25,6 +25,8 @@ #include #include +#include + namespace eprosima { namespace fastdds { namespace rtps { @@ -67,10 +69,10 @@ class ReaderLocator : public RTPSMessageSenderInterface return is_local_reader_; } - BaseReader* local_reader(); + LocalReaderPointer::Instance local_reader(); void local_reader( - BaseReader* local_reader) + std::shared_ptr local_reader) { local_reader_ = local_reader; } @@ -260,7 +262,7 @@ class ReaderLocator : public RTPSMessageSenderInterface LocatorSelectorEntry async_locator_info_; bool expects_inline_qos_; bool is_local_reader_; - BaseReader* local_reader_; + std::shared_ptr local_reader_; std::vector guid_prefix_as_vector_; std::vector guid_as_vector_; IDataSharingNotifier* datasharing_notifier_; diff --git a/src/cpp/rtps/writer/ReaderProxy.hpp b/src/cpp/rtps/writer/ReaderProxy.hpp index 163aca7ef7d..467cc455149 100644 --- a/src/cpp/rtps/writer/ReaderProxy.hpp +++ b/src/cpp/rtps/writer/ReaderProxy.hpp @@ -290,7 +290,7 @@ class ReaderProxy * Get the local reader on the same process (if any). * @return The local reader on the same process. */ - inline BaseReader* local_reader() + inline LocalReaderPointer::Instance local_reader() { return locator_info_.local_reader(); } diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 69e0d17c225..904ccb296d7 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -405,14 +406,14 @@ bool StatefulWriter::intraprocess_delivery( CacheChange_t* change, ReaderProxy* reader_proxy) { - BaseReader* reader = reader_proxy->local_reader(); - if (reader) + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); + if (local_reader) { if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) { change->write_params.sample_identity(change->write_params.related_sample_identity()); } - return reader->process_data_msg(change); + return local_reader->process_data_msg(change); } return false; } @@ -422,10 +423,10 @@ bool StatefulWriter::intraprocess_gap( const SequenceNumber_t& first_seq, const SequenceNumber_t& last_seq) { - RTPSReader* reader = reader_proxy->local_reader(); - if (reader) + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); + if (local_reader) { - return BaseReader::downcast(reader)->process_gap_msg( + return local_reader->process_gap_msg( m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); } @@ -437,12 +438,11 @@ bool StatefulWriter::intraprocess_heartbeat( bool liveliness) { bool returned_value = false; + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); - std::lock_guard guardW(mp_mutex); - RTPSReader* reader = RTPSDomainImpl::find_local_reader(reader_proxy->guid()); - - if (reader) + if (local_reader) { + std::unique_lock lockW(mp_mutex); SequenceNumber_t first_seq = get_seq_num_min(); SequenceNumber_t last_seq = get_seq_num_max(); @@ -459,8 +459,10 @@ bool StatefulWriter::intraprocess_heartbeat( (liveliness || reader_proxy->has_changes())) { increment_hb_count(); - returned_value = BaseReader::downcast(reader)->process_heartbeat_msg( - m_guid, heartbeat_count_, first_seq, last_seq, true, liveliness, c_VendorId_eProsima); + Count_t hb_count = heartbeat_count_; + lockW.unlock(); + returned_value = local_reader->process_heartbeat_msg( + m_guid, hb_count, first_seq, last_seq, true, liveliness, c_VendorId_eProsima); } } diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index a93b2603f0c..3cd88fa6fd7 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -313,16 +313,16 @@ bool StatelessWriter::intraprocess_delivery( CacheChange_t* change, ReaderLocator& reader_locator) { - RTPSReader* reader = reader_locator.local_reader(); + LocalReaderPointer::Instance local_reader = reader_locator.local_reader(); - if (reader && + if (local_reader && (!reader_data_filter_ || reader_data_filter_->is_relevant(*change, reader_locator.remote_guid()))) { if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) { change->write_params.sample_identity(change->write_params.related_sample_identity()); } - return BaseReader::downcast(reader)->process_data_msg(change); + return local_reader->process_data_msg(change); } return false; @@ -963,7 +963,7 @@ bool StatelessWriter::get_connections( //! intraprocess for_matched_readers(matched_local_readers_, [&connection, &connection_list](ReaderLocator& reader) { - connection.guid(fastdds::statistics::to_statistics_type(reader.local_reader()->getGuid())); + connection.guid(fastdds::statistics::to_statistics_type(reader.remote_guid())); connection.mode(fastdds::statistics::ConnectionMode::INTRAPROCESS); connection_list.push_back(connection); diff --git a/src/cpp/utils/RefCountedPointer.hpp b/src/cpp/utils/RefCountedPointer.hpp new file mode 100644 index 00000000000..bb1cfa70b3e --- /dev/null +++ b/src/cpp/utils/RefCountedPointer.hpp @@ -0,0 +1,218 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file RefCountedPointer.hpp + */ + +#ifndef UTILS__REFCOUNTEDPOINTER_HPP +#define UTILS__REFCOUNTEDPOINTER_HPP + +#include +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { + +/** + * @brief Class to manage a local pointer with reference counting. + * + * It is similar to std::shared_ptr, but designed for cases where + * a shared pointer cannot be used due to API restrictions. + * + * USAGE: + * - On T class: + * - Add a shared_ptr> local_ptr_ member. + * - Call local_ptr_->deactivate() before destroying T. + * + * - On classes that need to use a pointer to T: + * - Keep a copy of the shared_ptr>. + * - Whenever you need to access T: + * RefCountedPointer::Instance instance(local_ptr_) + * if (instance) + * { + * ptr->method(); + * } + */ +template +class RefCountedPointer +{ +public: + + class Instance; + + /** + * @brief Explicit constructor. + * @param ptr Pointer to manage. + * + * @pre nullptr != ptr. We must ensure that the pointer we + * are manaing is valid. + */ + explicit RefCountedPointer( + T* ptr) + : ptr_(ptr) + , is_active_(true) + , instances_(0) + { + assert(nullptr != ptr); + } + + ~RefCountedPointer() = default; + + // Non-copyable and non-movable + RefCountedPointer( + const RefCountedPointer&) = delete; + RefCountedPointer& operator =( + const RefCountedPointer&) = delete; + RefCountedPointer( + RefCountedPointer&&) = delete; + RefCountedPointer& operator =( + RefCountedPointer&&) = delete; + + /** + * @brief Class to manage the local pointer instance. + * It will increase the reference count on construction and decrease + * it on destruction. Provides a facade to access the pointee. + */ + class Instance + { + public: + + /** + * @brief Constructor. + * @param parent Shared pointer reference to its RefCountedPointer. + */ + explicit Instance( + const std::shared_ptr>& parent) + : parent_(parent) + , ptr_(parent && parent->is_active_ ? parent->ptr_ : nullptr) + { + if (parent_) + { + parent_->inc_instances(); + } + } + + /** + * @brief Destructor. + */ + ~Instance() + { + if (parent_) + { + parent_->dec_instances(); + } + } + + // Non-copyable, default movable + Instance( + const Instance&) = delete; + Instance& operator =( + const Instance&) = delete; + Instance( + Instance&&) = default; + Instance& operator =( + Instance&&) = default; + + /** + * @brief operator to check if the pointer is valid. + */ + operator bool() const + { + return nullptr != ptr_; + } + + /** + * @brief operator to call the T methods. + */ + T* operator ->() const + { + assert(nullptr != ptr_); + return ptr_; + } + + private: + + std::shared_ptr> parent_; + T* const ptr_; + }; + + /** + * @brief Ensure no more valid local pointer instances are created, and wait for current ones to die. + */ + void deactivate() + { + std::unique_lock lock(mutex_); + is_active_ = false; + cv_.wait(lock, [this]() -> bool + { + return instances_ == 0; + }); + } + +private: + + /** + * @brief Increase the reference count. + */ + void inc_instances() + { + std::unique_lock lock(mutex_); + ++instances_; + } + + /** + * @brief Decrease the reference count. + */ + void dec_instances() + { + std::unique_lock lock(mutex_); + --instances_; + if (instances_ == 0) + { + cv_.notify_one(); + } + } + + /** + * Pointer to the managed object. + */ + T* const ptr_; + + /** + * Indicates whether the pointee is still alive + * and accessing the pointer is valid. + */ + std::atomic is_active_; + + /** + * Protections for the number of instances. + */ + mutable std::mutex mutex_; + std::condition_variable cv_; + + /** + * Number of active instances (currently using the pointee). + */ + size_t instances_; +}; + +} // namespace fastdds +} // namespace eprosima + +#endif // UTILS__REFCOUNTEDPOINTER_HPP diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 5b865787eca..1670ec49b0c 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -46,6 +46,7 @@ #include "BlackboxTests.hpp" #include "mock/BlackboxMockConsumer.h" #include "../api/dds-pim/CustomPayloadPool.hpp" +#include "../api/dds-pim/PubSubParticipant.hpp" #include "../api/dds-pim/PubSubReader.hpp" #include "../api/dds-pim/PubSubWriter.hpp" #include "../api/dds-pim/PubSubWriterReader.hpp" @@ -932,6 +933,88 @@ TEST(DDSBasic, register_two_identical_typesupports) EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_2)); } +/** + * @test This is a regression test for Redmine Issue 21293. + * The destruction among intra-process participants should be correctly performed. + * local_reader() has to return a valid pointer. + * + */ +TEST(DDSBasic, successful_destruction_among_intraprocess_participants) +{ + namespace dds = eprosima::fastdds::dds; + auto factory = dds::DomainParticipantFactory::get_instance(); + + // Set intraprocess delivery to full + LibrarySettings library_settings; + factory->get_library_settings(library_settings); + auto old_library_settings = library_settings; + library_settings.intraprocess_delivery = INTRAPROCESS_FULL; + factory->set_library_settings(library_settings); + + { + auto participant_1 = std::make_shared>(1u, 1u, 1u, 1u); + + ASSERT_TRUE(participant_1->init_participant()); + participant_1->pub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(participant_1->init_publisher(0u)); + participant_1->sub_topic_name(TEST_TOPIC_NAME + "_Return"); + ASSERT_TRUE(participant_1->init_subscriber(0u)); + + std::vector>> reception_participants; + + size_t num_reception_participants = 50; + + for (size_t i = 0; i < num_reception_participants; i++) + { + reception_participants.push_back(std::make_shared>(1u, 1u, 1u, 1u)); + ASSERT_TRUE(reception_participants.back()->init_participant()); + reception_participants.back()->sub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(reception_participants.back()->init_subscriber(0u)); + reception_participants.back()->pub_topic_name(TEST_TOPIC_NAME + "_Return"); + ASSERT_TRUE(reception_participants.back()->init_publisher(0u)); + } + + participant_1->wait_discovery(std::chrono::seconds::zero(), (uint8_t)num_reception_participants, true); + + participant_1->pub_wait_discovery((unsigned int)num_reception_participants); + participant_1->sub_wait_discovery((unsigned int)num_reception_participants); + + auto data_12 = default_helloworld_data_generator(); + + std::thread p1_thread([&participant_1, &data_12]() + { + auto data_size = data_12.size(); + for (size_t i = 0; i < data_size; i++) + { + participant_1->send_sample(data_12.back()); + data_12.pop_back(); + } + }); + + std::vector reception_threads; + reception_threads.reserve(num_reception_participants); + for (auto& reception_participant : reception_participants) + { + reception_threads.emplace_back([&reception_participant]() + { + auto data_21 = default_helloworld_data_generator(); + for (auto& data : data_21) + { + reception_participant->send_sample(data); + } + + reception_participant.reset(); + }); + } + + p1_thread.join(); + for (auto& rec_thread : reception_threads) + { + rec_thread.join(); + } + } +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/mock/rtps/ReaderLocator/rtps/writer/ReaderLocator.hpp b/test/mock/rtps/ReaderLocator/rtps/writer/ReaderLocator.hpp index 4886ee24e26..337d2f74c7f 100644 --- a/test/mock/rtps/ReaderLocator/rtps/writer/ReaderLocator.hpp +++ b/test/mock/rtps/ReaderLocator/rtps/writer/ReaderLocator.hpp @@ -27,6 +27,8 @@ #include #include +#include + namespace eprosima { namespace fastdds { namespace rtps { @@ -202,9 +204,9 @@ class ReaderLocator : public RTPSMessageSenderInterface return false; } - BaseReader* local_reader() + LocalReaderPointer::Instance local_reader() { - return nullptr; + return LocalReaderPointer::Instance(std::shared_ptr()); } bool is_datasharing_reader() const diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index d9848256f0c..32ea0d63d55 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -71,6 +71,9 @@ set(SYSTEMINFOTESTS_SOURCE set(TREETESTS_SOURCE TreeNodeTests.cpp) +set(REF_COUNTED_POINTER_TESTS_SOURCE + RefCountedPointerTests.cpp) + include_directories(mock/) add_executable(StringMatchingTests ${STRINGMATCHINGTESTS_SOURCE}) @@ -178,6 +181,11 @@ target_include_directories(TreeNodeTests PRIVATE ${PROJECT_SOURCE_DIR}/src/cpp) target_link_libraries(TreeNodeTests PUBLIC GTest::gtest) gtest_discover_tests(TreeNodeTests) +add_executable(RefCountedPointerTests ${REF_COUNTED_POINTER_TESTS_SOURCE}) +target_include_directories(RefCountedPointerTests PRIVATE ${PROJECT_SOURCE_DIR}/src/cpp) +target_link_libraries(RefCountedPointerTests PUBLIC GTest::gtest) +gtest_discover_tests(RefCountedPointerTests) + ############################################################################### # Necessary files ############################################################################### diff --git a/test/unittest/utils/RefCountedPointerTests.cpp b/test/unittest/utils/RefCountedPointerTests.cpp new file mode 100644 index 00000000000..a939cc7b414 --- /dev/null +++ b/test/unittest/utils/RefCountedPointerTests.cpp @@ -0,0 +1,183 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include + +using namespace std; + +namespace eprosima { +namespace fastdds { + +struct EntityMock +{ + EntityMock() + : local_pointer(std::make_shared>(this)) + , n_times_data_processed(0) + { + } + + std::shared_ptr> get_refcounter_pointer() const + { + return local_pointer; + } + + void dummy_process_data( + void*) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ++n_times_data_processed; + } + + void destroy() + { + local_pointer->deactivate(); + } + + std::shared_ptr> local_pointer; + std::atomic n_times_data_processed; +}; + +enum class RoutineStatus +{ + NON_INITIALIZED, + SUCCESS, + FAILURE +}; + +struct EntityOwner +{ + EntityOwner( + const EntityMock& entity) + : entity_ptr(entity.get_refcounter_pointer()) + , routine_status(RoutineStatus::NON_INITIALIZED) + { + } + + void spawn_routine() + { + th = std::thread([&]() + { + RefCountedPointer::Instance entity_instance(entity_ptr); + if (entity_instance) + { + entity_instance->dummy_process_data(nullptr); + routine_status = RoutineStatus::SUCCESS; + } + else + { + routine_status = RoutineStatus::FAILURE; + } + }); + } + + void join() + { + th.join(); + } + + std::shared_ptr> entity_ptr; + RoutineStatus routine_status; + std::thread th; +}; + +class RefCountedPointerTests : public ::testing::Test +{ +public: + + static constexpr std::size_t n_owners = 5; + + void SetUp() override + { + owners_.reserve(5); + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_.emplace_back(entity_); + } + } + + void TearDown() override + { + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_[i].join(); + } + } + +protected: + + EntityMock entity_; + std::vector owners_; +}; + +TEST_F(RefCountedPointerTests, refcountedpointer_inactive) +{ + // Make the first owner spawn a routine + owners_[0].spawn_routine(); + + // Wait for the routine to finish + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ASSERT_EQ(owners_[0].routine_status, RoutineStatus::SUCCESS); + + // Destroy the entity + entity_.destroy(); + + // Make the rest of the owners spawn a routine + for (std::size_t i = 1; i < n_owners; ++i) + { + owners_[i].spawn_routine(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // The routine should fail + ASSERT_EQ(owners_[i].routine_status, RoutineStatus::FAILURE); + } + + // The entity should have been processed only once + ASSERT_EQ(1, entity_.n_times_data_processed); +} + +TEST_F(RefCountedPointerTests, refcounterpointer_deactivate_waits_for_no_references) +{ + // Spawn some routines + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_[i].spawn_routine(); + } + + // Ensure owners' routines have started + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto t0 = std::chrono::steady_clock::now(); + entity_.destroy(); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0).count(); + + std::cout << "Elapsed time: " << elapsed << " ms" << std::endl; + ASSERT_GT(elapsed, 50); // destroy should have taken at least 50 ms. Being strict it should be 80, but we allow some margin + ASSERT_EQ(entity_.n_times_data_processed, 5); +} + +} // namespace fastdds +} // namespace eprosima + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}