Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discovery Server TSAN fixes [16106] #3070

Merged
merged 8 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ std::vector<fastrtps::rtps::CacheChange_t*> DiscoveryDataBase::clear()
}
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Clearing DiscoveryDataBase");

std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

/* Clear receive queues. Set changes inside to release */
while (!pdp_data_queue_.Empty())
Expand Down Expand Up @@ -161,7 +161,7 @@ bool DiscoveryDataBase::pdp_is_relevant(
}

// Lock(shared mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "PDP is " << change.instanceHandle << " relevant to " << reader_guid);

Expand All @@ -185,7 +185,7 @@ bool DiscoveryDataBase::edp_publications_is_relevant(
fastrtps::rtps::GUID_t change_guid = guid_from_change(&change);

// Lock(shared mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

auto itp = participants_.find(change_guid.guidPrefix);
if (itp == participants_.end())
Expand Down Expand Up @@ -218,7 +218,7 @@ bool DiscoveryDataBase::edp_subscriptions_is_relevant(
fastrtps::rtps::GUID_t change_guid = guid_from_change(&change);

// Lock(shared mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

auto itp = participants_.find(change_guid.guidPrefix);
if (itp == participants_.end())
Expand Down Expand Up @@ -320,7 +320,7 @@ bool DiscoveryDataBase::update(
if (is_persistent_ && guid_from_change(change).guidPrefix != server_guid_prefix_)
{
// Does not allow to the server to erase the ddb before this message has been processed
std::unique_lock<std::recursive_mutex> lock(data_queues_mutex_);
std::lock_guard<std::recursive_mutex> guard(data_queues_mutex_);
nlohmann::json j;
ddb::to_json(j, *change);
backup_file_ << j;
Expand Down Expand Up @@ -352,7 +352,7 @@ bool DiscoveryDataBase::update(
if (is_persistent_ && guid_from_change(change).guidPrefix != server_guid_prefix_)
{
// Does not allow to the server to erase the ddb before this message has been process
std::unique_lock<std::recursive_mutex> lock(data_queues_mutex_);
std::lock_guard<std::recursive_mutex> guard(data_queues_mutex_);
nlohmann::json j;
ddb::to_json(j, *change);
backup_file_ << j;
Expand Down Expand Up @@ -380,14 +380,14 @@ bool DiscoveryDataBase::update(
const std::vector<eprosima::fastrtps::rtps::CacheChange_t*> DiscoveryDataBase::changes_to_dispose()
{
// lock(sharing mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
return disposals_;
}

void DiscoveryDataBase::clear_changes_to_dispose()
{
// lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
disposals_.clear();
}

Expand All @@ -396,56 +396,56 @@ void DiscoveryDataBase::clear_changes_to_dispose()
const std::vector<eprosima::fastrtps::rtps::CacheChange_t*> DiscoveryDataBase::pdp_to_send()
{
// lock(sharing mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
return pdp_to_send_;
}

void DiscoveryDataBase::clear_pdp_to_send()
{
// lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
pdp_to_send_.clear();
}

const std::vector<eprosima::fastrtps::rtps::CacheChange_t*> DiscoveryDataBase::edp_publications_to_send()
{
// lock(sharing mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
return edp_publications_to_send_;
}

void DiscoveryDataBase::clear_edp_publications_to_send()
{
// lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
edp_publications_to_send_.clear();
}

const std::vector<eprosima::fastrtps::rtps::CacheChange_t*> DiscoveryDataBase::edp_subscriptions_to_send()
{
// lock(sharing mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
return edp_subscriptions_to_send_;
}

void DiscoveryDataBase::clear_edp_subscriptions_to_send()
{
// lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
edp_subscriptions_to_send_.clear();
}

const std::vector<eprosima::fastrtps::rtps::CacheChange_t*> DiscoveryDataBase::changes_to_release()
{
// lock(sharing mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
return changes_to_release_;
}

void DiscoveryDataBase::clear_changes_to_release()
{
// lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
changes_to_release_.clear();
}

Expand All @@ -460,7 +460,7 @@ void DiscoveryDataBase::process_pdp_data_queue()
}

// Lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

// Swap DATA queues
pdp_data_queue_.Swap();
Expand Down Expand Up @@ -500,7 +500,7 @@ bool DiscoveryDataBase::process_edp_data_queue()
bool is_dirty_topic = false;

// Lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

// Swap DATA queues
edp_data_queue_.Swap();
Expand Down Expand Up @@ -661,9 +661,12 @@ void DiscoveryDataBase::create_new_participant_from_change_(
{
fastrtps::rtps::GUID_t change_guid = guid_from_change(ch);

DiscoveryParticipantInfo part(ch, server_guid_prefix_, change_data);
std::pair<std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator, bool> ret =
participants_.insert(std::make_pair(change_guid.guidPrefix, part));
participants_.insert(
std::make_pair(
change_guid.guidPrefix,
DiscoveryParticipantInfo(ch, server_guid_prefix_, change_data)));

// If insert was successful
if (ret.second)
{
Expand Down Expand Up @@ -1346,7 +1349,7 @@ bool DiscoveryDataBase::process_dirty_topics()

// EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "process_dirty_topics start");
// Get shared lock
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

// Iterator objects are declared here because they are reused in each iteration of the loops
std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator parts_reader_it;
Expand Down Expand Up @@ -1489,7 +1492,7 @@ bool DiscoveryDataBase::delete_entity_of_change(
}

// Lock(exclusive mode) mutex locally
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);

if (change->kind == fastrtps::rtps::ChangeKind_t::ALIVE)
{
Expand Down Expand Up @@ -1592,6 +1595,8 @@ fastrtps::rtps::CacheChange_t* DiscoveryDataBase::cache_change_own_participant()

const std::vector<fastrtps::rtps::GuidPrefix_t> DiscoveryDataBase::direct_clients_and_servers()
{
std::lock_guard<std::recursive_mutex> guard(mutex_);

std::vector<fastrtps::rtps::GuidPrefix_t> direct_clients_and_servers;
// Iterate over participants to add the remote ones that are direct clients or servers
for (auto participant: participants_)
Expand All @@ -1611,15 +1616,17 @@ const std::vector<fastrtps::rtps::GuidPrefix_t> DiscoveryDataBase::direct_client

bool DiscoveryDataBase::server_acked_by_my_servers()
{
std::lock_guard<std::recursive_mutex> guard(mutex_);

if (servers_.size() == 0)
{
return true;
}

// Find the server's participant and check whether all its servers have ACKed the server's DATA(p)
auto this_server = participants_.find(server_guid_prefix_);

// check it is always there

assert(this_server != participants_.end());

for (auto prefix : servers_)
Expand All @@ -1634,6 +1641,8 @@ bool DiscoveryDataBase::server_acked_by_my_servers()

std::vector<fastrtps::rtps::GuidPrefix_t> DiscoveryDataBase::ack_pending_servers()
{
std::lock_guard<std::recursive_mutex> guard(mutex_);

std::vector<fastrtps::rtps::GuidPrefix_t> ack_pending_servers;
// Find the server's participant and check whether all its servers have ACKed the server's DATA(p)
auto this_server = participants_.find(server_guid_prefix_);
Expand Down Expand Up @@ -1702,6 +1711,8 @@ DiscoveryDataBase::AckedFunctor::~AckedFunctor()
void DiscoveryDataBase::AckedFunctor::operator () (
const eprosima::fastrtps::rtps::ReaderProxy* reader_proxy)
{
std::lock_guard<std::recursive_mutex> guard(db_->mutex_);

EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "functor operator in change: " << change_->instanceHandle);
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "for reader proxy: " << reader_proxy->guid());
// Check whether the change has been acknowledged by a given reader
Expand Down Expand Up @@ -2599,6 +2610,8 @@ void DiscoveryDataBase::persistence_enable(
bool DiscoveryDataBase::is_participant_local(
const eprosima::fastrtps::rtps::GuidPrefix_t& participant_prefix)
{
std::lock_guard<std::recursive_mutex> guard(mutex_);

auto pit = participants_.find(participant_prefix);
if (pit != participants_.end())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class DiscoveryPDPDataQueueInfo : public DiscoveryDataQueueInfo

DiscoveryPDPDataQueueInfo(
eprosima::fastrtps::rtps::CacheChange_t* change,
DiscoveryParticipantChangeData participant_change_data)
const DiscoveryParticipantChangeData& participant_change_data)
: DiscoveryDataQueueInfo(change)
, participant_change_data_(participant_change_data)
{
Expand All @@ -77,7 +77,7 @@ class DiscoveryPDPDataQueueInfo : public DiscoveryDataQueueInfo

private:

DiscoveryParticipantChangeData participant_change_data_;
const DiscoveryParticipantChangeData participant_change_data_;

};

Expand All @@ -87,7 +87,7 @@ class DiscoveryEDPDataQueueInfo : public DiscoveryDataQueueInfo

DiscoveryEDPDataQueueInfo(
eprosima::fastrtps::rtps::CacheChange_t* change,
eprosima::fastrtps::string_255 topic)
const eprosima::fastrtps::string_255& topic)
: DiscoveryDataQueueInfo(change)
, topic_(topic)
{
Expand All @@ -104,7 +104,7 @@ class DiscoveryEDPDataQueueInfo : public DiscoveryDataQueueInfo

private:

eprosima::fastrtps::string_255 topic_;
const eprosima::fastrtps::string_255 topic_;

};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,9 @@ class DiscoveryParticipantsAckStatus

public:

DiscoveryParticipantsAckStatus()
{
}
DiscoveryParticipantsAckStatus() = default;

~DiscoveryParticipantsAckStatus()
{
}
~DiscoveryParticipantsAckStatus() = default;

void add_or_update_participant(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
Expand Down
14 changes: 7 additions & 7 deletions src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ class DiscoverySharedInfo
eprosima::fastrtps::rtps::CacheChange_t* change,
const eprosima::fastrtps::rtps::GuidPrefix_t& known_participant);

~DiscoverySharedInfo()
{
}
~DiscoverySharedInfo() = default;

virtual eprosima::fastrtps::rtps::CacheChange_t* update_and_unmatch(
eprosima::fastrtps::rtps::CacheChange_t* change);
Expand All @@ -60,9 +58,11 @@ class DiscoverySharedInfo
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
bool status = false)
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding relevant participant " << guid_p << " with status " << status << " to " <<
fastrtps::rtps::iHandle2GUID(
change_->instanceHandle));
EPROSIMA_LOG_INFO(
DISCOVERY_DATABASE,
"Adding relevant participant " << guid_p
<< " with status " << status
<< " to " << fastrtps::rtps::iHandle2GUID(change_->instanceHandle));
relevant_participants_builtin_ack_status_.add_or_update_participant(guid_p, status);
}

Expand Down Expand Up @@ -102,7 +102,7 @@ class DiscoverySharedInfo
virtual void to_json(
nlohmann::json& j) const;

private:
protected:
jsan-rt marked this conversation as resolved.
Show resolved Hide resolved

eprosima::fastrtps::rtps::CacheChange_t* change_;

Expand Down