Skip to content

Commit

Permalink
[core] new-pub-sub-matching (#1653)
Browse files Browse the repository at this point in the history
Co-authored-by: Rex Schilasky <[email protected]>
  • Loading branch information
KerstinKeller and rex-schilasky committed Jul 17, 2024
1 parent 73f5f3c commit 4659192
Show file tree
Hide file tree
Showing 41 changed files with 859 additions and 612 deletions.
4 changes: 2 additions & 2 deletions app/mon/mon_gui/src/widgets/models/topic_tree_item.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -209,7 +209,7 @@ QVariant TopicTreeItem::data(Columns column, Qt::ItemDataRole role) const
for (const auto& layer : layer_pb)
{
QString this_layer_string;
if (layer.confirmed())
if (layer.active())
{
switch (layer.type())
{
Expand Down
4 changes: 2 additions & 2 deletions app/mon/mon_tui/src/model/monitor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -204,7 +204,7 @@ class MonitorModel
topic.type_descriptor = std::move(*t.mutable_tdatatype()->mutable_desc());
for(auto &tl: t.tlayer())
{
if (tl.confirmed())
if (tl.active())
{
topic.transport_layers.emplace_back(TopicTransportLayer(tl.type()));
}
Expand Down
16 changes: 11 additions & 5 deletions ecal/core/include/ecal/config/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@
#pragma once

#include <ecal/ecal_os.h>
#include <ecal/ecal_tlayer.h>
#include <ecal/types/ecal_custom_data_types.h>

#include <cstddef>
#include <vector>

namespace eCAL
{
Expand Down Expand Up @@ -136,12 +138,16 @@ namespace eCAL
{
ECAL_API Configuration();

SHM::Configuration shm;
UDP::Configuration udp;
TCP::Configuration tcp;
SHM::Configuration shm;
UDP::Configuration udp;
TCP::Configuration tcp;

bool share_topic_type; //!< share topic type via registration
bool share_topic_description; //!< share topic description via registration
using LayerPriorityVector = std::vector<TLayer::eTransportLayer>;
LayerPriorityVector layer_priority_local = { TLayer::tlayer_shm, TLayer::tlayer_udp_mc, TLayer::tlayer_tcp };
LayerPriorityVector layer_priority_remote = { TLayer::tlayer_udp_mc, TLayer::tlayer_tcp };

bool share_topic_type; //!< share topic type via registration
bool share_topic_description; //!< share topic description via registration
};
}
}
4 changes: 2 additions & 2 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@

#pragma once

#include <cstddef>
#include <ecal/ecal_callback.h>
#include <ecal/ecal_deprecate.h>
#include <ecal/ecal_os.h>
#include <ecal/ecal_payload_writer.h>
#include <ecal/config/publisher.h>
#include <ecal/ecal_config.h>
#include <ecal/ecal_types.h>
#include <ecal/config/publisher.h>

#include <chrono>
#include <cstddef>
#include <memory>
#include <string>

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ namespace eCAL
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param data_type_info_ Topic data type information (encoding, type, descriptor).
* @param config_ Optional configuration parameters.
**/
ECAL_API explicit CSubscriber(const std::string& topic_name_, const Subscriber::Configuration& config_ = {});

Expand Down
8 changes: 3 additions & 5 deletions ecal/core/include/ecal/msg/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,13 @@ namespace eCAL
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param config_ Optional configuration parameters.
**/
CMessageSubscriber(const std::string& topic_name_) : CSubscriber()
CMessageSubscriber(const std::string& topic_name_, const Subscriber::Configuration& config_ = {}) : CSubscriber()
, m_deserializer()
{
SDataTypeInformation topic_info = m_deserializer.GetDataTypeInformation();
CSubscriber::Create(topic_name_, topic_info);
CSubscriber::Create(topic_name_, topic_info, config_);
}

~CMessageSubscriber() noexcept
Expand Down Expand Up @@ -418,7 +419,4 @@ namespace eCAL
MsgReceiveCallbackT m_cb_callback;
Deserializer m_deserializer;
};



}
8 changes: 4 additions & 4 deletions ecal/core/include/ecal/types/monitoring.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,9 +64,9 @@ namespace eCAL

struct TLayer
{
eTLayerType type = tl_none; //<! transport layer type
int32_t version = 0; //<! transport layer version
bool confirmed = false; //<! transport layer used?
eTLayerType type = tl_none; //<! transport layer type
int32_t version = 0; //<! transport layer version
bool active = false; //<! transport layer used?
};

struct STopicMon //<! eCAL Topic struct
Expand Down
4 changes: 3 additions & 1 deletion ecal/core/src/ecal_global_accessors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
* @brief eCAL core functions
**/

#include "ecal/config/configuration.h"

#include "ecal_global_accessors.h"
#include "ecal_def.h"
#include "ecal_globals.h"
#include "ecal/config/configuration.h"

#include <atomic>
#include <string>

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/ecal_process.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
18 changes: 9 additions & 9 deletions ecal/core/src/monitoring/ecal_monitoring_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ namespace eCAL
bool topic_tlayer_ecal_tcp(false);
for (const auto& layer : sample_topic.tlayer)
{
topic_tlayer_ecal_udp |= (layer.type == tl_ecal_udp) && layer.confirmed;
topic_tlayer_ecal_shm |= (layer.type == tl_ecal_shm) && layer.confirmed;
topic_tlayer_ecal_tcp |= (layer.type == tl_ecal_tcp) && layer.confirmed;
topic_tlayer_ecal_udp |= (layer.type == tl_ecal_udp) && layer.active;
topic_tlayer_ecal_shm |= (layer.type == tl_ecal_shm) && layer.active;
topic_tlayer_ecal_tcp |= (layer.type == tl_ecal_tcp) && layer.active;
}
const int32_t connections_loc = sample_topic.connections_loc;
const int32_t connections_ext = sample_topic.connections_ext;
Expand Down Expand Up @@ -305,22 +305,22 @@ namespace eCAL
// tlayer udp_mc
{
eCAL::Monitoring::TLayer tlayer;
tlayer.type = eCAL::Monitoring::tl_ecal_udp_mc;
tlayer.confirmed = topic_tlayer_ecal_udp;
tlayer.type = eCAL::Monitoring::tl_ecal_udp_mc;
tlayer.active = topic_tlayer_ecal_udp;
TopicInfo.tlayer.push_back(tlayer);
}
// tlayer shm
{
eCAL::Monitoring::TLayer tlayer;
tlayer.type = eCAL::Monitoring::tl_ecal_shm;
tlayer.confirmed = topic_tlayer_ecal_shm;
tlayer.type = eCAL::Monitoring::tl_ecal_shm;
tlayer.active = topic_tlayer_ecal_shm;
TopicInfo.tlayer.push_back(tlayer);
}
// tlayer tcp
{
eCAL::Monitoring::TLayer tlayer;
tlayer.type = eCAL::Monitoring::tl_ecal_tcp;
tlayer.confirmed = topic_tlayer_ecal_tcp;
tlayer.type = eCAL::Monitoring::tl_ecal_tcp;
tlayer.active = topic_tlayer_ecal_tcp;
TopicInfo.tlayer.push_back(tlayer);
}

Expand Down
8 changes: 4 additions & 4 deletions ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,18 @@ namespace eCAL
CDataWriter::SLayerStates layer_states;
for (const auto& layer : ecal_topic.tlayer)
{
if (layer.confirmed)
if (layer.enabled)
{
switch (layer.type)
{
case TLayer::tlayer_udp_mc:
layer_states.udp = true;
layer_states.udp.read_enabled = true;
break;
case TLayer::tlayer_shm:
layer_states.shm = true;
layer_states.shm.read_enabled = true;
break;
case TLayer::tlayer_tcp:
layer_states.tcp = true;
layer_states.tcp.read_enabled = true;
break;
default:
break;
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_publisher.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
12 changes: 6 additions & 6 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ namespace eCAL
const auto& ecal_sample_content = ecal_sample.content;
for (const auto& reader : readers_to_apply)
{
applied_size = reader->AddSample(
applied_size = reader->ApplySample(
ecal_sample.topic.tid,
payload_addr,
payload_size,
Expand Down Expand Up @@ -207,7 +207,7 @@ namespace eCAL

for (const auto& reader : readers_to_apply)
{
applied_size = reader->AddSample(topic_id_, buf_, len_, id_, clock_, time_, hash_, layer_);
applied_size = reader->ApplySample(topic_id_, buf_, len_, id_, clock_, time_, hash_, layer_);
}

return (applied_size > 0);
Expand All @@ -232,18 +232,18 @@ namespace eCAL
CDataReader::SLayerStates layer_states;
for (const auto& layer : ecal_topic.tlayer)
{
if (layer.confirmed)
if (layer.enabled)
{
switch (layer.type)
{
case TLayer::tlayer_udp_mc:
layer_states.udp = true;
layer_states.udp.write_enabled = true;
break;
case TLayer::tlayer_shm:
layer_states.shm = true;
layer_states.shm.write_enabled = true;
break;
case TLayer::tlayer_tcp:
layer_states.tcp = true;
layer_states.tcp.write_enabled = true;
break;
default:
break;
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ namespace eCAL
if (topic_name_.empty()) return(false);

// create datareader
m_datareader = std::make_shared<CDataReader>(topic_name_, data_type_info_);
m_datareader = std::make_shared<CDataReader>(topic_name_, data_type_info_, config_);

// register datareader
g_subgate()->Register(topic_name_, m_datareader);
Expand Down Expand Up @@ -143,7 +143,7 @@ namespace eCAL
bool CSubscriber::ReceiveBuffer(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ /* = 0 */) const
{
if (!m_created) return(false);
return(m_datareader->Receive(buf_, time_, rcv_timeout_));
return(m_datareader->Read(buf_, time_, rcv_timeout_));
}

bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_)
Expand Down
Loading

0 comments on commit 4659192

Please sign in to comment.