Skip to content

Commit

Permalink
Fix dataraces when creating DataWriters (#3051)
Browse files Browse the repository at this point in the history
* Fix dataraces when creating DataWriters (#3015)

* Refs #15905: Declare the PublishMode running flag as atomic

Signed-off-by: Eduardo Ponz <[email protected]>

* Refs #15905: Add RTPS regression test

Signed-off-by: Eduardo Ponz <[email protected]>

* Refs #15905: Add DomainParticipantImpl::create_instance_handle data race regression test

Signed-off-by: Eduardo Ponz <[email protected]>

* Refs #15905: Set DomainParticipantImpl::next_instance_id_ as atomic

Signed-off-by: Eduardo Ponz <[email protected]>

* Refs #15905: Apply suggestions

Signed-off-by: Eduardo Ponz <[email protected]>

Signed-off-by: Eduardo Ponz <[email protected]>
(cherry picked from commit 4391864)

Co-authored-by: Eduardo Ponz Segrelles <[email protected]>
(cherry picked from commit 90777ec)

# Conflicts:
#	test/blackbox/common/RTPSBlackboxTestsBasic.cpp

* Fixed conflicts

Signed-off-by: Miguel Company <[email protected]>

* Fix build on RTPSBlackboxTestsBasic.cpp

Signed-off-by: Miguel Company <[email protected]>

Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Miguel Company <[email protected]>
  • Loading branch information
mergify[bot] and MiguelCompany authored Nov 3, 2022
1 parent 5b86ab1 commit acfd618
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 11 deletions.
8 changes: 4 additions & 4 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1805,12 +1805,12 @@ void DomainParticipantImpl::create_instance_handle(
{
using eprosima::fastrtps::rtps::octet;

++next_instance_id_;
uint32_t id = ++next_instance_id_;
handle = guid_;
handle.value[15] = 0x01; // Vendor specific;
handle.value[14] = static_cast<octet>(next_instance_id_ & 0xFF);
handle.value[13] = static_cast<octet>((next_instance_id_ >> 8) & 0xFF);
handle.value[12] = static_cast<octet>((next_instance_id_ >> 16) & 0xFF);
handle.value[14] = static_cast<octet>(id & 0xFF);
handle.value[13] = static_cast<octet>((id >> 8) & 0xFF);
handle.value[12] = static_cast<octet>((id >> 16) & 0xFF);
}

DomainParticipantListener* DomainParticipantImpl::get_listener_for(
Expand Down
5 changes: 4 additions & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#ifndef _FASTDDS_PARTICIPANTIMPL_HPP_
#define _FASTDDS_PARTICIPANTIMPL_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/participant/RTPSParticipantListener.h>
#include <fastdds/rtps/reader/StatefulReader.h>
Expand Down Expand Up @@ -402,7 +405,7 @@ class DomainParticipantImpl
fastrtps::rtps::GUID_t guid_;

//!For instance handle creation
uint32_t next_instance_id_;
std::atomic<uint32_t> next_instance_id_;

//!Participant Qos
DomainParticipantQos qos_;
Expand Down
110 changes: 110 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <condition_variable>
#include <mutex>
#include <thread>

#include <gtest/gtest.h>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantFactoryQos.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastrtps/types/TypesBase.h>

#include "BlackboxTests.hpp"

namespace eprosima {
namespace fastdds {
namespace dds {

using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t;

/**
* This test checks a race condition when calling DomainParticipantImpl::create_instance_handle()
* from different threads simultaneously. This was resulting in a `free(): invalid pointer` crash
* when deleting publishers created this way, as there was a clash in their respective instance
* handles. Not only did the crash occur, but it was also reported by TSan.
*
* The test spawns 200 threads, each creating a publisher and then waiting on a command from the
* main thread to delete them (so all of them at deleted at the same time).
*/
TEST(DDSBasic, MultithreadedPublisherCreation)
{
/* Get factory */
DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();
ASSERT_NE(nullptr, factory);

/* Create participant */
DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(nullptr, participant);

/* Test synchronization variables */
std::mutex finish_mtx;
std::condition_variable finish_cv;
bool should_finish = false;

/* Function to create publishers, deleting them on command */
auto thread_run =
[participant, &finish_mtx, &finish_cv, &should_finish]()
{
/* Create publisher */
Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
ASSERT_NE(nullptr, publisher);

{
/* Wait for test completion request */
std::unique_lock<std::mutex> lock(finish_mtx);
finish_cv.wait(lock, [&should_finish]()
{
return should_finish;
});
}

/* Delete publisher */
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher));
};

{
/* Create threads */
std::vector<std::thread> threads;
for (size_t i = 0; i < 200; i++)
{
threads.push_back(std::thread(thread_run));
}

/* Command threads to delete their publishers */
{
std::lock_guard<std::mutex> guard(finish_mtx);
should_finish = true;
finish_cv.notify_all();
}

/* Wait for all threads to join */
for (std::thread& thr : threads)
{
thr.join();
}
}

/* Clean up */
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
16 changes: 10 additions & 6 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@

#include "BlackboxTests.hpp"

#include <chrono>
#include <memory>
#include <thread>

#include <gtest/gtest.h>

#include <fastrtps/transport/test_UDPv4TransportDescriptor.h>
#include <fastrtps/transport/test_UDPv4Transport.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "RTPSAsSocketReader.hpp"
#include "RTPSAsSocketWriter.hpp"
#include "RTPSWithRegistrationReader.hpp"
#include "RTPSWithRegistrationWriter.hpp"
#include <fastrtps/xmlparser/XMLProfileManager.h>
#include <fastrtps/transport/test_UDPv4Transport.h>

#include <gtest/gtest.h>

#include <thread>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
Expand Down

0 comments on commit acfd618

Please sign in to comment.