Skip to content

Commit

Permalink
Fixes bug regarding unregistration introduced with #1086. (#1197)
Browse files Browse the repository at this point in the history
Introduces two new structs (SLocalSubscriptionInfo and SExternSubscriptionInfo) and uses them for applying / unapplying registrations.
They are also stored in the ExpiredMap when topic are no longer visible on monitoring layer.
  • Loading branch information
KerstinKeller authored and FlorianReimold committed Oct 27, 2023
1 parent 204538e commit 536d35b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 47 deletions.
32 changes: 18 additions & 14 deletions ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ namespace eCAL

const auto& ecal_sample = ecal_sample_.topic();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
CDataWriter::SLocalSubscriptionInfo subscription_info;
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());
SDataTypeInformation topic_information{ eCALSampleToTopicInformation(ecal_sample_) };
const std::string process_id = std::to_string(ecal_sample.pid());

std::string reader_par;
for (const auto& layer : ecal_sample.tlayer())
Expand All @@ -135,7 +136,7 @@ namespace eCAL
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplyLocSubscription(process_id, topic_id, topic_information, reader_par);
iter->second->ApplyLocSubscription(subscription_info, topic_information, reader_par);
}
}

Expand All @@ -145,15 +146,16 @@ namespace eCAL

const auto& ecal_sample = ecal_sample_.topic();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
const std::string process_id = std::to_string(ecal_sample.pid());
CDataWriter::SLocalSubscriptionInfo subscription_info;
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());

// unregister local subscriber
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for (TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->RemoveLocSubscription(process_id, topic_id);
iter->second->RemoveLocSubscription(subscription_info);
}
}

Expand All @@ -162,11 +164,12 @@ namespace eCAL
if(!m_created) return;

const auto& ecal_sample = ecal_sample_.topic();
const std::string& host_name = ecal_sample.hname();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
CDataWriter::SExternalSubscriptionInfo subscription_info;
subscription_info.host_name = ecal_sample.hname();
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());
SDataTypeInformation topic_information{ eCALSampleToTopicInformation(ecal_sample_) };
const std::string process_id = std::to_string(ecal_sample.pid());

std::string reader_par;
for (const auto& layer : ecal_sample.tlayer())
Expand All @@ -185,7 +188,7 @@ namespace eCAL
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplyExtSubscription(host_name, process_id, topic_id, topic_information, reader_par);
iter->second->ApplyExtSubscription(subscription_info, topic_information, reader_par);
}
}

Expand All @@ -194,17 +197,18 @@ namespace eCAL
if (!m_created) return;

const auto& ecal_sample = ecal_sample_.topic();
const std::string& host_name = ecal_sample.hname();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
const std::string process_id = std::to_string(ecal_sample.pid());
CDataWriter::SExternalSubscriptionInfo subscription_info;
subscription_info.host_name = ecal_sample.hname();
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());

// unregister external subscriber
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for (TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->RemoveExtSubscription(host_name, process_id, topic_id);
iter->second->RemoveExtSubscription(subscription_info);
}
}

Expand Down
47 changes: 22 additions & 25 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,83 +655,79 @@ namespace eCAL
else return 0;
}

void CDataWriter::ApplyLocSubscription(const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
void CDataWriter::ApplyLocSubscription(const SLocalSubscriptionInfo& local_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
{
Connect(tid_, tinfo_);
Connect(local_info_.topic_id, tinfo_);

// add key to local subscriber map
const std::string topic_key = process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map[topic_key] = true;
m_loc_sub_map[local_info_] = true;
}

m_loc_subscribed = true;

// add a new local subscription
m_writer.udp_mc.AddLocConnection (process_id_, reader_par_);
m_writer.shm.AddLocConnection (process_id_, reader_par_);
m_writer.udp_mc.AddLocConnection (local_info_.process_id, reader_par_);
m_writer.shm.AddLocConnection (local_info_.process_id, reader_par_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::ApplyLocSubscription");
#endif
}

void CDataWriter::RemoveLocSubscription(const std::string& process_id_, const std::string& tid_)
void CDataWriter::RemoveLocSubscription(const SLocalSubscriptionInfo& local_info_)
{
// remove key from local subscriber map
const std::string topic_key = process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.erase(topic_key);
m_loc_sub_map.erase(local_info_);
}

// remove a local subscription
m_writer.udp_mc.RemLocConnection (process_id_);
m_writer.shm.RemLocConnection (process_id_);
m_writer.udp_mc.RemLocConnection (local_info_.process_id);
m_writer.shm.RemLocConnection (local_info_.process_id);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::RemoveLocSubscription");
#endif
}

void CDataWriter::ApplyExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
void CDataWriter::ApplyExtSubscription(const SExternalSubscriptionInfo& external_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
{
Connect(tid_, tinfo_);
Connect(external_info_.topic_id, tinfo_);

// add key to external subscriber map
const std::string topic_key = host_name_ + process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_ext_sub_map[topic_key] = true;
m_ext_sub_map[external_info_] = true;
}

m_ext_subscribed = true;

// add a new external subscription
m_writer.udp_mc.AddExtConnection (host_name_, process_id_, reader_par_);
m_writer.shm.AddExtConnection (host_name_, process_id_, reader_par_);
m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);
m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::ApplyExtSubscription");
#endif
}

void CDataWriter::RemoveExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_)
void CDataWriter::RemoveExtSubscription(const SExternalSubscriptionInfo& external_info_)
{
// remove key from external subscriber map
const std::string topic_key = host_name_ + process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_ext_sub_map.erase(topic_key);
m_ext_sub_map.erase(external_info_);
}

// remove external subscription
m_writer.udp_mc.RemExtConnection (host_name_, process_id_);
m_writer.shm.RemExtConnection (host_name_, process_id_);
m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id);
m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id);
}

void CDataWriter::RefreshRegistration()
Expand Down Expand Up @@ -768,7 +764,8 @@ namespace eCAL
Register(false);

// check connection timeouts
const std::shared_ptr<std::list<std::string>> loc_timeouts = std::make_shared<std::list<std::string>>();
// Todo: Why are only Local connections removed, not external connections?
const std::shared_ptr<std::list<SLocalSubscriptionInfo>> loc_timeouts = std::make_shared<std::list<SLocalSubscriptionInfo>>();
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.remove_deprecated(loc_timeouts.get());
Expand All @@ -780,7 +777,7 @@ namespace eCAL

for(const auto& loc_sub : *loc_timeouts)
{
m_writer.shm.RemLocConnection(loc_sub);
m_writer.shm.RemLocConnection(loc_sub.process_id);
}

if (!m_loc_subscribed && !m_ext_subscribed)
Expand Down Expand Up @@ -1275,7 +1272,7 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
for (auto sub : m_loc_sub_map)
{
if (sub.first != process_id)
if (sub.first.process_id != process_id)
{
is_internal_only = false;
break;
Expand Down
43 changes: 35 additions & 8 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,31 @@ namespace eCAL
class CDataWriter
{
public:
struct SExternalSubscriptionInfo
{
std::string host_name;
std::string process_id;
std::string topic_id;

friend bool operator<(const SExternalSubscriptionInfo& l, const SExternalSubscriptionInfo& r)
{
return std::tie(l.host_name, l.process_id, l.topic_id)
< std::tie(r.host_name, r.process_id, r.topic_id);
}
};

struct SLocalSubscriptionInfo
{
std::string process_id;
std::string topic_id;

friend bool operator<(const SLocalSubscriptionInfo& l, const SLocalSubscriptionInfo& r)
{
return std::tie(l.process_id, l.topic_id)
< std::tie(r.process_id, r.topic_id);
}
};

CDataWriter();
~CDataWriter();

Expand Down Expand Up @@ -77,11 +102,11 @@ namespace eCAL

size_t Write(CPayloadWriter& payload_, long long time_, long long id_);

void ApplyLocSubscription(const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveLocSubscription(const std::string & process_id_, const std::string& tid_);
void ApplyLocSubscription(const SLocalSubscriptionInfo& local_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveLocSubscription(const SLocalSubscriptionInfo& local_info_);

void ApplyExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveExtSubscription(const std::string & host_name_, const std::string & process_id_, const std::string& tid_);
void ApplyExtSubscription(const SExternalSubscriptionInfo& external_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveExtSubscription(const SExternalSubscriptionInfo& external_info_);

void RefreshRegistration();
void RefreshSendCounter();
Expand Down Expand Up @@ -137,10 +162,12 @@ namespace eCAL
std::vector<char> m_payload_buffer;

std::atomic<bool> m_connected;
using ConnectedMapT = Util::CExpMap<std::string, bool>;
mutable std::mutex m_sub_map_sync;
ConnectedMapT m_loc_sub_map;
ConnectedMapT m_ext_sub_map;

using LocalConnectedMapT = Util::CExpMap<SLocalSubscriptionInfo, bool>;
using ExternalConnectedMapT = Util::CExpMap<SExternalSubscriptionInfo, bool>;
mutable std::mutex m_sub_map_sync;
LocalConnectedMapT m_loc_sub_map;
ExternalConnectedMapT m_ext_sub_map;

std::mutex m_event_callback_map_sync;
using EventCallbackMapT = std::map<eCAL_Publisher_Event, PubEventCallbackT>;
Expand Down

0 comments on commit 536d35b

Please sign in to comment.