Skip to content

Commit

Permalink
[core] refactor registration receiver and file structure (eclipse-eca…
Browse files Browse the repository at this point in the history
  • Loading branch information
KerstinKeller committed Jul 17, 2024
1 parent 15d8354 commit 73f5f3c
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 101 deletions.
14 changes: 8 additions & 6 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,17 @@ if (ECAL_CORE_REGISTRATION)
src/registration/ecal_registration_receiver.cpp
src/registration/ecal_registration_receiver.h
src/registration/ecal_registration_sender.h
src/registration/ecal_registration_sender_udp.cpp
src/registration/ecal_registration_sender_udp.h
src/registration/udp/ecal_registration_receiver_udp.cpp
src/registration/udp/ecal_registration_receiver_udp.h
src/registration/udp/ecal_registration_sender_udp.cpp
src/registration/udp/ecal_registration_sender_udp.h
)
if(ECAL_CORE_REGISTRATION_SHM)
list(APPEND ecal_registration_src
src/registration/ecal_registration_receiver_shm.cpp
src/registration/ecal_registration_receiver_shm.h
src/registration/ecal_registration_sender_shm.cpp
src/registration/ecal_registration_sender_shm.h
src/registration/shm/ecal_registration_receiver_shm.cpp
src/registration/shm/ecal_registration_receiver_shm.h
src/registration/shm/ecal_registration_sender_shm.cpp
src/registration/shm/ecal_registration_sender_shm.h
src/registration/shm/ecal_memfile_broadcast.cpp
src/registration/shm/ecal_memfile_broadcast.h
src/registration/shm/ecal_memfile_broadcast_reader.cpp
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/registration/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
#include "ecal_def.h"

#include <registration/ecal_process_registration.h>
#include <registration/ecal_registration_sender_udp.h>
#include <registration/udp/ecal_registration_sender_udp.h>
#if ECAL_CORE_REGISTRATION_SHM
#include <registration/ecal_registration_sender_shm.h>
#include <registration/shm/ecal_registration_sender_shm.h>
#endif

namespace eCAL
Expand Down
43 changes: 10 additions & 33 deletions ecal/core/src/registration/ecal_registration_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
*
**/

#include "ecal_registration_receiver.h"
#include "registration/ecal_registration_receiver.h"

#include "registration/udp/ecal_registration_receiver_udp.h"
#if ECAL_CORE_REGISTRATION_SHM
#include "registration/shm/ecal_registration_receiver_shm.h"
#endif
#include "ecal_global_accessors.h"

#include "pubsub/ecal_subgate.h"
Expand Down Expand Up @@ -75,26 +80,13 @@ namespace eCAL

if (m_use_registration_udp)
{
// set network attributes
eCAL::UDP::SReceiverAttr attr;
attr.address = UDP::GetRegistrationAddress();
attr.port = UDP::GetRegistrationPort();
attr.broadcast = UDP::IsBroadcast();
attr.loopback = true;
attr.rcvbuf = Config::GetUdpMulticastRcvBufSizeBytes();

// start registration sample receiver
m_registration_receiver = std::make_shared<UDP::CSampleReceiver>(attr, std::bind(&CRegistrationReceiver::HasSample, this, std::placeholders::_1), std::bind(&CRegistrationReceiver::ApplySerializedSample, this, std::placeholders::_1, std::placeholders::_2));
m_registration_receiver_udp = std::make_unique<CRegistrationReceiverUDP>([this](const Registration::Sample& sample_) {return this->ApplySample(sample_); });
}

#if ECAL_CORE_REGISTRATION_SHM
if (m_use_registration_shm)
{
m_memfile_broadcast.Create(Config::Experimental::GetShmMonitoringDomain(), Config::Experimental::GetShmMonitoringQueueSize());
m_memfile_broadcast.FlushLocalEventQueue();
m_memfile_broadcast_reader.Bind(&m_memfile_broadcast);

m_memfile_reg_rcv.Create(&m_memfile_broadcast_reader);
m_registration_receiver_shm = std::make_unique<CRegistrationReceiverSHM>([this](const Registration::Sample& sample_) {return this->ApplySample(sample_); });
}
#endif

Expand All @@ -105,21 +97,16 @@ namespace eCAL
{
if(!m_created) return;

// stop network registration receive thread
m_registration_receiver = nullptr;

// stop network registration receive thread
if (m_use_registration_udp)
{
m_registration_receiver = nullptr;
m_registration_receiver_udp = nullptr;
}

#if ECAL_CORE_REGISTRATION_SHM
if (m_use_registration_shm)
{
// stop memfile registration receive thread and unbind reader
m_memfile_broadcast_reader.Unbind();
m_memfile_broadcast.Destroy();
m_registration_receiver_shm = nullptr;
}
#endif

Expand All @@ -139,16 +126,6 @@ namespace eCAL
m_loopback = state_;
}

bool CRegistrationReceiver::ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_)
{
if(!m_created) return false;

Registration::Sample sample;
if (!DeserializeFromBuffer(serialized_sample_data_, serialized_sample_size_, sample)) return false;

return ApplySample(sample);
}

bool CRegistrationReceiver::ApplySample(const Registration::Sample& sample_)
{
if (!m_created) return false;
Expand Down
19 changes: 5 additions & 14 deletions ecal/core/src/registration/ecal_registration_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@
#include <cstddef>
#include <ecal/ecal.h>

#include "io/udp/ecal_udp_sample_receiver.h"
#include "serialization/ecal_struct_sample_registration.h"

#if ECAL_CORE_REGISTRATION_SHM
#include "ecal_registration_receiver_shm.h"
#endif

#include <atomic>
#include <functional>
#include <map>
Expand All @@ -47,6 +42,9 @@

namespace eCAL
{
class CRegistrationReceiverUDP;
class CRegistrationReceiverSHM;

class CRegistrationReceiver
{
public:
Expand All @@ -58,7 +56,6 @@ namespace eCAL

void EnableLoopback(bool state_);

bool HasSample(const std::string& /*sample_name_*/) { return(true); };
bool ApplySample(const Registration::Sample& sample_);

bool AddRegistrationCallback(enum eCAL_Registration_Event event_, const RegistrationCallbackT& callback_);
Expand All @@ -69,8 +66,6 @@ namespace eCAL
void RemCustomApplySampleCallback(const std::string& customer_);

protected:
bool ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_);

void ApplySubscriberRegistration(const eCAL::Registration::Sample& sample_);
void ApplyPublisherRegistration(const eCAL::Registration::Sample& sample_);

Expand All @@ -85,14 +80,10 @@ namespace eCAL
RegistrationCallbackT m_callback_service;
RegistrationCallbackT m_callback_client;
RegistrationCallbackT m_callback_process;

std::shared_ptr<UDP::CSampleReceiver> m_registration_receiver;

std::unique_ptr<CRegistrationReceiverUDP> m_registration_receiver_udp;
#if ECAL_CORE_REGISTRATION_SHM
CMemoryFileBroadcast m_memfile_broadcast;
CMemoryFileBroadcastReader m_memfile_broadcast_reader;

CMemfileRegistrationReceiver m_memfile_reg_rcv;
std::unique_ptr<CRegistrationReceiverSHM> m_registration_receiver_shm;
#endif

bool m_use_registration_udp;
Expand Down
33 changes: 33 additions & 0 deletions ecal/core/src/registration/ecal_registration_types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

#pragma once

#include <functional>
#include <serialization/ecal_struct_sample_registration.h>

namespace eCAL {
/**
* @brief Apply sample callback type.
*
* @param sample_ The sample protocol buffer registration payload buffer.
* @param sample_size_ The payload buffer size.
**/
using RegistrationApplySampleCallbackT = std::function<bool(const Registration::Sample&)>;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -27,47 +27,48 @@

#include "ecal_globals.h"

#include "ecal_registration_receiver_shm.h"
#include "registration/shm/ecal_registration_receiver_shm.h"
#include "serialization/ecal_serialize_sample_registration.h"
#include <chrono>
#include <functional>
#include <memory>

#include "registration/shm/ecal_memfile_broadcast.h"
#include "registration/shm/ecal_memfile_broadcast_reader.h"
#include "util/ecal_thread.h"

namespace eCAL
{
//////////////////////////////////////////////////////////////////
// CMemfileRegistrationReceiver
//////////////////////////////////////////////////////////////////

CMemfileRegistrationReceiver::~CMemfileRegistrationReceiver()
{
Destroy();
}

void CMemfileRegistrationReceiver::Create(eCAL::CMemoryFileBroadcastReader* memfile_broadcast_reader_)
CRegistrationReceiverSHM::CRegistrationReceiverSHM(RegistrationApplySampleCallbackT apply_sample_callback)
: m_apply_sample_callback(apply_sample_callback)
{
if (m_created) return;
m_memfile_broadcast = std::make_unique<CMemoryFileBroadcast>();
m_memfile_broadcast->Create(Config::Experimental::GetShmMonitoringDomain(), Config::Experimental::GetShmMonitoringQueueSize());
m_memfile_broadcast->FlushLocalEventQueue();

// start memfile broadcast receive thread
m_memfile_broadcast_reader = memfile_broadcast_reader_;
m_memfile_broadcast_reader_thread = std::make_shared<CCallbackThread>(std::bind(&CMemfileRegistrationReceiver::Receive, this));
m_memfile_broadcast_reader_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()/2));
m_memfile_broadcast_reader = std::make_unique<CMemoryFileBroadcastReader>();
// This is a bit unclean to take the raw adress of the reader here.
m_memfile_broadcast_reader->Bind(m_memfile_broadcast.get());

m_created = true;
m_memfile_broadcast_reader_thread = std::make_unique<CCallbackThread>(std::bind(&CRegistrationReceiverSHM::Receive, this));
m_memfile_broadcast_reader_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs() / 2));
}

void CMemfileRegistrationReceiver::Destroy()
CRegistrationReceiverSHM::~CRegistrationReceiverSHM()
{
if (!m_created) return;

// stop memfile broadcast receive thread
m_memfile_broadcast_reader_thread->stop();
m_memfile_broadcast_reader_thread = nullptr;

// stop memfile registration receive thread and unbind reader
m_memfile_broadcast_reader->Unbind();
m_memfile_broadcast_reader = nullptr;

m_created = false;
m_memfile_broadcast->Destroy();
m_memfile_broadcast = nullptr;
}

void CMemfileRegistrationReceiver::Receive()
void CRegistrationReceiverSHM::Receive()
{
MemfileBroadcastMessageListT message_list;
if (m_memfile_broadcast_reader->Read(message_list, 0))
Expand All @@ -79,7 +80,7 @@ namespace eCAL
{
for (const auto& sample : sample_list.samples)
{
if (g_registration_receiver() != nullptr) g_registration_receiver()->ApplySample(sample);
m_apply_sample_callback(sample);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -27,38 +27,37 @@

#pragma once

#include "shm/ecal_memfile_broadcast.h"
#include "shm/ecal_memfile_broadcast_reader.h"

#include "util/ecal_thread.h"
#include <memory>
#include <registration/ecal_registration_types.h>

namespace eCAL
{
class CMemfileRegistrationReceiver
class CCallbackThread;
class CMemoryFileBroadcast;
class CMemoryFileBroadcastReader;

class CRegistrationReceiverSHM
{
public:
CMemfileRegistrationReceiver() = default;
~CMemfileRegistrationReceiver();
CRegistrationReceiverSHM(RegistrationApplySampleCallbackT apply_sample_callback);
~CRegistrationReceiverSHM();

// default copy constructor
CMemfileRegistrationReceiver(const CMemfileRegistrationReceiver& other) = delete;
CRegistrationReceiverSHM(const CRegistrationReceiverSHM& other) = delete;
// default copy assignment operator
CMemfileRegistrationReceiver& operator=(const CMemfileRegistrationReceiver& other) = delete;
CRegistrationReceiverSHM& operator=(const CRegistrationReceiverSHM& other) = delete;
// default move constructor
CMemfileRegistrationReceiver(CMemfileRegistrationReceiver&& other) noexcept = delete;
CRegistrationReceiverSHM(CRegistrationReceiverSHM&& other) noexcept = delete;
// default move assignment operator
CMemfileRegistrationReceiver& operator=(CMemfileRegistrationReceiver&& other) noexcept = delete;

void Create(CMemoryFileBroadcastReader* memfile_broadcast_reader_);
void Destroy();
CRegistrationReceiverSHM& operator=(CRegistrationReceiverSHM&& other) noexcept = delete;

private:
void Receive();

CMemoryFileBroadcastReader* m_memfile_broadcast_reader = nullptr;
std::shared_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;
std::unique_ptr<CMemoryFileBroadcast> m_memfile_broadcast;
std::unique_ptr<CMemoryFileBroadcastReader> m_memfile_broadcast_reader;
std::unique_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;

bool m_created = false;
RegistrationApplySampleCallbackT m_apply_sample_callback;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*
**/

#include "registration/ecal_registration_sender_shm.h"
#include "registration/shm/ecal_registration_sender_shm.h"
#include "serialization/ecal_serialize_sample_registration.h"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@

#include "registration/ecal_registration_sender.h"

#include "shm/ecal_memfile_broadcast.h"
#include "shm/ecal_memfile_broadcast_writer.h"
#include "registration/shm/ecal_memfile_broadcast.h"
#include "registration/shm/ecal_memfile_broadcast_writer.h"

namespace eCAL
{
Expand Down
Loading

0 comments on commit 73f5f3c

Please sign in to comment.