Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/entity-unregistration #1086

Merged
merged 13 commits into from
May 10, 2023
Merged
14 changes: 14 additions & 0 deletions ecal/core/src/ecal_expmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,20 @@ namespace eCAL
}
};

// Remove specific element from the cache
// @Kerstin, pretty sure that this is not well implemented, please take a look
rex-schilasky marked this conversation as resolved.
Show resolved Hide resolved
bool erase(const Key& k)
{
auto it = _key_to_value.find(k);
if (it != _key_to_value.end())
{
_key_tracker.erase(it->second.second); // erase the element from the list
_key_to_value.erase(k); // erase the element from the map
return true;
}
return false;
};

// Remove all elements from the cache
void clear()
{
Expand Down
149 changes: 81 additions & 68 deletions ecal/core/src/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace eCAL
m_use_shm_monitoring(false)

{
};
}

CRegistrationProvider::~CRegistrationProvider()
{
Expand All @@ -82,16 +82,8 @@ namespace eCAL
if (m_use_network_monitoring)
{
SSenderAttr attr;
bool local_only = !Config::IsNetworkEnabled();
// for local only communication we switch to local broadcasting to bypass vpn's or firewalls
if (local_only)
{
attr.broadcast = true;
}
else
{
attr.broadcast = false;
}
attr.broadcast = !Config::IsNetworkEnabled();
attr.ipaddr = UDP::GetRegistrationMulticastAddress();
attr.port = Config::GetUdpMulticastPort() + NET_UDP_MULTICAST_PORT_REG_OFF;
attr.loopback = true;
Expand Down Expand Up @@ -137,26 +129,34 @@ namespace eCAL
bool CRegistrationProvider::RegisterTopic(const std::string& topic_name_, const std::string& topic_id_, const eCAL::pb::Sample& ecal_sample_, const bool force_)
{
if(!m_created) return(false);
if(!m_reg_topics) return (false);
if(!m_reg_topics) return(false);

std::lock_guard<std::mutex> lock(m_topics_map_sync);
const std::lock_guard<std::mutex> lock(m_topics_map_sync);
m_topics_map[topic_name_ + topic_id_] = ecal_sample_;
if(force_)
{
RegisterProcess();
RegisterSample(topic_name_, ecal_sample_);
// apply registration sample
ApplySample(topic_name_, ecal_sample_);
SendSampleList(false);
}

return(true);
}

bool CRegistrationProvider::UnregisterTopic(const std::string& topic_name_, const std::string& topic_id_)
bool CRegistrationProvider::UnregisterTopic(const std::string& topic_name_, const std::string& topic_id_, const eCAL::pb::Sample& ecal_sample_, const bool force_)
{
if(!m_created) return(false);

if (force_)
{
// apply unregistration sample
ApplySample(topic_name_, ecal_sample_);
SendSampleList(false);
}

SampleMapT::iterator iter;
std::lock_guard<std::mutex> lock(m_topics_map_sync);
const std::lock_guard<std::mutex> lock(m_topics_map_sync);
iter = m_topics_map.find(topic_name_ + topic_id_);
if(iter != m_topics_map.end())
{
Expand All @@ -172,24 +172,32 @@ namespace eCAL
if(!m_created) return(false);
if(!m_reg_services) return(false);

std::lock_guard<std::mutex> lock(m_server_map_sync);
const std::lock_guard<std::mutex> lock(m_server_map_sync);
m_server_map[service_name_ + service_id_] = ecal_sample_;
if(force_)
{
RegisterProcess();
RegisterSample(service_name_, ecal_sample_);
// apply registration sample
ApplySample(service_name_, ecal_sample_);
SendSampleList(false);
}

return(true);
}

bool CRegistrationProvider::UnregisterServer(const std::string& service_name_, const std::string& service_id_)
bool CRegistrationProvider::UnregisterServer(const std::string& service_name_, const std::string& service_id_, const eCAL::pb::Sample& ecal_sample_, const bool force_)
{
if(!m_created) return(false);

if (force_)
{
// apply unregistration sample
ApplySample(service_name_, ecal_sample_);
SendSampleList(false);
}

SampleMapT::iterator iter;
std::lock_guard<std::mutex> lock(m_server_map_sync);
const std::lock_guard<std::mutex> lock(m_server_map_sync);
iter = m_server_map.find(service_name_ + service_id_);
if(iter != m_server_map.end())
{
Expand All @@ -205,24 +213,32 @@ namespace eCAL
if (!m_created) return(false);
if (!m_reg_services) return(false);

std::lock_guard<std::mutex> lock(m_client_map_sync);
const std::lock_guard<std::mutex> lock(m_client_map_sync);
m_client_map[client_name_ + client_id_] = ecal_sample_;
if (force_)
{
RegisterProcess();
RegisterSample(client_name_, ecal_sample_);
// apply registration sample
ApplySample(client_name_, ecal_sample_);
SendSampleList(false);
}

return(true);
}

bool CRegistrationProvider::UnregisterClient(const std::string& client_name_, const std::string& client_id_)
bool CRegistrationProvider::UnregisterClient(const std::string& client_name_, const std::string& client_id_, const eCAL::pb::Sample& ecal_sample_, const bool force_)
{
if (!m_created) return(false);

if (force_)
{
// apply unregistration sample
ApplySample(client_name_, ecal_sample_);
SendSampleList(false);
}

SampleMapT::iterator iter;
std::lock_guard<std::mutex> lock(m_client_map_sync);
const std::lock_guard<std::mutex> lock(m_client_map_sync);
iter = m_client_map.find(client_name_ + client_id_);
if (iter != m_client_map.end())
{
Expand All @@ -235,12 +251,12 @@ namespace eCAL

bool CRegistrationProvider::RegisterProcess()
{
if(!m_created) return(0);
if(!m_reg_process) return(0);
if(!m_created) return(false);
if(!m_reg_process) return(false);

eCAL::pb::Sample process_sample;
process_sample.set_cmd_type(eCAL::pb::bct_reg_process);
auto process_sample_mutable_process = process_sample.mutable_process();
auto *process_sample_mutable_process = process_sample.mutable_process();
process_sample_mutable_process->set_hname(Process::GetHostName());
process_sample_mutable_process->set_pid(Process::GetProcessID());
process_sample_mutable_process->set_pname(Process::GetProcessName());
Expand All @@ -253,7 +269,7 @@ namespace eCAL
process_sample_mutable_process->set_dataread(google::protobuf::int64(Process::GetRBytes()));
process_sample_mutable_process->mutable_state()->set_severity(eCAL::pb::eProcessSeverity(g_process_severity));
process_sample_mutable_process->mutable_state()->set_info(g_process_info);
if (!g_timegate())
if (g_timegate() == nullptr)
{
process_sample_mutable_process->set_tsync_state(eCAL::pb::eTSyncState::tsync_none);
}
Expand Down Expand Up @@ -282,33 +298,33 @@ namespace eCAL
}

// eCAL initialization state
unsigned int comp_state(g_globals()->GetComponents());
const unsigned int comp_state(g_globals()->GetComponents());
process_sample_mutable_process->set_component_init_state(google::protobuf::int32(comp_state));
std::string component_info;
if (comp_state & Init::Publisher) component_info += "|pub";
if (comp_state & Init::Subscriber) component_info += "|sub";
if (comp_state & Init::Service) component_info += "|srv";
if (comp_state & Init::Monitoring) component_info += "|mon";
if (comp_state & Init::Logging) component_info += "|log";
if (comp_state & Init::TimeSync) component_info += "|time";
if ((comp_state & Init::Publisher) != 0u) component_info += "|pub";
if ((comp_state & Init::Subscriber) != 0u) component_info += "|sub";
if ((comp_state & Init::Service) != 0u) component_info += "|srv";
if ((comp_state & Init::Monitoring) != 0u) component_info += "|mon";
if ((comp_state & Init::Logging) != 0u) component_info += "|log";
if ((comp_state & Init::TimeSync) != 0u) component_info += "|time";
if (!component_info.empty()) component_info = component_info.substr(1);
process_sample_mutable_process->set_component_init_info(component_info);

process_sample_mutable_process->set_ecal_runtime_version(eCAL::GetVersionString());

// register sample
bool return_value = RegisterSample(Process::GetHostName(), process_sample);
const bool return_value = ApplySample(Process::GetHostName(), process_sample);

return return_value;
}

bool CRegistrationProvider::RegisterServer()
{
if(!m_created) return(0);
if(!m_reg_services) return(0);
if(!m_created) return(false);
if(!m_reg_services) return(false);

bool return_value {true};
std::lock_guard<std::mutex> lock(m_server_map_sync);
const std::lock_guard<std::mutex> lock(m_server_map_sync);
for(SampleMapT::const_iterator iter = m_server_map.begin(); iter != m_server_map.end(); ++iter)
{
//////////////////////////////////////////////
Expand All @@ -323,35 +339,35 @@ namespace eCAL
//////////////////////////////////////////////
// send sample to registration layer
//////////////////////////////////////////////
return_value &= RegisterSample(iter->second.service().sname(), iter->second);
return_value &= ApplySample(iter->second.service().sname(), iter->second);
}

return return_value;
}

bool CRegistrationProvider::RegisterClient()
{
if (!m_created) return(0);
if (!m_reg_services) return(0);
if (!m_created) return(false);
if (!m_reg_services) return(false);

bool return_value {true};
std::lock_guard<std::mutex> lock(m_client_map_sync);
const std::lock_guard<std::mutex> lock(m_client_map_sync);
for (SampleMapT::const_iterator iter = m_client_map.begin(); iter != m_client_map.end(); ++iter)
{
// register sample
return_value &= RegisterSample(iter->second.client().sname(), iter->second);
return_value &= ApplySample(iter->second.client().sname(), iter->second);
}

return return_value;
}

bool CRegistrationProvider::RegisterTopics()
{
if(!m_created) return(0);
if(!m_reg_topics) return(0);
if(!m_created) return(false);
if(!m_reg_topics) return(false);

bool return_value {true};
std::lock_guard<std::mutex> lock(m_topics_map_sync);
const std::lock_guard<std::mutex> lock(m_topics_map_sync);
for(SampleMapT::const_iterator iter = m_topics_map.begin(); iter != m_topics_map.end(); ++iter)
{
//////////////////////////////////////////////
Expand All @@ -367,15 +383,15 @@ namespace eCAL
//////////////////////////////////////////////
// send sample to registration layer
//////////////////////////////////////////////
return_value &= RegisterSample(iter->second.topic().tname(), iter->second);
return_value &= ApplySample(iter->second.topic().tname(), iter->second);
}

return return_value;
}

bool CRegistrationProvider::RegisterSample(const std::string& sample_name_, const eCAL::pb::Sample& sample_)
bool CRegistrationProvider::ApplySample(const std::string& sample_name_, const eCAL::pb::Sample& sample_)
{
if(!m_created) return(0);
if(!m_created) return(false);

bool return_value {true};

Expand All @@ -384,7 +400,7 @@ namespace eCAL

if(m_use_shm_monitoring)
{
std::lock_guard<std::mutex> lock(m_sample_list_sync);
const std::lock_guard<std::mutex> lock(m_sample_list_sync);
m_sample_list.mutable_samples()->Add()->CopyFrom(sample_);
}

Expand All @@ -399,13 +415,13 @@ namespace eCAL
if(m_use_shm_monitoring)
{
{
std::lock_guard<std::mutex> lock(m_sample_list_sync);
const std::lock_guard<std::mutex> lock(m_sample_list_sync);
m_sample_list.SerializeToString(&m_sample_list_buffer);
if(reset_sample_list_)
m_sample_list.clear_samples();
}

if(m_sample_list_buffer.size())
if(!m_sample_list_buffer.empty())
return_value &=m_memfile_broadcast_writer.Write(m_sample_list_buffer.data(), m_sample_list_buffer.size());
}

Expand All @@ -425,44 +441,41 @@ namespace eCAL
g_process_wbytes_sum = 0;

// refresh subscriber registration
if (g_subgate()) g_subgate()->RefreshRegistrations();
if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations();

// refresh publisher registration
if (g_pubgate()) g_pubgate()->RefreshRegistrations();
if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations();

// refresh server registration
if (g_servicegate()) g_servicegate()->RefreshRegistrations();
if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations();

// refresh client registration
if (g_clientgate()) g_clientgate()->RefreshRegistrations();

// overall registration send status for debugging
/*bool registration_successful {true};*/
if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations();

// register process
/*registration_successful &= */RegisterProcess();
RegisterProcess();

// register server
/*registration_successful &= */RegisterServer();
RegisterServer();

// register clients
/*registration_successful &= */RegisterClient();
RegisterClient();

// register topics
/*registration_successful &= */RegisterTopics();
RegisterTopics();

// write sample list to shared memory
/*registration_successful &= */SendSampleList();
SendSampleList();

return(0);
};
}

bool CRegistrationProvider::ApplyTopicToDescGate(const std::string& topic_name_
, const std::string& topic_type_
, const std::string& topic_desc_
, bool topic_is_a_publisher_)
{
if (g_descgate())
if (g_descgate() != nullptr)
{
// calculate the quality of the current info
::eCAL::CDescGate::QualityFlags quality = ::eCAL::CDescGate::QualityFlags::NO_QUALITY;
Expand All @@ -487,7 +500,7 @@ namespace eCAL
, const std::string& resp_type_name_
, const std::string& resp_type_desc_)
{
if (g_descgate())
if (g_descgate() != nullptr)
{
// Calculate the quality of the current info
::eCAL::CDescGate::QualityFlags quality = ::eCAL::CDescGate::QualityFlags::NO_QUALITY;
Expand All @@ -501,4 +514,4 @@ namespace eCAL
}
return false;
}
};
}
Loading