Skip to content

Commit

Permalink
Methods to retrieve matched counts on pub/sub (#234)
Browse files Browse the repository at this point in the history
* Methods to retrieve matched counts on pub/sub.

* Address reviewer feedback.

* Fix missing publisher.

* Fix some potential leaks.

* Replace with CHECK_ARGUMENT_FOR_NULL

* Apply suggestions from code review

Co-Authored-By: mjcarroll <[email protected]>

* Address reviewer feedback.
  • Loading branch information
mjcarroll authored Nov 28, 2018
1 parent dddbb04 commit 6c99127
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 14 deletions.
32 changes: 29 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,24 @@ rmw_create_publisher(
Domain::getDefaultPublisherAttributes(publisherParam);

// TODO(karsten1987) Verify consequences for std::unique_ptr?
info = new CustomPublisherInfo();
info = new (std::nothrow) CustomPublisherInfo();
if (!info) {
RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo");
return nullptr;
}

info->typesupport_identifier_ = type_support->typesupport_identifier;

auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
std::string type_name = _create_type_name(callbacks, "msg");
if (!Domain::getRegisteredType(participant, type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->type_support_)))
{
info->type_support_ = new MessageTypeSupport_cpp(callbacks);
info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
if (!info->type_support_) {
RMW_SET_ERROR_MSG("Failed to allocate MessageTypeSupport");
goto fail;
}
_register_type(participant, info->type_support_);
}

Expand Down Expand Up @@ -128,8 +137,13 @@ rmw_create_publisher(
goto fail;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, nullptr);
info->listener_ = new (std::nothrow) PubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener");
goto fail;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
Expand Down Expand Up @@ -171,6 +185,9 @@ rmw_create_publisher(
if (info->type_support_ != nullptr) {
delete info->type_support_;
}
if (info->listener_ != nullptr) {
delete info->listener_;
}
delete info;
}

Expand All @@ -181,6 +198,15 @@ rmw_create_publisher(
return nullptr;
}

rmw_ret_t
rmw_publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_count_matched_subscriptions(
publisher, subscription_count);
}

rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
Expand Down
33 changes: 29 additions & 4 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,24 @@ rmw_create_subscription(
// Load default XML profile.
Domain::getDefaultSubscriberAttributes(subscriberParam);

info = new CustomSubscriberInfo();
info = new (std::nothrow) CustomSubscriberInfo();
if (!info) {
RMW_SET_ERROR_MSG("failed to allocate CustomSubscriberInfo");
return nullptr;
}

info->typesupport_identifier_ = type_support->typesupport_identifier;

auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
std::string type_name = _create_type_name(callbacks, "msg");
if (!Domain::getRegisteredType(participant, type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->type_support_)))
{
info->type_support_ = new MessageTypeSupport_cpp(callbacks);
info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
if (!info->type_support_) {
RMW_SET_ERROR_MSG("failed to allocate MessageTypeSupport_cpp");
goto fail;
}
_register_type(participant, info->type_support_);
}

Expand All @@ -122,9 +131,13 @@ rmw_create_subscription(
goto fail;
}

info->listener_ = new SubListener(info);
info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
info->listener_ = new (std::nothrow) SubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener");
goto fail;
}

info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
if (!info->subscriber_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
goto fail;
Expand Down Expand Up @@ -154,6 +167,9 @@ rmw_create_subscription(
if (info->type_support_ != nullptr) {
delete info->type_support_;
}
if (info->listener_ != nullptr) {
delete info->listener_;
}
delete info;
}

Expand All @@ -164,6 +180,15 @@ rmw_create_subscription(
return nullptr;
}

rmw_ret_t
rmw_subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_count_matched_publishers(
subscription, publisher_count);
}

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
Expand Down
25 changes: 23 additions & 2 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ rmw_create_publisher(
Domain::getDefaultPublisherAttributes(publisherParam);

// TODO(karsten1987) Verify consequences for std::unique_ptr?
info = new CustomPublisherInfo();
info = new (std::nothrow) CustomPublisherInfo();
if (!info) {
RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo");
return nullptr;
}
info->typesupport_identifier_ = type_support->typesupport_identifier;

std::string type_name = _create_type_name(
Expand Down Expand Up @@ -128,8 +132,13 @@ rmw_create_publisher(
goto fail;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, nullptr);
info->listener_ = new (std::nothrow) PubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener");
goto fail;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
Expand Down Expand Up @@ -171,6 +180,9 @@ rmw_create_publisher(
if (info->type_support_ != nullptr) {
delete info->type_support_;
}
if (info->listener_ != nullptr) {
delete info->listener_;
}
delete info;
}

Expand All @@ -181,6 +193,15 @@ rmw_create_publisher(
return nullptr;
}

rmw_ret_t
rmw_publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_count_matched_subscriptions(
publisher, subscription_count);
}

rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
Expand Down
3 changes: 2 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ rmw_serialize(
}
}

eprosima::fastcdr::FastBuffer buffer(reinterpret_cast<char *>(serialized_message->buffer),
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer),
data_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
Expand Down
26 changes: 23 additions & 3 deletions rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ rmw_create_subscription(
// Load default XML profile.
Domain::getDefaultSubscriberAttributes(subscriberParam);

info = new CustomSubscriberInfo();
info = new (std::nothrow) CustomSubscriberInfo();
if (!info) {
RMW_SET_ERROR_MSG("failed to allocate CustomSubscriberInfo");
return nullptr;
}
info->typesupport_identifier_ = type_support->typesupport_identifier;

std::string type_name = _create_type_name(
Expand Down Expand Up @@ -123,9 +127,13 @@ rmw_create_subscription(
goto fail;
}

info->listener_ = new SubListener(info);
info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
info->listener_ = new (std::nothrow) SubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener");
goto fail;
}

info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
if (!info->subscriber_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
goto fail;
Expand Down Expand Up @@ -155,6 +163,9 @@ rmw_create_subscription(
if (info->type_support_ != nullptr) {
delete info->type_support_;
}
if (info->listener_ != nullptr) {
delete info->listener_;
}
delete info;
}

Expand All @@ -165,6 +176,15 @@ rmw_create_subscription(
return nullptr;
}

rmw_ret_t
rmw_subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_count_matched_publishers(
subscription, publisher_count);
}

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,57 @@
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_

#include <mutex>
#include <set>

#include "fastrtps/publisher/Publisher.h"
#include "fastrtps/publisher/PublisherListener.h"

#include "rmw/rmw.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class PubListener;

typedef struct CustomPublisherInfo
{
eprosima::fastrtps::Publisher * publisher_;
PubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
rmw_gid_t publisher_gid;
const char * typesupport_identifier_;
} CustomPublisherInfo;

class PubListener : public eprosima::fastrtps::PublisherListener
{
public:
explicit PubListener(CustomPublisherInfo * info)
{
(void) info;
}

void
onPublicationMatched(
eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info)
{
(void) pub;
std::lock_guard<std::mutex> lock(internalMutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
subscriptions_.insert(info.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == info.status) {
subscriptions_.erase(info.remoteEndpointGuid);
}
}

size_t subscriptionCount()
{
std::lock_guard<std::mutex> lock(internalMutex_);
return subscriptions_.size();
}

private:
std::mutex internalMutex_;
std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <set>
#include <utility>

#include "fastrtps/subscriber/Subscriber.h"
Expand Down Expand Up @@ -51,7 +52,13 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info)
{
(void)sub;
(void)info;

std::lock_guard<std::mutex> lock(internalMutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
publishers_.insert(info.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == info.status) {
publishers_.erase(info.remoteEndpointGuid);
}
}

void
Expand Down Expand Up @@ -107,11 +114,19 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}
}

size_t publisherCount()
{
std::lock_guard<std::mutex> lock(internalMutex_);
return publishers_.size();
}

private:
std::mutex internalMutex_;
std::atomic_size_t data_;
std::mutex * conditionMutex_;
std::condition_variable * conditionVariable_;

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ __rmw_destroy_publisher(
rmw_node_t * node,
rmw_publisher_t * publisher);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_send_request(
Expand Down Expand Up @@ -193,6 +199,12 @@ __rmw_destroy_subscription(
rmw_node_t * node,
rmw_subscription_t * subscription);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_take(
Expand Down
Loading

0 comments on commit 6c99127

Please sign in to comment.