Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make service/client construction/destruction implementation compliant #445

Merged
merged 5 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 136 additions & 108 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

#include <string>

#include "rcpputils/scope_exit.hpp"
#include "rcutils/logging_macros.h"

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/impl/cpp/macros.hpp"
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"

#include "rmw_fastrtps_shared_cpp/custom_client_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
Expand All @@ -43,39 +47,36 @@ rmw_create_client(
const rosidl_service_type_support_t * type_supports,
const char * service_name, const rmw_qos_profile_t * qos_policies)
{
if (!node) {
RMW_SET_ERROR_MSG("node handle is null");
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr);
if (0 == strlen(service_name)) {
RMW_SET_ERROR_MSG("service_name argument is an empty string");
return nullptr;
}

if (node->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("node handle not from this implementation");
return nullptr;
}

if (!service_name || strlen(service_name) == 0) {
RMW_SET_ERROR_MSG("client topic is null or empty string");
return nullptr;
}

if (!qos_policies) {
RMW_SET_ERROR_MSG("qos_profile is null");
return nullptr;
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service_name argument is invalid: %s", reason);
return nullptr;
}
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto participant_info =
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info);
if (!participant_info) {
RMW_SET_ERROR_MSG("participant info is null");
return nullptr;
}

Participant * participant = participant_info->participant;
if (!participant) {
RMW_SET_ERROR_MSG("participant handle is null");
return nullptr;
}

const rosidl_service_type_support_t * type_support = get_service_typesupport_handle(
type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
Expand All @@ -88,12 +89,21 @@ rmw_create_client(
}
}

CustomClientInfo * info = nullptr;
eprosima::fastrtps::SubscriberAttributes subscriberParam;
eprosima::fastrtps::PublisherAttributes publisherParam;
rmw_client_t * rmw_client = nullptr;

info = new CustomClientInfo();
CustomClientInfo * info = new (std::nothrow) CustomClientInfo();
if (!info) {
RMW_SET_ERROR_MSG("failed to allocate client info");
return nullptr;
}
auto cleanup_base_info = rcpputils::make_scope_exit(
[info, participant]() {
if (info->request_type_support_) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->request_type_support_);
}
if (info->response_type_support_) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->response_type_support_);
}
delete info;
});
info->participant_ = participant;
info->typesupport_identifier_ = type_support->typesupport_identifier;
info->request_publisher_matched_count_ = 0;
Expand All @@ -120,7 +130,11 @@ rmw_create_client(
participant, request_type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->request_type_support_)))
{
info->request_type_support_ = new RequestTypeSupport_cpp(service_members);
info->request_type_support_ = new (std::nothrow) RequestTypeSupport_cpp(service_members);
Comment on lines 130 to +133
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but there's a TOCTTOU (time-of-check to time-of-use) race here (getRegisteredType/_register_type race).
I fixed something similar in rmw_connext a bit ago: ros2/rmw_connext#442.
We should open an issue about this one too.

Note: maybe registering the same type twice isn't an issue in rmw_fastrtps, but I bet that's not the case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, this is interesting. You're spot on about the race, but alas these functions are not expected to be thread-safe to begin with (see ros2/rmw#276 (comment)).

I see much discussion incoming. I'll add it to the backlog.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, this is interesting. You're spot on about the race, but alas these functions are not expected to be thread-safe to begin with (see ros2/rmw#276 (comment)).

So, the node-object doesn't have to be thread safe, we're mutexing all the accesses to rcl_node_t in rclcpp/rclpy.
The problem is that the function isn't concurrent at all (the domain participant factory is a singleton), and when you compose different nodes in the same process these function calls can randomly break.

I never had that problem with fastrtps, but it was a problem in connext. Of course, it was almost impossible to reproduce.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the node-object doesn't have to be thread safe, we're mutexing all the accesses to rcl_node_t in rclcpp/rclpy.

I'm surprised we're actually not doing that (in rclcpp at least)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the function isn't concurrent at all (the domain participant factory is a singleton), and when you compose different nodes in the same process these function calls can randomly break.

We actually would need one mutex per context to avoid this race https://github.com/eProsima/Fast-DDS/blob/724fa5741d569eff6d5633b6eb587238df97604d/src/cpp/fastrtps_deprecated/Domain.cpp#L309-L316.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the function isn't concurrent at all (the domain participant factory is a singleton), and when you compose different nodes in the same process these function calls can randomly break.

Hmm I see. And I agree, non-reentrant functions are a problem. I'll open an issue.

if (!info->request_type_support_) {
RMW_SET_ERROR_MSG("failed to allocate request typesupport");
return nullptr;
}
_register_type(participant, info->request_type_support_);
}

Expand All @@ -129,10 +143,17 @@ rmw_create_client(
participant, response_type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->response_type_support_)))
{
info->response_type_support_ = new ResponseTypeSupport_cpp(service_members);
info->response_type_support_ = new (std::nothrow) ResponseTypeSupport_cpp(service_members);
if (!info->response_type_support_) {
RMW_SET_ERROR_MSG("failed to allocate response typesupport");
return nullptr;
}
_register_type(participant, info->response_type_support_);
}

eprosima::fastrtps::SubscriberAttributes subscriberParam;
eprosima::fastrtps::PublisherAttributes publisherParam;

if (!participant_info->leave_middleware_default_qos) {
subscriberParam.historyMemoryPolicy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
Expand Down Expand Up @@ -167,140 +188,147 @@ rmw_create_client(

// Create Client Subscriber and set QoS
if (!get_datareader_qos(*qos_policies, subscriberParam)) {
RMW_SET_ERROR_MSG("failed to get datareader qos");
goto fail;
return nullptr;
}
auto cleanup_response_subscriber = rcpputils::make_scope_exit(
[info]() {
if (info->response_subscriber_) {
if (!Domain::removeSubscriber(info->response_subscriber_)) {
RMW_SAFE_FWRITE_TO_STDERR(
"Failed to remove response subscriber after '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
}
if (info->listener_) {
delete info->listener_;
}
});
info->listener_ = new (std::nothrow) ClientListener(info);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really update the info class to store std::unique_ptr instead of raw pointers.
That's cleaner than the scope_exit trick and it avoids duplicated code with the destruction function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. We still have to deal with type erasure to some extent, but it'd make it a lot easier.

I won't pursue that change here though. We can open a separate issue.

if (!info->listener_) {
RMW_SET_ERROR_MSG("failed to create client response subscriber listener");
return nullptr;
}
info->listener_ = new ClientListener(info);
info->response_subscriber_ =
Domain::createSubscriber(participant, subscriberParam, info->listener_);
if (!info->response_subscriber_) {
RMW_SET_ERROR_MSG("create_client() could not create subscriber");
goto fail;
RMW_SET_ERROR_MSG("failed to create client response subscriber");
return nullptr;
}

// Create Client Publisher and set QoS
if (!get_datawriter_qos(*qos_policies, publisherParam)) {
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
return nullptr;
}
auto cleanup_request_publisher = rcpputils::make_scope_exit(
[info]() {
if (info->request_publisher_) {
if (!Domain::removePublisher(info->request_publisher_)) {
RMW_SAFE_FWRITE_TO_STDERR(
"Failed to remove request publisher after '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
}
if (info->pub_listener_) {
delete info->pub_listener_;
}
});
info->pub_listener_ = new (std::nothrow) ClientPubListener(info);
if (!info->pub_listener_) {
RMW_SET_ERROR_MSG("failed to create client request publisher listener");
return nullptr;
}
info->pub_listener_ = new ClientPubListener(info);
info->request_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->request_publisher_) {
RMW_SET_ERROR_MSG("create_client() could not create publisher");
goto fail;
RMW_SET_ERROR_MSG("failed to create client request publisher");
return nullptr;
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
rmw_client_t * rmw_client = rmw_client_allocate();
if (!rmw_client) {
RMW_SET_ERROR_MSG("failed to allocate memory for client");
goto fail;
return nullptr;
}
auto cleanup_rmw_client = rcpputils::make_scope_exit(
[rmw_client]() {
rmw_free(const_cast<char *>(rmw_client->service_name));
rmw_free(rmw_client);
});

rmw_client->implementation_identifier = eprosima_fastrtps_identifier;
rmw_client->data = info;
rmw_client->service_name = reinterpret_cast<const char *>(
rmw_allocate(strlen(service_name) + 1));
if (!rmw_client->service_name) {
RMW_SET_ERROR_MSG("failed to allocate memory for client name");
goto fail;
return nullptr;
}
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_publisher_->getGuid());
common_context->graph_cache.associate_writer(
gid,
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_subscriber_->getGuid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
goto fail;
}
}

return rmw_client;

fail:
if (info != nullptr) {
if (info->request_publisher_ != nullptr) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_publisher_->getGuid());
common_context->graph_cache.dissociate_writer(
gid,
if (RMW_RET_OK != ret) {
common_context->graph_cache.dissociate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
Domain::removePublisher(info->request_publisher_);
}

if (info->response_subscriber_ != nullptr) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_subscriber_->getGuid());
common_context->graph_cache.dissociate_reader(
gid,
common_context->graph_cache.dissociate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
Domain::removeSubscriber(info->response_subscriber_);
}

if (info->pub_listener_ != nullptr) {
delete info->pub_listener_;
}

if (info->listener_ != nullptr) {
delete info->listener_;
}

if (participant_info) {
if (info->request_type_support_ != nullptr) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->request_type_support_);
}

if (info->response_type_support_ != nullptr) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->response_type_support_);
}
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_fastrtps_cpp",
"leaking type support objects because node impl is null");
}

delete info;
info = nullptr;
}

if (nullptr != rmw_client) {
if (rmw_client->service_name != nullptr) {
rmw_free(const_cast<char *>(rmw_client->service_name));
rmw_client->service_name = nullptr;
return nullptr;
}
rmw_client_free(rmw_client);
}

return nullptr;
cleanup_rmw_client.cancel();
cleanup_response_subscriber.cancel();
cleanup_request_publisher.cancel();
cleanup_base_info.cancel();
return rmw_client;
}

// Destroy a client through the shared Fast RTPS implementation.
//
// Both handles are validated before delegating:
//   - `node` and `client` are each passed through a null-argument check that is
//     given RMW_RET_INVALID_ARGUMENT as its failure result.
//   - `node->implementation_identifier` and `client->implementation_identifier`
//     are each compared against `eprosima_fastrtps_identifier`; on mismatch the
//     supplied statement returns RMW_RET_INCORRECT_RMW_IMPLEMENTATION.
//
// When every check passes, the return value is whatever
// rmw_fastrtps_shared_cpp::__rmw_destroy_client() returns.
//
// NOTE(review): the check macros presumably also set the rmw error state on
// failure -- confirm against their definitions in the rmw headers.
rmw_ret_t
rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
{
// The node is null-checked first, since the identifier check below dereferences it.
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
// Same two-step validation for the client handle.
RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
client,
client->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

// Arguments are valid for this implementation; hand off to the shared code.
return rmw_fastrtps_shared_cpp::__rmw_destroy_client(
eprosima_fastrtps_identifier, node, client);
}
Expand Down
Loading