Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes bug regarding unregistration introduced with #1086. #1197

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ namespace eCAL

const auto& ecal_sample = ecal_sample_.topic();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
CDataWriter::SLocalSubscriptionInfo subscription_info;
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());
SDataTypeInformation topic_information{ eCALSampleToTopicInformation(ecal_sample_) };
const std::string process_id = std::to_string(ecal_sample.pid());

std::string reader_par;
for (const auto& layer : ecal_sample.tlayer())
Expand All @@ -135,7 +136,7 @@ namespace eCAL
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplyLocSubscription(process_id, topic_id, topic_information, reader_par);
iter->second->ApplyLocSubscription(subscription_info, topic_information, reader_par);
}
}

Expand All @@ -145,15 +146,16 @@ namespace eCAL

const auto& ecal_sample = ecal_sample_.topic();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
const std::string process_id = std::to_string(ecal_sample.pid());
CDataWriter::SLocalSubscriptionInfo subscription_info;
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());

// unregister local subscriber
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for (TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->RemoveLocSubscription(process_id, topic_id);
iter->second->RemoveLocSubscription(subscription_info);
}
}

Expand All @@ -162,11 +164,12 @@ namespace eCAL
if(!m_created) return;

const auto& ecal_sample = ecal_sample_.topic();
const std::string& host_name = ecal_sample.hname();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
CDataWriter::SExternalSubscriptionInfo subscription_info;
subscription_info.host_name = ecal_sample.hname();
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());
SDataTypeInformation topic_information{ eCALSampleToTopicInformation(ecal_sample_) };
const std::string process_id = std::to_string(ecal_sample.pid());

std::string reader_par;
for (const auto& layer : ecal_sample.tlayer())
Expand All @@ -185,7 +188,7 @@ namespace eCAL
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplyExtSubscription(host_name, process_id, topic_id, topic_information, reader_par);
iter->second->ApplyExtSubscription(subscription_info, topic_information, reader_par);
}
}

Expand All @@ -194,17 +197,18 @@ namespace eCAL
if (!m_created) return;

const auto& ecal_sample = ecal_sample_.topic();
const std::string& host_name = ecal_sample.hname();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
const std::string process_id = std::to_string(ecal_sample.pid());
CDataWriter::SExternalSubscriptionInfo subscription_info;
subscription_info.host_name = ecal_sample.hname();
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());

// unregister external subscriber
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for (TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->RemoveExtSubscription(host_name, process_id, topic_id);
iter->second->RemoveExtSubscription(subscription_info);
}
}

Expand Down
47 changes: 22 additions & 25 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,83 +655,79 @@ namespace eCAL
else return 0;
}

void CDataWriter::ApplyLocSubscription(const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
void CDataWriter::ApplyLocSubscription(const SLocalSubscriptionInfo& local_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
{
Connect(tid_, tinfo_);
Connect(local_info_.topic_id, tinfo_);

// add key to local subscriber map
const std::string topic_key = process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map[topic_key] = true;
m_loc_sub_map[local_info_] = true;
}

m_loc_subscribed = true;

// add a new local subscription
m_writer.udp_mc.AddLocConnection (process_id_, reader_par_);
m_writer.shm.AddLocConnection (process_id_, reader_par_);
m_writer.udp_mc.AddLocConnection (local_info_.process_id, reader_par_);
m_writer.shm.AddLocConnection (local_info_.process_id, reader_par_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::ApplyLocSubscription");
#endif
}

void CDataWriter::RemoveLocSubscription(const std::string& process_id_, const std::string& tid_)
void CDataWriter::RemoveLocSubscription(const SLocalSubscriptionInfo& local_info_)
{
// remove key from local subscriber map
const std::string topic_key = process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.erase(topic_key);
m_loc_sub_map.erase(local_info_);
}

// remove a local subscription
m_writer.udp_mc.RemLocConnection (process_id_);
m_writer.shm.RemLocConnection (process_id_);
m_writer.udp_mc.RemLocConnection (local_info_.process_id);
m_writer.shm.RemLocConnection (local_info_.process_id);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::RemoveLocSubscription");
#endif
}

void CDataWriter::ApplyExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
void CDataWriter::ApplyExtSubscription(const SExternalSubscriptionInfo& external_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
{
Connect(tid_, tinfo_);
Connect(external_info_.topic_id, tinfo_);

// add key to external subscriber map
const std::string topic_key = host_name_ + process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_ext_sub_map[topic_key] = true;
m_ext_sub_map[external_info_] = true;
}

m_ext_subscribed = true;

// add a new external subscription
m_writer.udp_mc.AddExtConnection (host_name_, process_id_, reader_par_);
m_writer.shm.AddExtConnection (host_name_, process_id_, reader_par_);
m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);
m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::ApplyExtSubscription");
#endif
}

void CDataWriter::RemoveExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_)
void CDataWriter::RemoveExtSubscription(const SExternalSubscriptionInfo& external_info_)
{
// remove key from external subscriber map
const std::string topic_key = host_name_ + process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_ext_sub_map.erase(topic_key);
m_ext_sub_map.erase(external_info_);
}

// remove external subscription
m_writer.udp_mc.RemExtConnection (host_name_, process_id_);
m_writer.shm.RemExtConnection (host_name_, process_id_);
m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id);
m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id);
}

void CDataWriter::RefreshRegistration()
Expand Down Expand Up @@ -768,7 +764,8 @@ namespace eCAL
Register(false);

// check connection timeouts
const std::shared_ptr<std::list<std::string>> loc_timeouts = std::make_shared<std::list<std::string>>();
// Todo: Why are only Local connections removed, not external connections?
const std::shared_ptr<std::list<SLocalSubscriptionInfo>> loc_timeouts = std::make_shared<std::list<SLocalSubscriptionInfo>>();
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.remove_deprecated(loc_timeouts.get());
Expand All @@ -780,7 +777,7 @@ namespace eCAL

for(const auto& loc_sub : *loc_timeouts)
{
m_writer.shm.RemLocConnection(loc_sub);
m_writer.shm.RemLocConnection(loc_sub.process_id);
}

if (!m_loc_subscribed && !m_ext_subscribed)
Expand Down Expand Up @@ -1275,7 +1272,7 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
for (auto sub : m_loc_sub_map)
{
if (sub.first != process_id)
if (sub.first.process_id != process_id)
{
is_internal_only = false;
break;
Expand Down
43 changes: 35 additions & 8 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,31 @@ namespace eCAL
class CDataWriter
{
public:
struct SExternalSubscriptionInfo
{
std::string host_name;
std::string process_id;
std::string topic_id;

friend bool operator<(const SExternalSubscriptionInfo& l, const SExternalSubscriptionInfo& r)
{
return std::tie(l.host_name, l.process_id, l.topic_id)
< std::tie(r.host_name, r.process_id, r.topic_id);
}
};

struct SLocalSubscriptionInfo
{
std::string process_id;
std::string topic_id;

friend bool operator<(const SLocalSubscriptionInfo& l, const SLocalSubscriptionInfo& r)
{
return std::tie(l.process_id, l.topic_id)
< std::tie(r.process_id, r.topic_id);
}
};

CDataWriter();
~CDataWriter();

Expand Down Expand Up @@ -77,11 +102,11 @@ namespace eCAL

size_t Write(CPayloadWriter& payload_, long long time_, long long id_);

void ApplyLocSubscription(const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveLocSubscription(const std::string & process_id_, const std::string& tid_);
void ApplyLocSubscription(const SLocalSubscriptionInfo& local_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveLocSubscription(const SLocalSubscriptionInfo& local_info_);

void ApplyExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveExtSubscription(const std::string & host_name_, const std::string & process_id_, const std::string& tid_);
void ApplyExtSubscription(const SExternalSubscriptionInfo& external_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveExtSubscription(const SExternalSubscriptionInfo& external_info_);

void RefreshRegistration();
void RefreshSendCounter();
Expand Down Expand Up @@ -137,10 +162,12 @@ namespace eCAL
std::vector<char> m_payload_buffer;

std::atomic<bool> m_connected;
using ConnectedMapT = Util::CExpMap<std::string, bool>;
mutable std::mutex m_sub_map_sync;
ConnectedMapT m_loc_sub_map;
ConnectedMapT m_ext_sub_map;

using LocalConnectedMapT = Util::CExpMap<SLocalSubscriptionInfo, bool>;
using ExternalConnectedMapT = Util::CExpMap<SExternalSubscriptionInfo, bool>;
mutable std::mutex m_sub_map_sync;
LocalConnectedMapT m_loc_sub_map;
ExternalConnectedMapT m_ext_sub_map;

std::mutex m_event_callback_map_sync;
using EventCallbackMapT = std::map<eCAL_Publisher_Event, PubEventCallbackT>;
Expand Down
Loading