Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loan messages implementation #523

Merged
merged 16 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

#include "type_support_common.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;

rmw_publisher_t *
rmw_fastrtps_cpp::create_publisher(
const CustomParticipantInfo * participant_info,
Expand Down Expand Up @@ -248,6 +250,8 @@ rmw_fastrtps_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -291,7 +295,8 @@ rmw_fastrtps_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -359,6 +361,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}
} // extern "C"
16 changes: 4 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

rmw_ret_t
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -362,6 +364,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
30 changes: 9 additions & 21 deletions rmw_fastrtps_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ rmw_take_loaned_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr);
}

rmw_ret_t
Expand All @@ -108,27 +104,19 @@ rmw_take_loaned_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) message_info;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message_with_info not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info);
}

rmw_ret_t
rmw_return_loaned_message_from_subscription(
const rmw_subscription_t * subscription,
void * loaned_message)
{
(void) subscription;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_subscription not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription(
eprosima_fastrtps_identifier, subscription, loaned_message);
}

rmw_ret_t
Expand Down
5 changes: 4 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp"
#include "rmw_fastrtps_shared_cpp/qos.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "rmw_fastrtps_shared_cpp/subscription.hpp"
#include "rmw_fastrtps_shared_cpp/utils.hpp"

#include "rmw_fastrtps_cpp/identifier.hpp"
Expand Down Expand Up @@ -242,6 +243,8 @@ create_subscription(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -324,7 +327,7 @@ create_subscription(
return nullptr;
}
rmw_subscription->options = *subscription_options;
rmw_subscription->can_loan_messages = false;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
9 changes: 8 additions & 1 deletion rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@ TypeSupport::TypeSupport()
{
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
}

void TypeSupport::set_members(const message_type_support_callbacks_t * members)
{
members_ = members;

// Fully bound by default
#ifdef ROSIDL_TYPESUPPORT_FASTRTPS_HAS_PLAIN_TYPES
char bounds_info;
auto data_size = static_cast<uint32_t>(members->max_serialized_size(bounds_info));
max_size_bound_ = 0 != (bounds_info & ROSIDL_TYPESUPPORT_FASTRTPS_BOUNDED_TYPE);
is_plain_ = bounds_info == ROSIDL_TYPESUPPORT_FASTRTPS_PLAIN_TYPE;
#else
max_size_bound_ = true;
auto data_size = static_cast<uint32_t>(members->max_serialized_size(max_size_bound_));
#endif

// A fully bound message of size 0 is an empty message
if (max_size_bound_ && (data_size == 0) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
ss << "dds_::" << message_name << "_";
this->setName(ss.str().c_str());

// Fully bound by default
// Fully bound and plain by default
this->max_size_bound_ = true;
this->is_plain_ = true;
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
ss << "dds_::" << service_name << "_Request_";
this->setName(ss.str().c_str());

// Fully bound by default
// Fully bound and plain by default
this->max_size_bound_ = true;
this->is_plain_ = true;
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
Expand Down Expand Up @@ -79,8 +80,9 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
ss << "dds_::" << service_name << "_Response_";
this->setName(ss.str().c_str());

// Fully bound by default
// Fully bound and plain by default
this->max_size_bound_ = true;
this->is_plain_ = true;
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TypeSupport<MembersType>::TypeSupport(const void * ros_type_support)
{
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
}

// C++ specialization
Expand Down Expand Up @@ -829,9 +830,15 @@ size_t TypeSupport<MembersType>::calculateMaxSerializedSize(
size_t array_size = 1;
if (member->is_array_) {
array_size = member->array_size_;

// Whether it is unbounded.
if (0u == array_size) {
this->max_size_bound_ = false;
}

// Whether it is a sequence.
if (0 == array_size || member->is_upper_bound_) {
this->max_size_bound_ = false;
this->is_plain_ = false;
current_alignment += padding +
eprosima::fastcdr::Cdr::alignment(current_alignment, padding);
}
Expand Down Expand Up @@ -866,6 +873,7 @@ size_t TypeSupport<MembersType>::calculateMaxSerializedSize(
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING:
{
this->max_size_bound_ = false;
this->is_plain_ = false;
size_t character_size =
(member->type_id_ == rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING) ? 4 : 1;
for (size_t index = 0; index < array_size; ++index) {
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "type_support_common.hpp"
#include "type_support_registry.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;
using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy;

rmw_publisher_t *
Expand Down Expand Up @@ -263,6 +264,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -306,7 +309,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -390,6 +392,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,8 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}

rmw_ret_t
Expand Down
16 changes: 4 additions & 12 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -393,6 +395,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
Loading