From 723c5041c1532611fad95a93b3d01ddbdc758e64 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 17 Mar 2021 08:16:55 +0100 Subject: [PATCH 01/16] Added is_plain_ attribute to base TypeSupport. Signed-off-by: Miguel Company --- .../include/rmw_fastrtps_shared_cpp/TypeSupport.hpp | 1 + rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index b31469318..15df939e4 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -85,6 +85,7 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType TypeSupport(); bool max_size_bound_; + bool is_plain_; }; } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp index 1baf926fb..79d756b5f 100644 --- a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp +++ b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp @@ -30,6 +30,7 @@ TypeSupport::TypeSupport() { m_isGetKeyDefined = false; max_size_bound_ = false; + is_plain_ = false; } void TypeSupport::deleteData(void * data) From 81777e72f098cdd68880c77a86c823ec5ee2e7e2 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 17 Mar 2021 09:25:50 +0100 Subject: [PATCH 02/16] Added new methods to base TypeSupport. Signed-off-by: Miguel Company --- .../rmw_fastrtps_shared_cpp/TypeSupport.hpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index 15df939e4..72c905a88 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -77,6 +77,24 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType RMW_FASTRTPS_SHARED_CPP_PUBLIC void deleteData(void * data) override; + RMW_FASTRTPS_SHARED_CPP_PUBLIC + inline bool is_bounded() const +#ifdef TOPIC_DATA_TYPE_API_HAS_IS_BOUNDED + override +#endif + { + return max_size_bound_; + } + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + inline bool is_plain() const +#ifdef TOPIC_DATA_TYPE_API_HAS_IS_PLAIN + override +#endif + { + return is_plain_; + } + RMW_FASTRTPS_SHARED_CPP_PUBLIC virtual ~TypeSupport() {} From 89ccf306400224bd300507f0a2449ff42f4c02b5 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 17 Mar 2021 09:43:16 +0100 Subject: [PATCH 03/16] Updates on rmw_fastrtps_cpp. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/type_support_common.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rmw_fastrtps_cpp/src/type_support_common.cpp b/rmw_fastrtps_cpp/src/type_support_common.cpp index df8cfa101..ef5567641 100644 --- a/rmw_fastrtps_cpp/src/type_support_common.cpp +++ b/rmw_fastrtps_cpp/src/type_support_common.cpp @@ -27,15 +27,22 @@ TypeSupport::TypeSupport() { m_isGetKeyDefined = false; max_size_bound_ = false; + is_plain_ = false; } void TypeSupport::set_members(const message_type_support_callbacks_t * members) { members_ = members; - // Fully bound by default +#ifdef ROSIDL_TYPESUPPORT_FASTRTPS_HAS_PLAIN_TYPES + char bounds_info; + auto data_size = static_cast(members->max_serialized_size(bounds_info)); + max_size_bound_ = 0 != (bounds_info & ROSIDL_TYPESUPPORT_FASTRTPS_BOUNDED_TYPE); + is_plain_ = bounds_info == ROSIDL_TYPESUPPORT_FASTRTPS_PLAIN_TYPE; +#else max_size_bound_ = true; auto data_size = static_cast(members->max_serialized_size(max_size_bound_)); +#endif // A fully bound message of size 0 is an empty message if (max_size_bound_ && (data_size == 0) ) { From 8d399697ce3e0e0b8da4d8dff90eba1f690b87df Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 17 Mar 2021 10:01:39 +0100 Subject: [PATCH 04/16] Updates on rmw_fastrtps_dynamic_cpp. Signed-off-by: Miguel Company --- .../MessageTypeSupport_impl.hpp | 3 ++- .../ServiceTypeSupport_impl.hpp | 6 ++++-- .../rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp | 10 +++++++++- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp index 298e2485f..a21e20721 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp @@ -50,8 +50,9 @@ MessageTypeSupport::MessageTypeSupport( ss << "dds_::" << message_name << "_"; this->setName(ss.str().c_str()); - // Fully bound by default + // Fully bound and plain by default this->max_size_bound_ = true; + this->is_plain_ = true; // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp index b14959bfb..467703712 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp @@ -49,8 +49,9 @@ RequestTypeSupport::RequestTypeSupport( ss << "dds_::" << service_name << "_Request_"; this->setName(ss.str().c_str()); - // Fully bound by default + // Fully bound and plain by default this->max_size_bound_ = true; + this->is_plain_ = true; // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { @@ -79,8 +80,9 @@ ResponseTypeSupport::ResponseTypeSupport ss << "dds_::" << service_name << "_Response_"; this->setName(ss.str().c_str()); - // Fully bound by default + // Fully bound and plain by default this->max_size_bound_ = true; + this->is_plain_ = true; // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp index 0f83e0017..836b22214 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp @@ -66,6 +66,7 @@ TypeSupport::TypeSupport(const void * ros_type_support) { m_isGetKeyDefined = false; max_size_bound_ = false; + is_plain_ = false; } // C++ specialization @@ -829,9 +830,15 @@ size_t TypeSupport::calculateMaxSerializedSize( size_t array_size = 1; if (member->is_array_) { array_size = member->array_size_; + + // Whether it is unbounded. + if (0 == array_size) { + this->max_size_bound_ = false; + } + // Whether it is a sequence. if (0 == array_size || member->is_upper_bound_) { - this->max_size_bound_ = false; + this->is_plain_ = false; current_alignment += padding + eprosima::fastcdr::Cdr::alignment(current_alignment, padding); } @@ -866,6 +873,7 @@ size_t TypeSupport::calculateMaxSerializedSize( case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING: { this->max_size_bound_ = false; + this->is_plain_ = false; size_t character_size = (member->type_id_ == rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING) ? 4 : 1; for (size_t index = 0; index < array_size; ++index) { From f4dde4df5d30b1a1518ddb0b5459814b5b80c9b9 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 17 Mar 2021 10:49:54 +0100 Subject: [PATCH 05/16] Linters. Signed-off-by: Miguel Company --- .../include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp index 836b22214..aa4dc500d 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp @@ -830,7 +830,7 @@ size_t TypeSupport::calculateMaxSerializedSize( size_t array_size = 1; if (member->is_array_) { array_size = member->array_size_; - + // Whether it is unbounded. if (0 == array_size) { this->max_size_bound_ = false; From f7f275cecb1c967919c0fbc8f3cf0f9d3a84af69 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 29 Mar 2021 07:07:33 +0200 Subject: [PATCH 06/16] Apply suggestion Signed-off-by: Miguel Company Co-authored-by: Michel Hidalgo --- .../include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp index aa4dc500d..68d74d26f 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp @@ -832,7 +832,7 @@ size_t TypeSupport::calculateMaxSerializedSize( array_size = member->array_size_; // Whether it is unbounded. - if (0 == array_size) { + if (0u == array_size) { this->max_size_bound_ = false; } From c8ae2688b6515e4c0733bfe35bd4fd0171b82c42 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 9 Apr 2021 08:59:07 +0200 Subject: [PATCH 07/16] Implementation of rmw_publish_loaned_message. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/rmw_publish.cpp | 8 ++--- rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp | 8 ++--- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 8 +++++ rmw_fastrtps_shared_cpp/src/rmw_publish.cpp | 30 +++++++++++++++++++ 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_publish.cpp b/rmw_fastrtps_cpp/src/rmw_publish.cpp index f51354ced..9a7331f26 100644 --- a/rmw_fastrtps_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publish.cpp @@ -51,11 +51,7 @@ rmw_publish_loaned_message( void * ros_message, rmw_publisher_allocation_t * allocation) { - (void) publisher; - (void) ros_message; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_publish_loaned_message not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message( + eprosima_fastrtps_identifier, publisher, ros_message, allocation); } } // extern "C" diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp index a9d09e3b2..c3c396577 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp @@ -41,12 +41,8 @@ rmw_publish_loaned_message( void * ros_message, rmw_publisher_allocation_t * allocation) { - (void) publisher; - (void) ros_message; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_publish_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message( + eprosima_fastrtps_identifier, publisher, ros_message, allocation); } rmw_ret_t diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 082e5629a..d88a8970e 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -144,6 +144,14 @@ __rmw_publish_serialized_message( const rmw_serialized_message_t * serialized_message, rmw_publisher_allocation_t * allocation); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_publish_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const void * ros_message, + rmw_publisher_allocation_t * allocation); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_publisher_assert_liveliness( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp index cc1d7e23f..d4695d419 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp @@ -108,4 +108,34 @@ __rmw_publish_serialized_message( return RMW_RET_OK; } +rmw_ret_t +__rmw_publish_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const void * ros_message, + rmw_publisher_allocation_t * allocation) +{ + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_INVALID_ARGUMENT); + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); + + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, publisher->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!publisher->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(publisher->data); + if (!info->data_writer_->write(const_cast(ros_message))) { + RMW_SET_ERROR_MSG("cannot publish data"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp From 2b943ec90e4423ff4c515d513ca60f9327c018a0 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 9 Apr 2021 09:15:38 +0200 Subject: [PATCH 08/16] Implementation of rmw_borrow_loaned_message. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/rmw_publisher.cpp | 8 ++--- .../src/rmw_publisher.cpp | 8 ++--- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 8 +++++ rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp | 30 +++++++++++++++++++ 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 81ad0c1e7..4d675f8fc 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -169,12 +169,8 @@ rmw_borrow_loaned_message( const rosidl_message_type_support_t * type_support, void ** ros_message) { - (void) publisher; - (void) type_support; - (void) ros_message; - - RMW_SET_ERROR_MSG("rmw_borrow_loaned_message not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message( + eprosima_fastrtps_identifier, publisher, type_support, ros_message); } rmw_ret_t diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index 596bdded6..1ad36bfd1 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -170,12 +170,8 @@ rmw_borrow_loaned_message( const rosidl_message_type_support_t * type_support, void ** ros_message) { - (void) publisher; - (void) type_support; - (void) ros_message; - - RMW_SET_ERROR_MSG("rmw_borrow_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message( + eprosima_fastrtps_identifier, publisher, type_support, ros_message); } rmw_ret_t diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index d88a8970e..3385a4ad2 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -144,6 +144,14 @@ __rmw_publish_serialized_message( const rmw_serialized_message_t * serialized_message, rmw_publisher_allocation_t * allocation); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_borrow_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const rosidl_message_type_support_t * type_support, + void ** ros_message); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_publish_loaned_message( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp index 8a1bd6a0d..3d1a7b33d 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp @@ -132,4 +132,34 @@ __rmw_publisher_get_actual_qos( return RMW_RET_OK; } + +rmw_ret_t +__rmw_borrow_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const rosidl_message_type_support_t * type_support, + void ** ros_message) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, publisher->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!publisher->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(type_support, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT); + if (nullptr != *ros_message) { + return RMW_RET_INVALID_ARGUMENT; + } + + auto info = static_cast(publisher->data); + if (!info->data_writer_->loan_sample(*ros_message)) { + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp From 4582a48ec53ddace9eb7553cabf1c5a843ef1d17 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 9 Apr 2021 09:22:19 +0200 Subject: [PATCH 09/16] Implementation of rmw_return_loaned_message_from_publisher. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/rmw_publisher.cpp | 8 ++---- .../src/rmw_publisher.cpp | 8 ++---- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 7 ++++++ rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp | 25 +++++++++++++++++++ 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 4d675f8fc..6ee0fd154 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -178,12 +178,8 @@ rmw_return_loaned_message_from_publisher( const rmw_publisher_t * publisher, void * loaned_message) { - (void) publisher; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_publisher not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher( + eprosima_fastrtps_identifier, publisher, loaned_message); } rmw_ret_t diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index 1ad36bfd1..1559b42be 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -179,12 +179,8 @@ rmw_return_loaned_message_from_publisher( const rmw_publisher_t * publisher, void * loaned_message) { - (void) publisher; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_publisher is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher( + eprosima_fastrtps_identifier, publisher, loaned_message); } using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 3385a4ad2..8cd255f56 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -152,6 +152,13 @@ __rmw_borrow_loaned_message( const rosidl_message_type_support_t * type_support, void ** ros_message); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_return_loaned_message_from_publisher( + const char * identifier, + const rmw_publisher_t * publisher, + void * loaned_message); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_publish_loaned_message( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp index 3d1a7b33d..73f6d0f5e 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp @@ -162,4 +162,29 @@ __rmw_borrow_loaned_message( return RMW_RET_OK; } + +rmw_ret_t +__rmw_return_loaned_message_from_publisher( + const char * identifier, + const rmw_publisher_t * publisher, + void * loaned_message) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, publisher->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!publisher->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(publisher->data); + if (!info->data_writer_->discard_loan(loaned_message)) { + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp From 0e99e0cb2b2fdd8cddfd7a65850c4580d11799e8 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 9 Apr 2021 09:24:07 +0200 Subject: [PATCH 10/16] Enable loan messages on publishers of plain types. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/publisher.cpp | 2 +- rmw_fastrtps_dynamic_cpp/src/publisher.cpp | 2 +- rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index a4b80fa30..5a71f1700 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -291,7 +291,7 @@ rmw_fastrtps_cpp::create_publisher( rmw_publisher_free(rmw_publisher); }); - rmw_publisher->can_loan_messages = false; + rmw_publisher->can_loan_messages = info->type_support_->is_plain(); rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier; rmw_publisher->data = info; diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index e5e51dba7..c8acd3f13 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -306,7 +306,7 @@ rmw_fastrtps_dynamic_cpp::create_publisher( rmw_publisher_free(rmw_publisher); }); - rmw_publisher->can_loan_messages = false; + rmw_publisher->can_loan_messages = info->type_support_->is_plain(); rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier; rmw_publisher->data = info; diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp index cc9eead5d..43c3b0ac4 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp @@ -21,6 +21,8 @@ TypeSupportProxy::TypeSupportProxy(rmw_fastrtps_shared_cpp::TypeSupport * inner_ { setName(inner_type->getName()); m_typeSize = inner_type->m_typeSize; + is_plain_ = inner_type->is_plain(); + max_size_bound_ = inner_type->is_bounded(); } size_t TypeSupportProxy::getEstimatedSerializedSize( From c07488a24f127fa7c2cfb6fbb6a9fa7c667ca91d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 12 Apr 2021 10:55:04 +0200 Subject: [PATCH 11/16] Initial work for taking loaned messages. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/rmw_take.cpp | 30 ++++--------- rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp | 31 ++++--------- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 16 +++++++ rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 44 +++++++++++++++++++ 4 files changed, 78 insertions(+), 43 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_take.cpp b/rmw_fastrtps_cpp/src/rmw_take.cpp index e6f39d3c4..797c1c76c 100644 --- a/rmw_fastrtps_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_cpp/src/rmw_take.cpp @@ -91,13 +91,9 @@ rmw_take_loaned_message( bool * taken, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_take_loaned_message not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr); } rmw_ret_t @@ -108,14 +104,10 @@ rmw_take_loaned_message_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) message_info; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_take_loaned_message_with_info not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info); } rmw_ret_t @@ -123,12 +115,8 @@ rmw_return_loaned_message_from_subscription( const rmw_subscription_t * subscription, void * loaned_message) { - (void) subscription; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_subscription not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription( + eprosima_fastrtps_identifier, subscription, loaned_message); } rmw_ret_t diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp index b1780efa4..29e36633a 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp @@ -91,13 +91,9 @@ rmw_take_loaned_message( bool * taken, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_take_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr); } rmw_ret_t @@ -108,15 +104,10 @@ rmw_take_loaned_message_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) message_info; - (void) allocation; - - RMW_SET_ERROR_MSG( - "rmw_take_loaned_message_with_info is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info); } rmw_ret_t @@ -124,12 +115,8 @@ rmw_return_loaned_message_from_subscription( const rmw_subscription_t * subscription, void * loaned_message) { - (void) subscription; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_subscription is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription( + eprosima_fastrtps_identifier, subscription, loaned_message); } rmw_ret_t diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 8cd255f56..7c187b31d 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -330,6 +330,22 @@ __rmw_take_sequence( size_t * taken, rmw_subscription_allocation_t * allocation); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_take_loaned_message_internal( + const char * identifier, + const rmw_subscription_t * subscription, + void ** loaned_message, + bool * taken, + rmw_message_info_t * message_info); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_return_loaned_message_from_subscription( + const char * identifier, + const rmw_subscription_t * subscription, + void * loaned_message); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_take_event( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index f5e442acc..c71e2f336 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -353,4 +353,48 @@ __rmw_take_serialized_message_with_info( return _take_serialized_message( identifier, subscription, serialized_message, taken, message_info, allocation); } + +rmw_ret_t +__rmw_take_loaned_message_internal( + const char * identifier, + const rmw_subscription_t * subscription, + void ** loaned_message, + bool * taken, + rmw_message_info_t * message_info) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, subscription->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!subscription->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + + RMW_SET_ERROR_MSG("Loaning is not implemented"); + return RMW_RET_UNSUPPORTED; +} + +rmw_ret_t +__rmw_return_loaned_message_from_subscription( + const char * identifier, + const rmw_subscription_t * subscription, + void * loaned_message) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, subscription->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!subscription->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); + + RMW_SET_ERROR_MSG("Loaning is not implemented"); + return RMW_RET_UNSUPPORTED; +} } // namespace rmw_fastrtps_shared_cpp From 77b4a76dfd31df76f2a47ad2dcbd96ea7f05749a Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 12 Apr 2021 13:02:00 +0200 Subject: [PATCH 12/16] Implementation for taking loaned messages. Signed-off-by: Miguel Company --- .../custom_subscriber_info.hpp | 7 ++ rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 76 ++++++++++++++++++- 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 105a71ad1..f9c261581 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -40,6 +41,11 @@ class SubListener; +namespace rmw_fastrtps_shared_cpp +{ +struct LoanManager; +} // namespace rmw_fastrtps_shared_cpp + struct CustomSubscriberInfo : public CustomEventInfo { virtual ~CustomSubscriberInfo() = default; @@ -50,6 +56,7 @@ struct CustomSubscriberInfo : public CustomEventInfo const void * type_support_impl_{nullptr}; rmw_gid_t subscription_gid_{}; const char * typesupport_identifier_{nullptr}; + std::shared_ptr loan_manager_; RMW_FASTRTPS_SHARED_CPP_PUBLIC EventListenerInterface * diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index c71e2f336..160e84ea4 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -19,6 +19,8 @@ #include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastrtps/utils/collections/ResourceLimitedVector.hpp" + #include "fastcdr/Cdr.h" #include "fastcdr/FastBuffer.h" @@ -354,6 +356,31 @@ __rmw_take_serialized_message_with_info( identifier, subscription, serialized_message, taken, message_info, allocation); } +// ----------------- Loans related code ------------------------- // + +struct GenericSequence : public eprosima::fastdds::dds::LoanableCollection +{ + GenericSequence() = default; + + void resize(size_type new_length) override + { + // This kind of collection should only be used with loans + throw std::bad_alloc(); + } +}; + +struct LoanManager +{ + struct Item + { + GenericSequence data_seq{}; + eprosima::fastdds::dds::SampleInfoSeq info_seq{}; + }; + + std::mutex mtx; + eprosima::fastrtps::ResourceLimitedVector items RCPPUTILS_TSA_GUARDED_BY(mtx); +}; + rmw_ret_t __rmw_take_loaned_message_internal( const char * identifier, @@ -374,8 +401,35 @@ __rmw_take_loaned_message_internal( RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); - RMW_SET_ERROR_MSG("Loaning is not implemented"); - return RMW_RET_UNSUPPORTED; + auto info = static_cast(subscription->data); + auto loan_mgr = info->loan_manager_; + std::unique_lock guard(loan_mgr->mtx); + auto item = loan_mgr->items.emplace_back(); + if (nullptr == item) { + RMW_SET_ERROR_MSG("Out of resources for loaned message info"); + return RMW_RET_ERROR; + } + + while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(item->data_seq, item->info_seq, 1)) { + if (item->info_seq[0].valid_data) { + if (nullptr != message_info) { + _assign_message_info(identifier, message_info, &item->info_seq[0]); + } + *loaned_message = item->data_seq.buffer()[0]; + *taken = true; + info->listener_->update_has_data(info->data_reader_); + return RMW_RET_OK; + } + + // Should return loan before taking again + info->data_reader_->return_loan(item->data_seq, item->info_seq); + } + + // No data available, return loan information. + loan_mgr->items.pop_back(); + *taken = false; + info->listener_->update_has_data(info->data_reader_); + return RMW_RET_OK; } rmw_ret_t @@ -394,7 +448,21 @@ __rmw_return_loaned_message_from_subscription( } RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); - RMW_SET_ERROR_MSG("Loaning is not implemented"); - return RMW_RET_UNSUPPORTED; + auto info = static_cast(subscription->data); + auto loan_mgr = info->loan_manager_; + std::lock_guard guard(loan_mgr->mtx); + for (auto it = loan_mgr->items.begin(); it != loan_mgr->items.end(); ++it) { + if (loaned_message == it->data_seq.buffer()[0]) { + if (!info->data_reader_->return_loan(it->data_seq, it->info_seq)) { + RMW_SET_ERROR_MSG("Error returning loan"); + return RMW_RET_ERROR; + } + loan_mgr->items.erase(it); + return RMW_RET_OK; + } + } + + RMW_SET_ERROR_MSG("Trying to return message not loaned by this subscription"); + return RMW_RET_ERROR; } } // namespace rmw_fastrtps_shared_cpp From eb4771b66c5d89a81a29ed6013566775d5b2d8ca Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 12 Apr 2021 13:36:21 +0200 Subject: [PATCH 13/16] Enable loan messages on subscriptions of plain types. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/subscription.cpp | 3 ++- rmw_fastrtps_dynamic_cpp/src/subscription.cpp | 3 ++- .../rmw_fastrtps_shared_cpp/subscription.hpp | 5 +++++ rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 21 +++++++++++++++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index 4affb8ec1..7964d85d6 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -43,6 +43,7 @@ #include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp" #include "rmw_fastrtps_shared_cpp/qos.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/subscription.hpp" #include "rmw_fastrtps_shared_cpp/utils.hpp" #include "rmw_fastrtps_cpp/identifier.hpp" @@ -324,7 +325,7 @@ create_subscription( return nullptr; } rmw_subscription->options = *subscription_options; - rmw_subscription->can_loan_messages = false; + rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription); topic.should_be_deleted = false; cleanup_rmw_subscription.cancel(); diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 86301ff17..1624a6452 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -40,6 +40,7 @@ #include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp" #include "rmw_fastrtps_shared_cpp/qos.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/subscription.hpp" #include "rmw_fastrtps_shared_cpp/utils.hpp" #include "fastrtps/participant/Participant.h" @@ -338,7 +339,7 @@ create_subscription( memcpy(const_cast(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1); rmw_subscription->options = *subscription_options; - rmw_subscription->can_loan_messages = false; + rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription); topic.should_be_deleted = false; cleanup_rmw_subscription.cancel(); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp index 523b916b3..1a3394b47 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp @@ -22,6 +22,11 @@ namespace rmw_fastrtps_shared_cpp { +RMW_FASTRTPS_SHARED_CPP_PUBLIC +void +__init_subscription_for_loans( + rmw_subscription_t * subscription); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t destroy_subscription( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index 160e84ea4..6b75d2bb4 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rmw/allocators.h" #include "rmw/error_handling.h" #include "rmw/serialized_message.h" @@ -27,6 +29,7 @@ #include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp" #include "rmw_fastrtps_shared_cpp/guid_utils.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/subscription.hpp" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" #include "rmw_fastrtps_shared_cpp/utils.hpp" @@ -377,10 +380,28 @@ struct LoanManager eprosima::fastdds::dds::SampleInfoSeq info_seq{}; }; + explicit LoanManager(const eprosima::fastrtps::ResourceLimitedContainerConfig & items_cfg) + : items(items_cfg) + { + } + std::mutex mtx; eprosima::fastrtps::ResourceLimitedVector items RCPPUTILS_TSA_GUARDED_BY(mtx); }; +void +__init_subscription_for_loans( + rmw_subscription_t * subscription) +{ + auto info = static_cast(subscription->data); + subscription->can_loan_messages = info->type_support_->is_plain(); + if (subscription->can_loan_messages) { + const auto & qos = info->data_reader_->get_qos(); + const auto & allocation_qos = qos.reader_resource_limits().outstanding_reads_allocation; + info->loan_manager_ = std::make_shared(allocation_qos); + } +} + rmw_ret_t __rmw_take_loaned_message_internal( const char * identifier, From 65e6846d7b18626e7debb18b60786acf79dcf193 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 12 Apr 2021 18:41:30 +0200 Subject: [PATCH 14/16] Fixed warnings. Signed-off-by: Miguel Company --- rmw_fastrtps_shared_cpp/src/rmw_publish.cpp | 1 + rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp index d4695d419..a6e07a7c7 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp @@ -115,6 +115,7 @@ __rmw_publish_loaned_message( const void * ros_message, rmw_publisher_allocation_t * allocation) { + static_cast(allocation); RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_INVALID_ARGUMENT); RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index 6b75d2bb4..e9cd4be1c 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -365,7 +365,7 @@ struct GenericSequence : public eprosima::fastdds::dds::LoanableCollection { GenericSequence() = default; - void resize(size_type new_length) override + void resize(size_type /*new_length*/) override { // This kind of collection should only be used with loans throw std::bad_alloc(); From 4aa6327166c8457c93af6293a52839760e28048d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 22 Apr 2021 13:01:05 +0200 Subject: [PATCH 15/16] DataSharing off by default Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/publisher.cpp | 2 ++ rmw_fastrtps_cpp/src/rmw_client.cpp | 4 ++++ rmw_fastrtps_cpp/src/rmw_service.cpp | 4 ++++ rmw_fastrtps_cpp/src/subscription.cpp | 2 ++ rmw_fastrtps_dynamic_cpp/src/publisher.cpp | 2 ++ rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp | 4 ++++ rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp | 4 ++++ rmw_fastrtps_dynamic_cpp/src/subscription.cpp | 2 ++ 8 files changed, 24 insertions(+) diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index 5a71f1700..593c4965b 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -248,6 +248,8 @@ rmw_fastrtps_cpp::create_publisher( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } // Get QoS from RMW diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 171fbdc50..d10a61f41 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -311,6 +311,8 @@ rmw_create_client( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -359,6 +361,8 @@ rmw_create_client( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 2345b71a6..7ac758fa6 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -310,6 +310,8 @@ rmw_create_service( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -362,6 +364,8 @@ rmw_create_service( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index 7964d85d6..557ad0549 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -243,6 +243,8 @@ create_subscription( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index c8acd3f13..7aa419060 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -263,6 +263,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } // Get QoS from RMW diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 696dab61b..0f8a82391 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -342,6 +342,8 @@ rmw_create_client( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -390,6 +392,8 @@ rmw_create_client( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index bb0f7b567..214d4e1a5 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -341,6 +341,8 @@ rmw_create_service( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -393,6 +395,8 @@ rmw_create_service( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 1624a6452..4d2415220 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -257,6 +257,8 @@ create_subscription( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { From a97fd3e6a5c5e50fb8e5a2f8ea26dc7ac4cb2403 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 22 Apr 2021 13:08:57 +0200 Subject: [PATCH 16/16] Only loan messages when data sharing is enabled. Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/publisher.cpp | 5 ++++- rmw_fastrtps_dynamic_cpp/src/publisher.cpp | 4 +++- rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 7 +++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index 593c4965b..db401b761 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -49,6 +49,8 @@ #include "type_support_common.hpp" +using DataSharingKind = eprosima::fastdds::dds::DataSharingKind; + rmw_publisher_t * rmw_fastrtps_cpp::create_publisher( const CustomParticipantInfo * participant_info, @@ -293,7 +295,8 @@ rmw_fastrtps_cpp::create_publisher( rmw_publisher_free(rmw_publisher); }); - rmw_publisher->can_loan_messages = info->type_support_->is_plain(); + bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind(); + rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain(); rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier; rmw_publisher->data = info; diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index 7aa419060..6235cf776 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -49,6 +49,7 @@ #include "type_support_common.hpp" #include "type_support_registry.hpp" +using DataSharingKind = eprosima::fastdds::dds::DataSharingKind; using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy; rmw_publisher_t * @@ -308,7 +309,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher( rmw_publisher_free(rmw_publisher); }); - rmw_publisher->can_loan_messages = info->type_support_->is_plain(); + bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind(); + rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain(); rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier; rmw_publisher->data = info; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index e9cd4be1c..485c377e6 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -36,6 +36,8 @@ namespace rmw_fastrtps_shared_cpp { +using DataSharingKind = eprosima::fastdds::dds::DataSharingKind; + void _assign_message_info( const char * identifier, @@ -394,9 +396,10 @@ __init_subscription_for_loans( rmw_subscription_t * subscription) { auto info = static_cast(subscription->data); - subscription->can_loan_messages = info->type_support_->is_plain(); + const auto & qos = info->data_reader_->get_qos(); + bool has_data_sharing = DataSharingKind::OFF != qos.data_sharing().kind(); + subscription->can_loan_messages = has_data_sharing && info->type_support_->is_plain(); if (subscription->can_loan_messages) { - const auto & qos = info->data_reader_->get_qos(); const auto & allocation_qos = qos.reader_resource_limits().outstanding_reads_allocation; info->loan_manager_ = std::make_shared(allocation_qos); }