diff --git a/app/mon/mon_gui/src/widgets/models/topic_tree_item.cpp b/app/mon/mon_gui/src/widgets/models/topic_tree_item.cpp index eb2e68e00c..b504ceb491 100644 --- a/app/mon/mon_gui/src/widgets/models/topic_tree_item.cpp +++ b/app/mon/mon_gui/src/widgets/models/topic_tree_item.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -209,7 +209,7 @@ QVariant TopicTreeItem::data(Columns column, Qt::ItemDataRole role) const for (const auto& layer : layer_pb) { QString this_layer_string; - if (layer.confirmed()) + if (layer.active()) { switch (layer.type()) { diff --git a/app/mon/mon_tui/src/model/monitor.hpp b/app/mon/mon_tui/src/model/monitor.hpp index 63170a3ab6..1edea9818b 100644 --- a/app/mon/mon_tui/src/model/monitor.hpp +++ b/app/mon/mon_tui/src/model/monitor.hpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -204,7 +204,7 @@ class MonitorModel topic.type_descriptor = std::move(*t.mutable_tdatatype()->mutable_desc()); for(auto &tl: t.tlayer()) { - if (tl.confirmed()) + if (tl.active()) { topic.transport_layers.emplace_back(TopicTransportLayer(tl.type())); } diff --git a/ecal/core/include/ecal/config/publisher.h b/ecal/core/include/ecal/config/publisher.h index 63e45d1411..017955956b 100644 --- a/ecal/core/include/ecal/config/publisher.h +++ b/ecal/core/include/ecal/config/publisher.h @@ -89,9 +89,11 @@ #pragma once #include +#include #include #include +#include namespace eCAL { @@ -136,12 +138,16 @@ namespace eCAL { ECAL_API Configuration(); - SHM::Configuration shm; - UDP::Configuration udp; - TCP::Configuration tcp; + SHM::Configuration shm; + UDP::Configuration udp; + TCP::Configuration tcp; - bool share_topic_type; //!< share topic type via registration - bool share_topic_description; //!< share topic description via registration + using LayerPriorityVector = std::vector; + LayerPriorityVector layer_priority_local = { TLayer::tlayer_shm, TLayer::tlayer_udp_mc, TLayer::tlayer_tcp }; + LayerPriorityVector layer_priority_remote = { TLayer::tlayer_udp_mc, TLayer::tlayer_tcp }; + + bool share_topic_type; //!< share topic type via registration + bool share_topic_description; //!< share topic description via registration }; } } diff --git a/ecal/core/include/ecal/ecal_publisher.h b/ecal/core/include/ecal/ecal_publisher.h index c79f134b7a..7d5c9d4a42 100644 --- a/ecal/core/include/ecal/ecal_publisher.h +++ b/ecal/core/include/ecal/ecal_publisher.h @@ -24,16 +24,16 @@ #pragma once -#include #include #include #include #include -#include #include #include +#include #include +#include #include #include diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 71b78e92c4..5ba5610d46 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -103,7 +103,7 @@ namespace eCAL * @brief Constructor. * * @param topic_name_ Unique topic name. - * @param data_type_info_ Topic data type information (encoding, type, descriptor). + * @param config_ Optional configuration parameters. **/ ECAL_API explicit CSubscriber(const std::string& topic_name_, const Subscriber::Configuration& config_ = {}); diff --git a/ecal/core/include/ecal/msg/subscriber.h b/ecal/core/include/ecal/msg/subscriber.h index b59d926daa..b293f6762e 100644 --- a/ecal/core/include/ecal/msg/subscriber.h +++ b/ecal/core/include/ecal/msg/subscriber.h @@ -254,12 +254,13 @@ namespace eCAL * @brief Constructor. * * @param topic_name_ Unique topic name. + * @param config_ Optional configuration parameters. **/ - CMessageSubscriber(const std::string& topic_name_) : CSubscriber() + CMessageSubscriber(const std::string& topic_name_, const Subscriber::Configuration& config_ = {}) : CSubscriber() , m_deserializer() { SDataTypeInformation topic_info = m_deserializer.GetDataTypeInformation(); - CSubscriber::Create(topic_name_, topic_info); + CSubscriber::Create(topic_name_, topic_info, config_); } ~CMessageSubscriber() noexcept @@ -418,7 +419,4 @@ namespace eCAL MsgReceiveCallbackT m_cb_callback; Deserializer m_deserializer; }; - - - } diff --git a/ecal/core/include/ecal/types/monitoring.h b/ecal/core/include/ecal/types/monitoring.h index f371719484..aff7793d15 100644 --- a/ecal/core/include/ecal/types/monitoring.h +++ b/ecal/core/include/ecal/types/monitoring.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,9 +64,9 @@ namespace eCAL struct TLayer { - eTLayerType type = tl_none; // #include diff --git a/ecal/core/src/ecal_process.cpp b/ecal/core/src/ecal_process.cpp index 20c26b45e5..d5e933a62f 100644 --- a/ecal/core/src/ecal_process.cpp +++ b/ecal/core/src/ecal_process.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp index 59595dff16..b70799de8a 100644 --- a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp +++ b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp @@ -207,9 +207,9 @@ namespace eCAL bool topic_tlayer_ecal_tcp(false); for (const auto& layer : sample_topic.tlayer) { - topic_tlayer_ecal_udp |= (layer.type == tl_ecal_udp) && layer.confirmed; - topic_tlayer_ecal_shm |= (layer.type == tl_ecal_shm) && layer.confirmed; - topic_tlayer_ecal_tcp |= (layer.type == tl_ecal_tcp) && layer.confirmed; + topic_tlayer_ecal_udp |= (layer.type == tl_ecal_udp) && layer.active; + topic_tlayer_ecal_shm |= (layer.type == tl_ecal_shm) && layer.active; + topic_tlayer_ecal_tcp |= (layer.type == tl_ecal_tcp) && layer.active; } const int32_t connections_loc = sample_topic.connections_loc; const int32_t connections_ext = sample_topic.connections_ext; @@ -305,22 +305,22 @@ namespace eCAL // tlayer udp_mc { eCAL::Monitoring::TLayer tlayer; - tlayer.type = eCAL::Monitoring::tl_ecal_udp_mc; - tlayer.confirmed = topic_tlayer_ecal_udp; + tlayer.type = eCAL::Monitoring::tl_ecal_udp_mc; + tlayer.active = topic_tlayer_ecal_udp; TopicInfo.tlayer.push_back(tlayer); } // tlayer shm { eCAL::Monitoring::TLayer tlayer; - tlayer.type = eCAL::Monitoring::tl_ecal_shm; - tlayer.confirmed = topic_tlayer_ecal_shm; + tlayer.type = eCAL::Monitoring::tl_ecal_shm; + tlayer.active = topic_tlayer_ecal_shm; TopicInfo.tlayer.push_back(tlayer); } // tlayer tcp { eCAL::Monitoring::TLayer tlayer; - tlayer.type = eCAL::Monitoring::tl_ecal_tcp; - tlayer.confirmed = topic_tlayer_ecal_tcp; + tlayer.type = eCAL::Monitoring::tl_ecal_tcp; + tlayer.active = topic_tlayer_ecal_tcp; TopicInfo.tlayer.push_back(tlayer); } diff --git a/ecal/core/src/pubsub/ecal_pubgate.cpp b/ecal/core/src/pubsub/ecal_pubgate.cpp index cd624626d6..0dc863b343 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.cpp +++ b/ecal/core/src/pubsub/ecal_pubgate.cpp @@ -129,18 +129,18 @@ namespace eCAL CDataWriter::SLayerStates layer_states; for (const auto& layer : ecal_topic.tlayer) { - if (layer.confirmed) + if (layer.enabled) { switch (layer.type) { case TLayer::tlayer_udp_mc: - layer_states.udp = true; + layer_states.udp.read_enabled = true; break; case TLayer::tlayer_shm: - layer_states.shm = true; + layer_states.shm.read_enabled = true; break; case TLayer::tlayer_tcp: - layer_states.tcp = true; + layer_states.tcp.read_enabled = true; break; default: break; diff --git a/ecal/core/src/pubsub/ecal_publisher.cpp b/ecal/core/src/pubsub/ecal_publisher.cpp index ef04c32cf5..5fba62ffd7 100644 --- a/ecal/core/src/pubsub/ecal_publisher.cpp +++ b/ecal/core/src/pubsub/ecal_publisher.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index a303dd0848..0690ca7ee3 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -166,7 +166,7 @@ namespace eCAL const auto& ecal_sample_content = ecal_sample.content; for (const auto& reader : readers_to_apply) { - applied_size = reader->AddSample( + applied_size = reader->ApplySample( ecal_sample.topic.tid, payload_addr, payload_size, @@ -207,7 +207,7 @@ namespace eCAL for (const auto& reader : readers_to_apply) { - applied_size = reader->AddSample(topic_id_, buf_, len_, id_, clock_, time_, hash_, layer_); + applied_size = reader->ApplySample(topic_id_, buf_, len_, id_, clock_, time_, hash_, layer_); } return (applied_size > 0); @@ -232,18 +232,18 @@ namespace eCAL CDataReader::SLayerStates layer_states; for (const auto& layer : ecal_topic.tlayer) { - if (layer.confirmed) + if (layer.enabled) { switch (layer.type) { case TLayer::tlayer_udp_mc: - layer_states.udp = true; + layer_states.udp.write_enabled = true; break; case TLayer::tlayer_shm: - layer_states.shm = true; + layer_states.shm.write_enabled = true; break; case TLayer::tlayer_tcp: - layer_states.tcp = true; + layer_states.tcp.write_enabled = true; break; default: break; diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index c5aaeea32f..4e4366050b 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -81,7 +81,7 @@ namespace eCAL if (topic_name_.empty()) return(false); // create datareader - m_datareader = std::make_shared(topic_name_, data_type_info_); + m_datareader = std::make_shared(topic_name_, data_type_info_, config_); // register datareader g_subgate()->Register(topic_name_, m_datareader); @@ -143,7 +143,7 @@ namespace eCAL bool CSubscriber::ReceiveBuffer(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ /* = 0 */) const { if (!m_created) return(false); - return(m_datareader->Receive(buf_, time_, rcv_timeout_)); + return(m_datareader->Read(buf_, time_, rcv_timeout_)); } bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_) diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 160071e563..39701bd798 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -21,17 +21,14 @@ * @brief common eCAL data reader **/ -#include -#include #include -#include -#include -#include -#include +#include +#include #if ECAL_CORE_REGISTRATION #include "registration/ecal_registration_provider.h" #endif + #include "ecal_reader.h" #include "ecal_global_accessors.h" #include "ecal_reader_layer.h" @@ -49,9 +46,14 @@ #endif #include +#include +#include #include +#include +#include +#include #include -#include +#include #include namespace eCAL @@ -59,7 +61,7 @@ namespace eCAL //////////////////////////////////////// // CDataReader //////////////////////////////////////// - CDataReader::CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_) : + CDataReader::CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const Subscriber::Configuration& config_) : m_host_name(Process::GetHostName()), m_host_group_name(Process::GetHostGroupName()), m_pid(Process::GetProcessID()), @@ -67,6 +69,7 @@ namespace eCAL m_topic_name(topic_name_), m_topic_info(topic_info_), m_topic_size(0), + m_config(config_), m_connected(false), m_receive_time(0), m_clock(0), @@ -90,7 +93,7 @@ namespace eCAL m_pub_map.set_expiration(registration_timeout); // start transport layers - SubscribeToLayers(); + StartTransportLayer(); // mark as created m_created = true; @@ -118,7 +121,7 @@ namespace eCAL #endif // stop transport layers - UnsubscribeFromLayers(); + StopTransportLayer(); // reset receive callback { @@ -141,173 +144,112 @@ namespace eCAL return true; } - void CDataReader::InitializeLayers() + bool CDataReader::Read(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ms_ /* = 0 */) { - // initialize udp multicast layer -#if ECAL_CORE_TRANSPORT_UDP - if (Config::IsUdpMulticastRecEnabled()) - { - CUDPReaderLayer::Get()->Initialize(); - } -#endif + if (!m_created) return(false); - // initialize tcp layer -#if ECAL_CORE_TRANSPORT_TCP - if (Config::IsTcpRecEnabled()) - { - CTCPReaderLayer::Get()->Initialize(); - } -#endif - } + std::unique_lock read_buffer_lock(m_read_buf_mtx); - void CDataReader::SubscribeToLayers() - { - // subscribe topic to udp multicast layer -#if ECAL_CORE_TRANSPORT_UDP - if (Config::IsUdpMulticastRecEnabled()) + // No need to wait (for whatever time) if something has been received + if (!m_read_buf_received) { - CUDPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); + if (rcv_timeout_ms_ < 0) + { + m_read_buf_cv.wait(read_buffer_lock, [this]() { return this->m_read_buf_received; }); + } + else if (rcv_timeout_ms_ > 0) + { + m_read_buf_cv.wait_for(read_buffer_lock, std::chrono::milliseconds(rcv_timeout_ms_), [this]() { return this->m_read_buf_received; }); + } } -#endif - // subscribe topic to tcp layer -#if ECAL_CORE_TRANSPORT_TCP - if (Config::IsTcpRecEnabled()) - { - CTCPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); - } -#endif - } - - void CDataReader::UnsubscribeFromLayers() - { - // unsubscribe topic from udp multicast layer -#if ECAL_CORE_TRANSPORT_UDP - if (Config::IsUdpMulticastRecEnabled()) + // did we receive new samples ? + if (m_read_buf_received) { - CUDPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); - } +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug3, m_topic_name + "::CDataReader::Receive"); #endif + // copy content to target string + buf_.clear(); + buf_.swap(m_read_buf); + m_read_buf_received = false; - // unsubscribe topic from tcp multicast layer -#if ECAL_CORE_TRANSPORT_TCP - if (Config::IsTcpRecEnabled()) - { - CTCPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); + // apply time + if (time_ != nullptr) *time_ = m_read_time; + + // return success + return(true); } -#endif + + return(false); } - bool CDataReader::Register(const bool force_) + bool CDataReader::AddReceiveCallback(ReceiveCallbackT callback_) { -#if ECAL_CORE_REGISTRATION - if (!m_created) return(false); - if(m_topic_name.empty()) return(false); + if (!m_created) return(false); - // create command parameter - Registration::Sample ecal_reg_sample; - ecal_reg_sample.cmd_type = bct_reg_subscriber; - auto& ecal_reg_sample_topic = ecal_reg_sample.topic; - ecal_reg_sample_topic.hname = m_host_name; - ecal_reg_sample_topic.hgname = m_host_group_name; - ecal_reg_sample_topic.tname = m_topic_name; - ecal_reg_sample_topic.tid = m_topic_id; - // topic_information + // store receive callback { - auto& ecal_reg_sample_tdatatype = ecal_reg_sample_topic.tdatatype; - if (m_share_ttype) - { - ecal_reg_sample_tdatatype.encoding = m_topic_info.encoding; - ecal_reg_sample_tdatatype.name = m_topic_info.name; - } - if (m_share_tdesc) - { - ecal_reg_sample_tdatatype.descriptor = m_topic_info.descriptor; - } + const std::lock_guard lock(m_receive_callback_mtx); +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddReceiveCallback"); +#endif + m_receive_callback = std::move(callback_); } - ecal_reg_sample_topic.attr = m_attr; - ecal_reg_sample_topic.tsize = static_cast(m_topic_size); -#if ECAL_CORE_TRANSPORT_UDP - // udp multicast layer - { - Registration::TLayer udp_tlayer; - udp_tlayer.type = tl_ecal_udp; - udp_tlayer.version = 1; - udp_tlayer.confirmed = m_confirmed_layers.udp; - ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); - } -#endif + return(true); + } -#if ECAL_CORE_TRANSPORT_SHM - // shm layer - { - Registration::TLayer shm_tlayer; - shm_tlayer.type = tl_ecal_shm; - shm_tlayer.version = 1; - shm_tlayer.confirmed = m_confirmed_layers.shm; - ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); - } -#endif + bool CDataReader::RemReceiveCallback() + { + if (!m_created) return(false); -#if ECAL_CORE_TRANSPORT_TCP - // tcp layer + // reset receive callback { - Registration::TLayer tcp_tlayer; - tcp_tlayer.type = tl_ecal_tcp; - tcp_tlayer.version = 1; - tcp_tlayer.confirmed = m_confirmed_layers.tcp; - ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); - } + const std::lock_guard lock(m_receive_callback_mtx); +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemReceiveCallback"); #endif + m_receive_callback = nullptr; + } - ecal_reg_sample_topic.pid = m_pid; - ecal_reg_sample_topic.pname = m_pname; - ecal_reg_sample_topic.uname = Process::GetUnitName(); - ecal_reg_sample_topic.dclock = m_clock; - ecal_reg_sample_topic.dfreq = GetFrequency(); - ecal_reg_sample_topic.message_drops = static_cast(m_message_drops); + return(true); + } - // we do not know the number of connections .. - ecal_reg_sample_topic.connections_loc = 0; - ecal_reg_sample_topic.connections_ext = 0; + bool CDataReader::AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_) + { + if (!m_created) return(false); - // register subscriber - if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); + // store event callback + { #ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister"); + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddEventCallback"); #endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = std::move(callback_); + } -#endif // ECAL_CORE_REGISTRATION return(true); } - bool CDataReader::Unregister() + bool CDataReader::RemEventCallback(eCAL_Subscriber_Event type_) { -#if ECAL_CORE_REGISTRATION - if (m_topic_name.empty()) return(false); - - // create command parameter - Registration::Sample ecal_unreg_sample; - ecal_unreg_sample.cmd_type = bct_unreg_subscriber; - auto& ecal_reg_sample_topic = ecal_unreg_sample.topic; - ecal_reg_sample_topic.hname = m_host_name; - ecal_reg_sample_topic.hgname = m_host_group_name; - ecal_reg_sample_topic.pname = m_pname; - ecal_reg_sample_topic.pid = m_pid; - ecal_reg_sample_topic.tname = m_topic_name; - ecal_reg_sample_topic.tid = m_topic_id; - ecal_reg_sample_topic.uname = Process::GetUnitName(); + if (!m_created) return(false); - // unregister subscriber - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); + // reset event callback + { #ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemEventCallback"); #endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = nullptr; + } -#endif // ECAL_CORE_REGISTRATION return(true); } @@ -346,57 +288,143 @@ namespace eCAL return(true); } - bool CDataReader::Receive(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ms_ /* = 0 */) + void CDataReader::SetID(const std::set& id_set_) { - if (!m_created) return(false); + m_id_set = id_set_; + } - std::unique_lock read_buffer_lock(m_read_buf_mtx); + void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_) + { + // flag write enabled from publisher side (information not used yet) +#if ECAL_CORE_TRANSPORT_UDP + m_layers.udp.write_enabled = layer_states_.udp.write_enabled; +#endif +#if ECAL_CORE_TRANSPORT_SHM + m_layers.shm.write_enabled = layer_states_.shm.write_enabled; +#endif +#if ECAL_CORE_TRANSPORT_TCP + m_layers.tcp.write_enabled = layer_states_.tcp.write_enabled; +#endif - // No need to wait (for whatever time) if something has been received - if (!m_read_buf_received) + FireConnectEvent(publication_info_.topic_id, data_type_info_); + + // add key to publisher map { - if (rcv_timeout_ms_ < 0) - { - m_read_buf_cv.wait(read_buffer_lock, [this]() { return this->m_read_buf_received; }); - } - else if (rcv_timeout_ms_ > 0) + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map[publication_info_] = std::make_tuple(data_type_info_, layer_states_); + } + } + + void CDataReader::RemovePublication(const SPublicationInfo& publication_info_) + { + // remove key from publisher map + { + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map.erase(publication_info_); + } + } + + void CDataReader::ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_) + { + SReaderLayerPar par; + par.host_name = publication_info_.host_name; + par.process_id = publication_info_.process_id; + par.topic_name = m_topic_name; + par.topic_id = publication_info_.topic_id; + par.parameter = parameter_; + + switch (type_) + { + case tl_ecal_shm: +#if ECAL_CORE_TRANSPORT_SHM + CSHMReaderLayer::Get()->SetConnectionParameter(par); +#endif + break; + case tl_ecal_tcp: +#if ECAL_CORE_TRANSPORT_TCP + CTCPReaderLayer::Get()->SetConnectionParameter(par); +#endif + break; + default: + break; + } + } + + void CDataReader::RefreshRegistration() + { + if (!m_created) return; + + // ensure that registration is not called within zero nanoseconds + // normally it will be called from registration logic every second + + // register without send + Register(false); + + // check connection timeouts + { + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map.erase_expired(); + + if (m_pub_map.empty()) { - m_read_buf_cv.wait_for(read_buffer_lock, std::chrono::milliseconds(rcv_timeout_ms_), [this]() { return this->m_read_buf_received; }); + FireDisconnectEvent(); } } + } - // did we receive new samples ? - if (m_read_buf_received) + void CDataReader::InitializeLayers() + { + // initialize udp layer +#if ECAL_CORE_TRANSPORT_UDP + if (Config::IsUdpMulticastRecEnabled()) { -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug3, m_topic_name + "::CDataReader::Receive"); + CUDPReaderLayer::Get()->Initialize(); + } #endif - // copy content to target string - buf_.clear(); - buf_.swap(m_read_buf); - m_read_buf_received = false; - // apply time - if(time_ != nullptr) *time_ = m_read_time; - - // return success - return(true); + // initialize shm layer +#if ECAL_CORE_TRANSPORT_SHM + if (Config::IsShmRecEnabled()) + { + CSHMReaderLayer::Get()->Initialize(); } +#endif - return(false); + // initialize tcp layer +#if ECAL_CORE_TRANSPORT_TCP + if (Config::IsTcpRecEnabled()) + { + CTCPReaderLayer::Get()->Initialize(); + } +#endif } - size_t CDataReader::AddSample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) + size_t CDataReader::ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) { // ensure thread safety const std::lock_guard lock(m_receive_callback_mtx); if (!m_created) return(0); + // check receive layer configuration + switch (layer_) + { + case tl_ecal_udp: + if (!m_config.udp.enable) return 0; + break; + case tl_ecal_shm: + if (!m_config.shm.enable) return 0; + break; + case tl_ecal_tcp: + if (!m_config.tcp.enable) return 0; + break; + default: + break; + } + // store receive layer - m_confirmed_layers.udp |= layer_ == tl_ecal_udp; - m_confirmed_layers.shm |= layer_ == tl_ecal_shm; - m_confirmed_layers.tcp |= layer_ == tl_ecal_tcp; + m_layers.udp.active |= layer_ == tl_ecal_udp; + m_layers.shm.active |= layer_ == tl_ecal_shm; + m_layers.tcp.active |= layer_ == tl_ecal_tcp; // number of hash values to track for duplicates constexpr int hash_queue_size(64); @@ -480,7 +508,7 @@ namespace eCAL } // if not consumed by user receive call - if(!processed) + if (!processed) { // push sample into read buffer const std::lock_guard read_buffer_lock(m_read_buf_mtx); @@ -500,126 +528,226 @@ namespace eCAL return(size_); } - bool CDataReader::AddReceiveCallback(ReceiveCallbackT callback_) + std::string CDataReader::Dump(const std::string& indent_ /* = "" */) { - if (!m_created) return(false); + std::stringstream out; - // store receive callback - { - const std::lock_guard lock(m_receive_callback_mtx); -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddReceiveCallback"); -#endif - m_receive_callback = std::move(callback_); - } + out << '\n'; + out << indent_ << "------------------------------------" << '\n'; + out << indent_ << " class CDataReader " << '\n'; + out << indent_ << "------------------------------------" << '\n'; + out << indent_ << "m_host_name: " << m_host_name << '\n'; + out << indent_ << "m_host_group_name: " << m_host_group_name << '\n'; + out << indent_ << "m_topic_name: " << m_topic_name << '\n'; + out << indent_ << "m_topic_id: " << m_topic_id << '\n'; + out << indent_ << "m_topic_info.encoding: " << m_topic_info.encoding << '\n'; + out << indent_ << "m_topic_info.name: " << m_topic_info.name << '\n'; + out << indent_ << "m_topic_info.desc: " << m_topic_info.descriptor << '\n'; + out << indent_ << "m_topic_size: " << m_topic_size << '\n'; + out << indent_ << "m_read_buf.size(): " << m_read_buf.size() << '\n'; + out << indent_ << "m_read_time: " << m_read_time << '\n'; + out << indent_ << "m_clock: " << m_clock << '\n'; + out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n'; + out << indent_ << "m_created: " << m_created << '\n'; + out << '\n'; - return(true); + return(out.str()); } - bool CDataReader::RemReceiveCallback() + bool CDataReader::Register(const bool force_) { - if (!m_created) return(false); +#if ECAL_CORE_REGISTRATION + if (!m_created) return(false); + if(m_topic_name.empty()) return(false); - // reset receive callback + // create command parameter + Registration::Sample ecal_reg_sample; + ecal_reg_sample.cmd_type = bct_reg_subscriber; + auto& ecal_reg_sample_topic = ecal_reg_sample.topic; + ecal_reg_sample_topic.hname = m_host_name; + ecal_reg_sample_topic.hgname = m_host_group_name; + ecal_reg_sample_topic.tname = m_topic_name; + ecal_reg_sample_topic.tid = m_topic_id; + // topic_information { - const std::lock_guard lock(m_receive_callback_mtx); -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemReceiveCallback"); -#endif - m_receive_callback = nullptr; + auto& ecal_reg_sample_tdatatype = ecal_reg_sample_topic.tdatatype; + if (m_share_ttype) + { + ecal_reg_sample_tdatatype.encoding = m_topic_info.encoding; + ecal_reg_sample_tdatatype.name = m_topic_info.name; + } + if (m_share_tdesc) + { + ecal_reg_sample_tdatatype.descriptor = m_topic_info.descriptor; + } } + ecal_reg_sample_topic.attr = m_attr; + ecal_reg_sample_topic.tsize = static_cast(m_topic_size); - return(true); - } +#if ECAL_CORE_TRANSPORT_UDP + // udp multicast layer + { + Registration::TLayer udp_tlayer; + udp_tlayer.type = tl_ecal_udp; + udp_tlayer.version = 1; + udp_tlayer.enabled = m_layers.udp.read_enabled; + udp_tlayer.active = m_layers.udp.active; + ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); + } +#endif - bool CDataReader::AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_) - { - if (!m_created) return(false); +#if ECAL_CORE_TRANSPORT_SHM + // shm layer + { + Registration::TLayer shm_tlayer; + shm_tlayer.type = tl_ecal_shm; + shm_tlayer.version = 1; + shm_tlayer.enabled = m_layers.shm.read_enabled; + shm_tlayer.active = m_layers.shm.active; + ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); + } +#endif - // store event callback +#if ECAL_CORE_TRANSPORT_TCP + // tcp layer { + Registration::TLayer tcp_tlayer; + tcp_tlayer.type = tl_ecal_tcp; + tcp_tlayer.version = 1; + tcp_tlayer.enabled = m_layers.tcp.read_enabled; + tcp_tlayer.active = m_layers.tcp.active; + ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); + } +#endif + + ecal_reg_sample_topic.pid = m_pid; + ecal_reg_sample_topic.pname = m_pname; + ecal_reg_sample_topic.uname = Process::GetUnitName(); + ecal_reg_sample_topic.dclock = m_clock; + ecal_reg_sample_topic.dfreq = GetFrequency(); + ecal_reg_sample_topic.message_drops = static_cast(m_message_drops); + + // we do not know the number of connections .. + ecal_reg_sample_topic.connections_loc = 0; + ecal_reg_sample_topic.connections_ext = 0; + + // register subscriber + if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); #ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddEventCallback"); + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = std::move(callback_); - } return(true); +#else // ECAL_CORE_REGISTRATION + (void)force_; + return(false); +#endif // ECAL_CORE_REGISTRATION } - bool CDataReader::RemEventCallback(eCAL_Subscriber_Event type_) + bool CDataReader::Unregister() { - if (!m_created) return(false); +#if ECAL_CORE_REGISTRATION + if (m_topic_name.empty()) return(false); - // reset event callback - { + // create command parameter + Registration::Sample ecal_unreg_sample; + ecal_unreg_sample.cmd_type = bct_unreg_subscriber; + auto& ecal_reg_sample_topic = ecal_unreg_sample.topic; + ecal_reg_sample_topic.hname = m_host_name; + ecal_reg_sample_topic.hgname = m_host_group_name; + ecal_reg_sample_topic.pname = m_pname; + ecal_reg_sample_topic.pid = m_pid; + ecal_reg_sample_topic.tname = m_topic_name; + ecal_reg_sample_topic.tid = m_topic_id; + ecal_reg_sample_topic.uname = Process::GetUnitName(); + + // unregister subscriber + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); #ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemEventCallback"); + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = nullptr; - } return(true); +#else // ECAL_CORE_REGISTRATION + return(false); +#endif // ECAL_CORE_REGISTRATION } - void CDataReader::SetID(const std::set& id_set_) + void CDataReader::StartTransportLayer() { - m_id_set = id_set_; - } +#if ECAL_CORE_TRANSPORT_UDP + if (m_config.udp.enable) + { + // flag enabled + m_layers.udp.read_enabled = true; - void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_) - { - Connect(publication_info_.topic_id, data_type_info_); + // subscribe to layer (if supported) + CUDPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); + } +#endif - // add key to publisher map +#if ECAL_CORE_TRANSPORT_SHM + if (m_config.shm.enable) { - const std::lock_guard lock(m_pub_map_mtx); - m_pub_map[publication_info_] = std::make_tuple(data_type_info_, layer_states_); + // flag enabled + m_layers.shm.read_enabled = true; + + // subscribe to layer (if supported) + CSHMReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); } - } +#endif - void CDataReader::RemovePublication(const SPublicationInfo& publication_info_) - { - // remove key from publisher map +#if ECAL_CORE_TRANSPORT_TCP + if (m_config.tcp.enable) { - const std::lock_guard lock(m_pub_map_mtx); - m_pub_map.erase(publication_info_); + // flag enabled + m_layers.tcp.read_enabled = true; + + // subscribe to layer (if supported) + CTCPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); } +#endif } - - void CDataReader::ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_) + + void CDataReader::StopTransportLayer() { - SReaderLayerPar par; - par.host_name = publication_info_.host_name; - par.process_id = publication_info_.process_id; - par.topic_name = m_topic_name; - par.topic_id = publication_info_.topic_id; - par.parameter = parameter_; - - switch (type_) +#if ECAL_CORE_TRANSPORT_UDP + if (m_config.udp.enable) { - case tl_ecal_shm: + // flag disabled + m_layers.udp.read_enabled = false; + + // unsubscribe from layer (if supported) + CUDPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); + } +#endif + #if ECAL_CORE_TRANSPORT_SHM - CSHMReaderLayer::Get()->SetConnectionParameter(par); + if (m_config.shm.enable) + { + // flag disabled + m_layers.shm.read_enabled = false; + + // unsubscribe from layer (if supported) + CSHMReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); + } #endif - break; - case tl_ecal_tcp: + #if ECAL_CORE_TRANSPORT_TCP - CTCPReaderLayer::Get()->SetConnectionParameter(par); -#endif - break; - default: - break; + if (m_config.tcp.enable) + { + // flag disabled + m_layers.tcp.read_enabled = false; + + // unsubscribe from layer (if supported) + CTCPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); } +#endif } - void CDataReader::Connect(const std::string& tid_, const SDataTypeInformation& tinfo_) + void CDataReader::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) { SSubEventCallbackData data; data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); @@ -657,7 +785,7 @@ namespace eCAL } } - void CDataReader::Disconnect() + void CDataReader::FireDisconnectEvent() { if (m_connected) { @@ -804,53 +932,4 @@ namespace eCAL const std::lock_guard lock(m_frequency_calculator_mtx); return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); } - - void CDataReader::RefreshRegistration() - { - if(!m_created) return; - - // ensure that registration is not called within zero nanoseconds - // normally it will be called from registration logic every second - - // register without send - Register(false); - - // check connection timeouts - { - const std::lock_guard lock(m_pub_map_mtx); - m_pub_map.erase_expired(); - - if (m_pub_map.empty()) - { - Disconnect(); - } - } - } - - std::string CDataReader::Dump(const std::string& indent_ /* = "" */) - { - std::stringstream out; - - - out << '\n'; - out << indent_ << "------------------------------------" << '\n'; - out << indent_ << " class CDataReader " << '\n'; - out << indent_ << "------------------------------------" << '\n'; - out << indent_ << "m_host_name: " << m_host_name << '\n'; - out << indent_ << "m_host_group_name: " << m_host_group_name << '\n'; - out << indent_ << "m_topic_name: " << m_topic_name << '\n'; - out << indent_ << "m_topic_id: " << m_topic_id << '\n'; - out << indent_ << "m_topic_info.encoding: " << m_topic_info.encoding << '\n'; - out << indent_ << "m_topic_info.name: " << m_topic_info.name << '\n'; - out << indent_ << "m_topic_info.desc: " << m_topic_info.descriptor << '\n'; - out << indent_ << "m_topic_size: " << m_topic_size << '\n'; - out << indent_ << "m_read_buf.size(): " << m_read_buf.size() << '\n'; - out << indent_ << "m_read_time: " << m_read_time << '\n'; - out << indent_ << "m_clock: " << m_clock << '\n'; - out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n'; - out << indent_ << "m_created: " << m_created << '\n'; - out << '\n'; - - return(out.str()); - } } diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index fbe0213e22..13f2a50291 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -23,9 +23,9 @@ #pragma once -#include #include #include +#include #include "serialization/ecal_serialize_sample_payload.h" #include "serialization/ecal_serialize_sample_registration.h" @@ -39,10 +39,10 @@ #include #include #include +#include #include #include #include -#include #include namespace eCAL @@ -50,11 +50,18 @@ namespace eCAL class CDataReader { public: + struct SLayerState + { + bool write_enabled = false; // is publisher enabled to write data on this layer? + bool read_enabled = false; // is this subscriber configured to read data from this layer? + bool active = false; // data has been received on this layer + }; + struct SLayerStates { - bool udp = false; - bool shm = false; - bool tcp = false; + SLayerState udp; + SLayerState shm; + SLayerState tcp; }; struct SPublicationInfo @@ -70,14 +77,12 @@ namespace eCAL } }; - CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_); + CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const Subscriber::Configuration& config_ = {}); ~CDataReader(); bool Stop(); - static void InitializeLayers(); - - bool Receive(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0); + bool Read(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0); bool AddReceiveCallback(ReceiveCallbackT callback_); bool RemReceiveCallback(); @@ -95,7 +100,7 @@ namespace eCAL void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_); - std::string Dump(const std::string& indent_ = ""); + void RefreshRegistration(); bool IsCreated() const { return(m_created); } @@ -115,19 +120,21 @@ namespace eCAL std::string GetTopicID() const { return(m_topic_id); } SDataTypeInformation GetDataTypeInformation() const { return(m_topic_info); } - void RefreshRegistration(); + static void InitializeLayers(); + size_t ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); - size_t AddSample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); + std::string Dump(const std::string& indent_ = ""); protected: - void SubscribeToLayers(); - void UnsubscribeFromLayers(); - bool Register(bool force_); bool Unregister(); - void Connect(const std::string& tid_, const SDataTypeInformation& tinfo_); - void Disconnect(); + void StartTransportLayer(); + void StopTransportLayer(); + + void FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); + void FireDisconnectEvent(); + bool CheckMessageClock(const std::string& tid_, long long current_clock_); int32_t GetFrequency(); @@ -141,6 +148,7 @@ namespace eCAL SDataTypeInformation m_topic_info; std::map m_attr; std::atomic m_topic_size; + Subscriber::Configuration m_config; std::atomic m_connected; using PublicationMapT = Util::CExpirationMap>; @@ -177,7 +185,7 @@ namespace eCAL bool m_share_ttype = false; bool m_share_tdesc = false; - SLayerStates m_confirmed_layers; + SLayerStates m_layers; std::atomic m_created; }; } diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index bc754e8faf..61eec8bd58 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -21,13 +21,10 @@ * @brief common eCAL data writer **/ -#include #include +#include #include -#include -#include -#include -#include +#include #include "config/ecal_config_reader_hlp.h" @@ -41,8 +38,12 @@ #include "pubsub/ecal_pubgate.h" -#include #include +#include +#include +#include +#include +#include struct SSndHash { @@ -65,6 +66,32 @@ namespace std }; } +namespace +{ +#ifndef NDEBUG + // function to convert boolean to string + std::string boolToString(bool value) + { + return value ? "true" : "false"; + } + + // function to log the states of SLayerState + void logLayerState(const std::string& layerName, const eCAL::CDataWriter::SLayerState& state) { + std::cout << layerName << " - Read Enabled: " << boolToString(state.read_enabled) + << ", Write Enabled: " << boolToString(state.write_enabled) + << ", Write Active : " << boolToString(state.active) << std::endl; + } + + // function to log the states of SLayerStates + void logLayerStates(const eCAL::CDataWriter::SLayerStates& states) { + std::cout << "Logging Layer States:" << std::endl; + logLayerState("UDP", states.udp); + logLayerState("SHM", states.shm); + logLayerState("TCP", states.tcp); + } +#endif +} + namespace eCAL { CDataWriter::CDataWriter(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const Publisher::Configuration& config_) : @@ -98,9 +125,6 @@ namespace eCAL // register Register(false); - - // start udp, shm, tcp layer - StartTransportLayer(); } CDataWriter::~CDataWriter() @@ -121,8 +145,8 @@ namespace eCAL Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Stop"); #endif - // stop udp, shm, tcp layer - StopTransportLayer(); + // stop all transport layer + StopAllLayer(); // clear subscriber maps { @@ -145,92 +169,6 @@ namespace eCAL return true; } - bool CDataWriter::SetDataTypeInformation(const SDataTypeInformation& topic_info_) - { - // Does it even make sense to register if the info is the same??? - const bool force = m_topic_info != topic_info_; - m_topic_info = topic_info_; - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetDescription"); -#endif - - // register it - Register(force); - - return(true); - } - - bool CDataWriter::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) - { - auto current_val = m_attr.find(attr_name_); - - const bool force = current_val == m_attr.end() || current_val->second != attr_value_; - m_attr[attr_name_] = attr_value_; - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetAttribute"); -#endif - - // register it - Register(force); - - return(true); - } - - bool CDataWriter::ClearAttribute(const std::string& attr_name_) - { - auto force = m_attr.find(attr_name_) != m_attr.end(); - - m_attr.erase(attr_name_); - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ClearAttribute"); -#endif - - // register it - Register(force); - - return(true); - } - - bool CDataWriter::AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_) - { - if (!m_created) return(false); - - // store event callback - { -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::AddEventCallback"); -#endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = std::move(callback_); - } - - return(true); - } - - bool CDataWriter::RemEventCallback(eCAL_Publisher_Event type_) - { - if (!m_created) return(false); - - // reset event callback - { -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::RemEventCallback"); -#endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = nullptr; - } - - return(true); - } - size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long id_) { // get payload buffer size (one time, to avoid multiple computations) @@ -310,7 +248,7 @@ namespace eCAL shm_sent = m_writer_shm->Write(payload_buf, wattr); } - m_confirmed_layers.shm = true; + m_layers.shm.active = true; } written |= shm_sent; @@ -361,7 +299,7 @@ namespace eCAL // write to udp multicast layer udp_sent = m_writer_udp->Write(m_payload_buffer.data(), wattr); - m_confirmed_layers.udp = true; + m_layers.udp.active = true; } written |= udp_sent; @@ -403,7 +341,7 @@ namespace eCAL // write to tcp layer tcp_sent = m_writer_tcp->Write(m_payload_buffer.data(), wattr); - m_confirmed_layers.tcp = true; + m_layers.tcp.active = true; } written |= tcp_sent; @@ -426,14 +364,146 @@ namespace eCAL else return 0; } - void CDataWriter::ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_, const std::string& reader_par_) + bool CDataWriter::SetDataTypeInformation(const SDataTypeInformation& topic_info_) + { + // Does it even make sense to register if the info is the same??? + const bool force = m_topic_info != topic_info_; + m_topic_info = topic_info_; + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetDescription"); +#endif + + // register it + Register(force); + + return(true); + } + + bool CDataWriter::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) { - Connect(subscription_info_.topic_id, data_type_info_); + auto current_val = m_attr.find(attr_name_); + + const bool force = current_val == m_attr.end() || current_val->second != attr_value_; + m_attr[attr_name_] = attr_value_; + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetAttribute"); +#endif + + // register it + Register(force); + + return(true); + } + + bool CDataWriter::ClearAttribute(const std::string& attr_name_) + { + auto force = m_attr.find(attr_name_) != m_attr.end(); + + m_attr.erase(attr_name_); + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ClearAttribute"); +#endif + + // register it + Register(force); + + return(true); + } + + bool CDataWriter::AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_) + { + if (!m_created) return(false); + + // store event callback + { +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::AddEventCallback"); +#endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = std::move(callback_); + } + + return(true); + } + + bool CDataWriter::RemEventCallback(eCAL_Publisher_Event type_) + { + if (!m_created) return(false); + + // reset event callback + { +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::RemEventCallback"); +#endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = nullptr; + } + + return(true); + } + + void CDataWriter::ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_) + { + FireConnectEvent(subscription_info_.topic_id, data_type_info_); + + // collect layer states + std::vector pub_layers; + std::vector sub_layers; +#if ECAL_CORE_TRANSPORT_UDP + if (m_config.udp.enable) pub_layers.push_back(tl_ecal_udp); + if (sub_layer_states_.udp.read_enabled) sub_layers.push_back(tl_ecal_udp); + + m_layers.udp.read_enabled = sub_layer_states_.udp.read_enabled; // just for debugging/logging +#endif +#if ECAL_CORE_TRANSPORT_SHM + if (m_config.shm.enable) pub_layers.push_back(tl_ecal_shm); + if (sub_layer_states_.shm.read_enabled) sub_layers.push_back(tl_ecal_shm); + + m_layers.shm.read_enabled = sub_layer_states_.shm.read_enabled; // just for debugging/logging +#endif +#if ECAL_CORE_TRANSPORT_TCP + if (m_config.tcp.enable) pub_layers.push_back(tl_ecal_tcp); + if (sub_layer_states_.tcp.read_enabled) sub_layers.push_back(tl_ecal_tcp); + + m_layers.tcp.read_enabled = sub_layer_states_.tcp.read_enabled; // just for debugging/logging +#endif + + // determine if we need to start a transport layer + // if a new layer gets activated, we reregister for SHM and TCP to force the exchange of connection parameter + // without this forced registration we would need one additional registration loop for these two layers to establish the connection + const TLayer::eTransportLayer layer2activate = DetermineTransportLayer2Start(pub_layers, sub_layers, m_host_name == subscription_info_.host_name); + switch (layer2activate) + { + case tl_ecal_udp: + StartUdpLayer(); + break; + case tl_ecal_shm: + if (StartShmLayer()) Register(true); + break; + case tl_ecal_tcp: + if (StartTcpLayer()) Register(true); + break; + default: + break; + } + +#ifndef NDEBUG + // log it + //logLayerStates(m_layers); +#endif // add key to subscriber map { const std::lock_guard lock(m_sub_map_mtx); - m_sub_map[subscription_info_] = std::make_tuple(data_type_info_, layer_states_); + m_sub_map[subscription_info_] = std::make_tuple(data_type_info_, sub_layer_states_); } // add a new subscription @@ -492,7 +562,7 @@ namespace eCAL if (m_sub_map.empty()) { - Disconnect(); + FireDisconnectEvent(); } } } @@ -574,7 +644,8 @@ namespace eCAL eCAL::Registration::TLayer udp_tlayer; udp_tlayer.type = tl_ecal_udp; udp_tlayer.version = 1; - udp_tlayer.confirmed = m_confirmed_layers.udp; + udp_tlayer.enabled = m_layers.udp.write_enabled; + udp_tlayer.active = m_layers.udp.active; udp_tlayer.par_layer.layer_par_udpmc = m_writer_udp->GetConnectionParameter().layer_par_udpmc; ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); } @@ -587,7 +658,8 @@ namespace eCAL eCAL::Registration::TLayer shm_tlayer; shm_tlayer.type = tl_ecal_shm; shm_tlayer.version = 1; - shm_tlayer.confirmed = m_confirmed_layers.shm; + shm_tlayer.enabled = m_layers.shm.write_enabled; + shm_tlayer.active = m_layers.shm.active; shm_tlayer.par_layer.layer_par_shm = m_writer_shm->GetConnectionParameter().layer_par_shm; ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); } @@ -600,7 +672,8 @@ namespace eCAL eCAL::Registration::TLayer tcp_tlayer; tcp_tlayer.type = tl_ecal_tcp; tcp_tlayer.version = 1; - tcp_tlayer.confirmed = m_confirmed_layers.tcp; + tcp_tlayer.enabled = m_layers.tcp.write_enabled; + tcp_tlayer.active = m_layers.tcp.active; tcp_tlayer.par_layer.layer_par_tcp = m_writer_tcp->GetConnectionParameter().layer_par_tcp; ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); } @@ -637,8 +710,11 @@ namespace eCAL Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Register"); #endif -#endif // ECAL_CORE_REGISTRATION return(true); +#else // ECAL_CORE_REGISTRATION +(void)force_; +return(false); +#endif // ECAL_CORE_REGISTRATION } bool CDataWriter::Unregister() @@ -667,11 +743,13 @@ namespace eCAL Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::UnRegister"); #endif -#endif // ECAL_CORE_REGISTRATION return(true); +#else // ECAL_CORE_REGISTRATION + return(false); +#endif // ECAL_CORE_REGISTRATION } - void CDataWriter::Connect(const std::string& tid_, const SDataTypeInformation& tinfo_) + void CDataWriter::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) { SPubEventCallbackData data; data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); @@ -709,7 +787,7 @@ namespace eCAL } } - void CDataWriter::Disconnect() + void CDataWriter::FireDisconnectEvent() { if (m_connected) { @@ -731,91 +809,102 @@ namespace eCAL } } - void CDataWriter::StartTransportLayer() - { -#if ECAL_CORE_TRANSPORT_UDP - if (m_config.udp.enable) - { - ActivateUdpLayer(); - } -#endif -#if ECAL_CORE_TRANSPORT_SHM - if (m_config.shm.enable) - { - ActivateShmLayer(); - } -#endif -#if ECAL_CORE_TRANSPORT_TCP - if (m_config.tcp.enable) - { - ActivateTcpLayer(); - } -#endif - } - - void CDataWriter::StopTransportLayer() + bool CDataWriter::StartUdpLayer() { - // destroy udp writer #if ECAL_CORE_TRANSPORT_UDP - m_writer_udp.reset(); -#endif + if (m_layers.udp.write_enabled) return false; - // destroy shm writer -#if ECAL_CORE_TRANSPORT_SHM - m_writer_shm.reset(); -#endif - - // destroy tcp writer -#if ECAL_CORE_TRANSPORT_TCP - m_writer_tcp.reset(); -#endif - } + // flag enabled + m_layers.udp.write_enabled = true; - void CDataWriter::ActivateUdpLayer() - { -#if ECAL_CORE_TRANSPORT_UDP // log state - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::ACTIVATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateUdpLayer::ACTIVATED"); // create writer m_writer_udp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.udp); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::WRITER_CREATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateUdpLayer::WRITER_CREATED"); #endif + return true; +#else // ECAL_CORE_TRANSPORT_UDP + return false; #endif // ECAL_CORE_TRANSPORT_UDP } - void CDataWriter::ActivateShmLayer() + bool CDataWriter::StartShmLayer() { #if ECAL_CORE_TRANSPORT_SHM + if (m_layers.shm.write_enabled) return false; + + // flag enabled + m_layers.shm.write_enabled = true; + // log state - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::ACTIVATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateShmLayer::ACTIVATED"); // create writer m_writer_shm = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.shm); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::WRITER_CREATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateShmLayer::WRITER_CREATED"); #endif + return true; +#else // ECAL_CORE_TRANSPORT_SHM + return false; #endif // ECAL_CORE_TRANSPORT_SHM } - void CDataWriter::ActivateTcpLayer() + bool CDataWriter::StartTcpLayer() { #if ECAL_CORE_TRANSPORT_TCP + if (m_layers.tcp.write_enabled) return false; + + // flag enabled + m_layers.tcp.write_enabled = true; + // log state - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::ACTIVATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateTcpLayer::ACTIVATED"); // create writer m_writer_tcp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.tcp); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::WRITER_CREATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateTcpLayer::WRITER_CREATED"); #endif + return true; +#else // ECAL_CORE_TRANSPORT_TCP + return false; #endif // ECAL_CORE_TRANSPORT_TCP } + void CDataWriter::StopAllLayer() + { +#if ECAL_CORE_TRANSPORT_UDP + // flag disabled + m_layers.udp.write_enabled = false; + + // destroy writer + m_writer_udp.reset(); +#endif + +#if ECAL_CORE_TRANSPORT_SHM + // flag disabled + m_layers.shm.write_enabled = false; + + // destroy writer + m_writer_shm.reset(); +#endif + +#if ECAL_CORE_TRANSPORT_TCP + // flag disabled + m_layers.tcp.write_enabled = false; + + // destroy writer + m_writer_tcp.reset(); +#endif + } + size_t CDataWriter::PrepareWrite(long long id_, size_t len_) { // store id @@ -851,6 +940,26 @@ namespace eCAL return is_internal_only; } + TLayer::eTransportLayer CDataWriter::DetermineTransportLayer2Start(const std::vector& enabled_pub_layer_, const std::vector& enabled_sub_layer_, bool same_host_) + { + // determine the priority list to use + const Publisher::Configuration::LayerPriorityVector& layer_priority_vector = same_host_ ? m_config.layer_priority_local : m_config.layer_priority_remote; + + // find the highest priority transport layer that is available in both publisher and subscriber options + // TODO: we need to fusion the two layer enum types (eTransportLayer) in ecal_tlayer.h and ecal_struct_sample_common.hf + for (const TLayer::eTransportLayer layer : layer_priority_vector) + { + if (std::find(enabled_pub_layer_.begin(), enabled_pub_layer_.end(), layer) != enabled_pub_layer_.end() + && std::find(enabled_sub_layer_.begin(), enabled_sub_layer_.end(), layer) != enabled_sub_layer_.end()) + { + return layer; + } + } + + // return tl_none if no common transport layer is found + return TLayer::eTransportLayer::tlayer_none; + } + int32_t CDataWriter::GetFrequency() { const auto frequency_time = std::chrono::steady_clock::now(); diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index d7738f8254..e473490633 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -25,12 +25,12 @@ #include #include -#include #include #include +#include #include "util/ecal_expmap.h" -#include +#include "util/frequency_calculator.h" #if ECAL_CORE_TRANSPORT_UDP #include "udp/ecal_writer_udp.h" @@ -59,11 +59,18 @@ namespace eCAL class CDataWriter { public: + struct SLayerState + { + bool read_enabled = false; // is subscriber enabled to read data on this layer? + bool write_enabled = false; // is this publisher configured to write data from this layer? + bool active = false; // data has been sent on this layer + }; + struct SLayerStates { - bool udp = false; - bool shm = false; - bool tcp = false; + SLayerState udp; + SLayerState shm; + SLayerState tcp; }; struct SSubscriptionInfo @@ -84,24 +91,22 @@ namespace eCAL bool Stop(); - bool SetDataTypeInformation(const SDataTypeInformation& topic_info_); + size_t Write(CPayloadWriter& payload_, long long time_, long long id_); - bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_); - bool ClearAttribute(const std::string& attr_name_); + bool SetDataTypeInformation(const SDataTypeInformation& topic_info_); bool AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_); bool RemEventCallback(eCAL_Publisher_Event type_); - size_t Write(CPayloadWriter& payload_, long long time_, long long id_); + bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_); + bool ClearAttribute(const std::string& attr_name_); - void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_, const std::string& reader_par_); + void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_); void RemoveSubscription(const SSubscriptionInfo& subscription_info_); void RefreshRegistration(); void RefreshSendCounter(); - std::string Dump(const std::string& indent_ = ""); - bool IsCreated() const { return(m_created); } bool IsSubscribed() const @@ -116,26 +121,29 @@ namespace eCAL return(m_sub_map.size()); } - const std::string& GetTopicName() const { return(m_topic_name); } + const std::string& GetTopicName() const { return(m_topic_name); } const SDataTypeInformation& GetDataTypeInformation() const { return m_topic_info; } + std::string Dump(const std::string& indent_ = ""); + protected: bool Register(bool force_); bool Unregister(); - void Connect(const std::string& tid_, const SDataTypeInformation& tinfo_); - void Disconnect(); + bool StartUdpLayer(); + bool StartShmLayer(); + bool StartTcpLayer(); - void StartTransportLayer(); - void StopTransportLayer(); + void StopAllLayer(); - void ActivateUdpLayer(); - void ActivateShmLayer(); - void ActivateTcpLayer(); + void FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); + void FireDisconnectEvent(); size_t PrepareWrite(long long id_, size_t len_); - bool IsInternalSubscribedOnly(); + bool IsInternalSubscribedOnly(); + TLayer::eTransportLayer DetermineTransportLayer2Start(const std::vector& enabled_pub_layer_, const std::vector& enabled_sub_layer_, bool same_host_); + int32_t GetFrequency(); std::string m_host_name; @@ -177,7 +185,7 @@ namespace eCAL std::unique_ptr m_writer_tcp; #endif - SLayerStates m_confirmed_layers; + SLayerStates m_layers; std::atomic m_created; }; } diff --git a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp index 244460f1eb..a20c0db7c5 100644 --- a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,18 @@ * @brief shared memory layer **/ -#include -#include -#include +#include +#include #include "ecal_global_accessors.h" -#include "pubsub/ecal_subgate.h" -#include "io/shm/ecal_memfile_pool.h" #include "ecal_reader_shm.h" +#include "io/shm/ecal_memfile_pool.h" +#include "pubsub/ecal_subgate.h" + +#include +#include + namespace eCAL { //////////////// diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp index 43d1b56fe7..9194569bae 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,13 @@ * @brief memory file data writer **/ -#include #include -#include #include "ecal_def.h" #include "ecal_writer_shm.h" +#include + namespace eCAL { const std::string CDataWriterSHM::m_memfile_base_name = "ecal_"; @@ -39,6 +39,7 @@ namespace eCAL m_topic_name = topic_name_; // initialize memory file buffer + if (m_config.memfile_buffer_count < 1) m_config.memfile_buffer_count = 1; SetBufferCount(m_config.memfile_buffer_count); } diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.h b/ecal/core/src/readwrite/shm/ecal_writer_shm.h index fd6dff0d93..0898cdacea 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.h +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.h @@ -25,8 +25,8 @@ #include -#include "readwrite/ecal_writer_base.h" #include "io/shm/ecal_memfile_sync.h" +#include "readwrite/ecal_writer_base.h" #include #include diff --git a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp index ffbfbae0af..ed82df0fd1 100644 --- a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp +++ b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,14 @@ * @brief tcp reader and layer **/ -#include "ecal_global_accessors.h" - #include -#include "pubsub/ecal_subgate.h" - +#include "ecal_global_accessors.h" #include "ecal_reader_tcp.h" #include "ecal_tcp_pubsub_logger.h" +#include "pubsub/ecal_subgate.h" + #include "ecal_utils/portable_endian.h" namespace eCAL diff --git a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h index cc1a74c548..83fd64f557 100644 --- a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h +++ b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,9 @@ #include "serialization/ecal_struct_sample_payload.h" +#include +#include + namespace eCAL { //////////////// diff --git a/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp b/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp index 1bdcdf443e..1023bd8e39 100644 --- a/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp +++ b/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,12 +21,14 @@ * @brief udp multicast reader and layer **/ -#include "ecal_reader_udp.h" +#include +#include "ecal_reader_udp.h" #include "ecal_global_accessors.h" -#include "pubsub/ecal_subgate.h" #include "io/udp/ecal_udp_configurations.h" +#include "pubsub/ecal_subgate.h" + #include #include #include diff --git a/ecal/core/src/serialization/ecal_serialize_common.cpp b/ecal/core/src/serialization/ecal_serialize_common.cpp index 7f8c49092d..6bb0ff6214 100644 --- a/ecal/core/src/serialization/ecal_serialize_common.cpp +++ b/ecal/core/src/serialization/ecal_serialize_common.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -261,9 +261,10 @@ namespace eCAL } eCAL_pb_TLayer pb_layer = eCAL_pb_TLayer_init_default; - pb_layer.type = static_cast(layer.type); - pb_layer.version = layer.version; - pb_layer.confirmed = layer.confirmed; + pb_layer.type = static_cast(layer.type); + pb_layer.version = layer.version; + pb_layer.enabled = layer.enabled; + pb_layer.active = layer.active; // layer pb_layer.has_par_layer = true; @@ -313,9 +314,10 @@ namespace eCAL } // apply layer values - layer.type = static_cast(pb_layer.type); - layer.version = pb_layer.version; - layer.confirmed = pb_layer.confirmed; + layer.type = static_cast(pb_layer.type); + layer.version = pb_layer.version; + layer.enabled = pb_layer.enabled; + layer.active = pb_layer.active; // apply tcp layer parameter layer.par_layer.layer_par_tcp.port = pb_layer.par_layer.layer_par_tcp.port; diff --git a/ecal/core/src/serialization/ecal_serialize_monitoring.cpp b/ecal/core/src/serialization/ecal_serialize_monitoring.cpp index 9776d4c536..bdfacd137e 100644 --- a/ecal/core/src/serialization/ecal_serialize_monitoring.cpp +++ b/ecal/core/src/serialization/ecal_serialize_monitoring.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -126,7 +126,7 @@ namespace eCAL_pb_TLayer pb_layer = eCAL_pb_TLayer_init_default; pb_layer.type = static_cast(layer.type); pb_layer.version = layer.version; - pb_layer.confirmed = layer.confirmed; + pb_layer.active = layer.active; if (!pb_encode_submessage(stream, eCAL_pb_TLayer_fields, &pb_layer)) { @@ -572,7 +572,7 @@ namespace // apply layer values layer.type = static_cast(pb_layer.type); layer.version = pb_layer.version; - layer.confirmed = pb_layer.confirmed; + layer.active = pb_layer.active; // add layer auto* tgt_vector = static_cast*>(*arg); diff --git a/ecal/core/src/serialization/ecal_struct_sample_registration.h b/ecal/core/src/serialization/ecal_struct_sample_registration.h index 383b8712c1..79cfc0efc6 100644 --- a/ecal/core/src/serialization/ecal_struct_sample_registration.h +++ b/ecal/core/src/serialization/ecal_struct_sample_registration.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -116,7 +116,8 @@ namespace eCAL { eTLayerType type = tl_none; // transport layer type int32_t version = 0; // transport layer version - bool confirmed = false; // transport layer used? + bool enabled = false; // transport layer enabled ? + bool active = false; // transport layer in use ? ConnectionPar par_layer; // transport layer parameter }; diff --git a/ecal/core/src/serialization/nanopb/ecal.pb.h b/ecal/core/src/serialization/nanopb/ecal.pb.h index 20c84373f2..8711ae4c83 100644 --- a/ecal/core/src/serialization/nanopb/ecal.pb.h +++ b/ecal/core/src/serialization/nanopb/ecal.pb.h @@ -14,7 +14,8 @@ #endif /* Enum definitions */ -typedef enum _eCAL_pb_eCmdType { +typedef enum _eCAL_pb_eCmdType { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 7 to 11; */ eCAL_pb_eCmdType_bct_none = 0, /* undefined command */ eCAL_pb_eCmdType_bct_set_sample = 1, /* set sample content */ eCAL_pb_eCmdType_bct_reg_publisher = 2, /* register publisher */ @@ -30,7 +31,8 @@ typedef enum _eCAL_pb_eCmdType { } eCAL_pb_eCmdType; /* Struct definitions */ -typedef struct _eCAL_pb_Content { +typedef struct _eCAL_pb_Content { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 5; */ int64_t id; /* sample id */ int64_t clock; /* internal used clock */ int64_t time; /* time the content was updated */ diff --git a/ecal/core/src/serialization/nanopb/layer.pb.h b/ecal/core/src/serialization/nanopb/layer.pb.h index 9abe83c506..64f39f35ff 100644 --- a/ecal/core/src/serialization/nanopb/layer.pb.h +++ b/ecal/core/src/serialization/nanopb/layer.pb.h @@ -10,7 +10,8 @@ #endif /* Enum definitions */ -typedef enum _eCAL_pb_eTLayerType { +typedef enum _eCAL_pb_eTLayerType { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 2, 3, 42; */ eCAL_pb_eTLayerType_tl_none = 0, /* undefined */ eCAL_pb_eTLayerType_tl_ecal_udp_mc = 1, /* ecal udp multicast */ /* 2 = ecal udp unicast (not supported anymore) @@ -34,7 +35,8 @@ typedef struct _eCAL_pb_LayerParTcp { int32_t port; /* tcp writers port number */ } eCAL_pb_LayerParTcp; -typedef struct _eCAL_pb_ConnnectionPar { +typedef struct _eCAL_pb_ConnnectionPar { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 3; */ bool has_layer_par_udpmc; eCAL_pb_LayerParUdpMC layer_par_udpmc; /* parameter for ecal udp multicast */ bool has_layer_par_shm; @@ -44,12 +46,14 @@ typedef struct _eCAL_pb_ConnnectionPar { eCAL_pb_LayerParTcp layer_par_tcp; /* parameter for ecal tcp */ } eCAL_pb_ConnnectionPar; -typedef struct _eCAL_pb_TLayer { +typedef struct _eCAL_pb_TLayer { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 4; */ eCAL_pb_eTLayerType type; /* transport layer type */ int32_t version; /* transport layer version */ - bool confirmed; /* transport layer used ? */ + bool active; /* transport layer in use ? */ bool has_par_layer; eCAL_pb_ConnnectionPar par_layer; /* transport layer parameter */ + bool enabled; /* transport layer enabled ? */ } eCAL_pb_TLayer; @@ -74,12 +78,12 @@ extern "C" { #define eCAL_pb_LayerParShm_init_default {{{NULL}, NULL}} #define eCAL_pb_LayerParTcp_init_default {0} #define eCAL_pb_ConnnectionPar_init_default {false, eCAL_pb_LayerParUdpMC_init_default, false, eCAL_pb_LayerParShm_init_default, false, eCAL_pb_LayerParTcp_init_default} -#define eCAL_pb_TLayer_init_default {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_default} +#define eCAL_pb_TLayer_init_default {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_default, 0} #define eCAL_pb_LayerParUdpMC_init_zero {0} #define eCAL_pb_LayerParShm_init_zero {{{NULL}, NULL}} #define eCAL_pb_LayerParTcp_init_zero {0} #define eCAL_pb_ConnnectionPar_init_zero {false, eCAL_pb_LayerParUdpMC_init_zero, false, eCAL_pb_LayerParShm_init_zero, false, eCAL_pb_LayerParTcp_init_zero} -#define eCAL_pb_TLayer_init_zero {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_zero} +#define eCAL_pb_TLayer_init_zero {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_zero, 0} /* Field tags (for use in manual encoding/decoding) */ #define eCAL_pb_LayerParShm_memory_file_list_tag 1 @@ -89,8 +93,9 @@ extern "C" { #define eCAL_pb_ConnnectionPar_layer_par_tcp_tag 4 #define eCAL_pb_TLayer_type_tag 1 #define eCAL_pb_TLayer_version_tag 2 -#define eCAL_pb_TLayer_confirmed_tag 3 +#define eCAL_pb_TLayer_active_tag 3 #define eCAL_pb_TLayer_par_layer_tag 5 +#define eCAL_pb_TLayer_enabled_tag 6 /* Struct field encoding specification for nanopb */ #define eCAL_pb_LayerParUdpMC_FIELDLIST(X, a) \ @@ -121,8 +126,9 @@ X(a, STATIC, OPTIONAL, MESSAGE, layer_par_tcp, 4) #define eCAL_pb_TLayer_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, UENUM, type, 1) \ X(a, STATIC, SINGULAR, INT32, version, 2) \ -X(a, STATIC, SINGULAR, BOOL, confirmed, 3) \ -X(a, STATIC, OPTIONAL, MESSAGE, par_layer, 5) +X(a, STATIC, SINGULAR, BOOL, active, 3) \ +X(a, STATIC, OPTIONAL, MESSAGE, par_layer, 5) \ +X(a, STATIC, SINGULAR, BOOL, enabled, 6) #define eCAL_pb_TLayer_CALLBACK NULL #define eCAL_pb_TLayer_DEFAULT NULL #define eCAL_pb_TLayer_par_layer_MSGTYPE eCAL_pb_ConnnectionPar diff --git a/ecal/core/src/serialization/nanopb/process.pb.h b/ecal/core/src/serialization/nanopb/process.pb.h index e3d3d4ae64..592f0b5f17 100644 --- a/ecal/core/src/serialization/nanopb/process.pb.h +++ b/ecal/core/src/serialization/nanopb/process.pb.h @@ -40,7 +40,8 @@ typedef struct _eCAL_pb_ProcessState { eCAL_pb_eProcessSeverityLevel severity_level; /* severity level */ } eCAL_pb_ProcessState; -typedef struct _eCAL_pb_Process { +typedef struct _eCAL_pb_Process { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 7 to 11; */ int32_t rclock; /* registration clock */ pb_callback_t hname; /* host name */ int32_t pid; /* process id */ diff --git a/ecal/core/src/serialization/nanopb/service.pb.h b/ecal/core/src/serialization/nanopb/service.pb.h index 753b7292f4..611f224e07 100644 --- a/ecal/core/src/serialization/nanopb/service.pb.h +++ b/ecal/core/src/serialization/nanopb/service.pb.h @@ -101,7 +101,7 @@ extern "C" { #define eCAL_pb_Response_init_default {false, eCAL_pb_ServiceHeader_init_default, {{NULL}, NULL}, 0} #define eCAL_pb_Method_init_default {{{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}} #define eCAL_pb_Service_init_default {0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, 0, 0} -#define eCAL_pb_Client_init_default {0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, 0} +#define eCAL_pb_Client_init_default {0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}} #define eCAL_pb_ServiceHeader_init_zero {{{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, _eCAL_pb_ServiceHeader_eCallState_MIN, {{NULL}, NULL}} #define eCAL_pb_Request_init_zero {false, eCAL_pb_ServiceHeader_init_zero, {{NULL}, NULL}} #define eCAL_pb_Response_init_zero {false, eCAL_pb_ServiceHeader_init_zero, {{NULL}, NULL}, 0} diff --git a/ecal/core/src/serialization/nanopb/topic.pb.h b/ecal/core/src/serialization/nanopb/topic.pb.h index 9bcc66ed79..c7eb1ffc22 100644 --- a/ecal/core/src/serialization/nanopb/topic.pb.h +++ b/ecal/core/src/serialization/nanopb/topic.pb.h @@ -17,7 +17,8 @@ typedef struct _eCAL_pb_DataTypeInformation { pb_callback_t desc; /* descriptor information of the datatype (necessary for reflection) */ } eCAL_pb_DataTypeInformation; -typedef struct _eCAL_pb_Topic { +typedef struct _eCAL_pb_Topic { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 9, 10, 11, 14, 15, 22 to 26, 29; */ int32_t rclock; /* registration clock (heart beat) */ pb_callback_t hname; /* host name */ int32_t pid; /* process id */ diff --git a/ecal/core_pb/src/ecal/core/pb/ecal.proto b/ecal/core_pb/src/ecal/core/pb/ecal.proto index 2b4605b192..b5c0044940 100644 --- a/ecal/core_pb/src/ecal/core/pb/ecal.proto +++ b/ecal/core_pb/src/ecal/core/pb/ecal.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,8 @@ package eCAL.pb; message Content // topic content { - reserved 5; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 5; int64 id = 1; // sample id int64 clock = 2; // internal used clock diff --git a/ecal/core_pb/src/ecal/core/pb/layer.proto b/ecal/core_pb/src/ecal/core/pb/layer.proto index dc45cd1b1a..db9f6d7b25 100644 --- a/ecal/core_pb/src/ecal/core/pb/layer.proto +++ b/ecal/core_pb/src/ecal/core/pb/layer.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,8 @@ message LayerParTcp message ConnnectionPar // connection parameter for reader / writer { - reserved 3; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 3; LayerParUdpMC layer_par_udpmc = 1; // parameter for ecal udp multicast LayerParShm layer_par_shm = 2; // parameter for ecal shared memory @@ -62,10 +63,12 @@ enum eTLayerType // transport layer message TLayer { - reserved 4; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 4; eTLayerType type = 1; // transport layer type int32 version = 2; // transport layer version - bool confirmed = 3; // transport layer used ? + bool enabled = 6; // transport layer enabled ? + bool active = 3; // transport layer in use ? ConnnectionPar par_layer = 5; // transport layer parameter } diff --git a/ecal/core_pb/src/ecal/core/pb/process.proto b/ecal/core_pb/src/ecal/core/pb/process.proto index 014b51229f..e59d3dae77 100644 --- a/ecal/core_pb/src/ecal/core/pb/process.proto +++ b/ecal/core_pb/src/ecal/core/pb/process.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,8 @@ enum eTSyncState // time synchronisatio message Process // process { - reserved 7 to 11; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 7 to 11; int32 rclock = 1; // registration clock string hname = 2; // host name diff --git a/ecal/core_pb/src/ecal/core/pb/topic.proto b/ecal/core_pb/src/ecal/core/pb/topic.proto index 84d657bf15..0da480cf68 100644 --- a/ecal/core_pb/src/ecal/core/pb/topic.proto +++ b/ecal/core_pb/src/ecal/core/pb/topic.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +32,8 @@ message DataTypeInformation message Topic // eCAL topic { - reserved 9, 10, 11, 14, 15, 22 to 26, 29; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 9, 10, 11, 14, 15, 22 to 26, 29; int32 rclock = 1; // registration clock (heart beat) string hname = 2; // host name diff --git a/ecal/samples/cpp/monitoring/monitoring_rec/src/monitoring_rec.cpp b/ecal/samples/cpp/monitoring/monitoring_rec/src/monitoring_rec.cpp index 30e605ea2b..b8bde04b0f 100644 --- a/ecal/samples/cpp/monitoring/monitoring_rec/src/monitoring_rec.cpp +++ b/ecal/samples/cpp/monitoring/monitoring_rec/src/monitoring_rec.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -195,18 +195,18 @@ int main(int argc, char** argv) default: break; } - std::cout << " tlayer.type : " << layer_type << std::endl; // transport layers type - std::cout << " tlayer.confirmed: " << layer.confirmed() << std::endl; // transport layers confirmation + std::cout << " tlayer.type : " << layer_type << std::endl; // transport layers type + std::cout << " tlayer.active : " << layer.active() << std::endl; // transport layers confirmation } - std::cout << "tsize : " << topic.tsize() << std::endl; // topic size + std::cout << "tsize : " << topic.tsize() << std::endl; // topic size std::cout << "connections_loc : " << topic.connections_loc() << std::endl; // number of local connected entities std::cout << "connections_ext : " << topic.connections_ext() << std::endl; // number of external connected entities - std::cout << "message_drops : " << topic.message_drops() << std::endl; // dropped messages + std::cout << "message_drops : " << topic.message_drops() << std::endl; // dropped messages - std::cout << "did : " << topic.did() << std::endl; // data send id (publisher setid) - std::cout << "dclock : " << topic.dclock() << std::endl; // data clock (send / receive action) - std::cout << "dfreq : " << topic.dfreq() << std::endl; // data frequency (send / receive samples per second * 1000) + std::cout << "did : " << topic.did() << std::endl; // data send id (publisher setid) + std::cout << "dclock : " << topic.dclock() << std::endl; // data clock (send / receive action) + std::cout << "dfreq : " << topic.dfreq() << std::endl; // data frequency (send / receive samples per second * 1000) std::cout << std::endl; } diff --git a/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp b/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp index 9bb405b705..1d42320541 100644 --- a/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp +++ b/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,8 +53,16 @@ int main(int argc, char **argv) // set process state eCAL::Process::SetState(proc_sev_healthy, proc_sev_level1, "I feel good !"); + // create a subscriber config + eCAL::Subscriber::Configuration sub_config; + + // activate transport layer + sub_config.shm.enable = true; + sub_config.udp.enable = true; + sub_config.tcp.enable = true; + // create a subscriber (topic name "person") - eCAL::protobuf::CSubscriber sub("person"); + eCAL::protobuf::CSubscriber sub("person", sub_config); // add receive callback function (_1 = topic_name, _2 = msg, _3 = time, _4 = clock, _5 = id) auto callback = std::bind(OnPerson, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); diff --git a/ecal/tests/CMakeLists.txt b/ecal/tests/CMakeLists.txt index 5e2fc66b9f..d598e1da60 100644 --- a/ecal/tests/CMakeLists.txt +++ b/ecal/tests/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ecal/tests/cpp/serialization_test/src/registration_compare.cpp b/ecal/tests/cpp/serialization_test/src/registration_compare.cpp index f4eb4f8ecc..2054b0debc 100644 --- a/ecal/tests/cpp/serialization_test/src/registration_compare.cpp +++ b/ecal/tests/cpp/serialization_test/src/registration_compare.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -148,7 +148,8 @@ namespace eCAL // compare TLayer objects for equality return (layer1.type == layer2.type) && (layer1.version == layer2.version) && - (layer1.confirmed == layer2.confirmed) && + (layer1.enabled == layer2.enabled) && + (layer1.active == layer2.active) && CompareConnectionPar(layer1.par_layer, layer2.par_layer); }); } diff --git a/ecal/tests/cpp/serialization_test/src/registration_generate.cpp b/ecal/tests/cpp/serialization_test/src/registration_generate.cpp index e29a95f6f2..b969ca02bf 100644 --- a/ecal/tests/cpp/serialization_test/src/registration_generate.cpp +++ b/ecal/tests/cpp/serialization_test/src/registration_generate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,7 +95,8 @@ namespace eCAL TLayer layer; layer.type = static_cast(rand() % (tl_all + 1)); layer.version = rand() % 100; - layer.confirmed = rand() % 2 == 1; + layer.enabled = rand() % 2 == 1; + layer.active = rand() % 2 == 1; return layer; }