Skip to content

Commit

Permalink
Make service/client construction/destruction implementation compliant (
Browse files Browse the repository at this point in the history
…#445)

Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
hidmic authored and ahcorde committed Oct 15, 2020
1 parent 4fcffe3 commit 7d76937
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 370 deletions.
244 changes: 136 additions & 108 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

#include <string>

#include "rcpputils/scope_exit.hpp"
#include "rcutils/logging_macros.h"

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/impl/cpp/macros.hpp"
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"

#include "rmw_fastrtps_shared_cpp/custom_client_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
Expand All @@ -43,39 +47,36 @@ rmw_create_client(
const rosidl_service_type_support_t * type_supports,
const char * service_name, const rmw_qos_profile_t * qos_policies)
{
if (!node) {
RMW_SET_ERROR_MSG("node handle is null");
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr);
if (0 == strlen(service_name)) {
RMW_SET_ERROR_MSG("service_name argument is an empty string");
return nullptr;
}

if (node->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("node handle not from this implementation");
return nullptr;
}

if (!service_name || strlen(service_name) == 0) {
RMW_SET_ERROR_MSG("client topic is null or empty string");
return nullptr;
}

if (!qos_policies) {
RMW_SET_ERROR_MSG("qos_profile is null");
return nullptr;
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service_name argument is invalid: %s", reason);
return nullptr;
}
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto participant_info =
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info);
if (!participant_info) {
RMW_SET_ERROR_MSG("participant info is null");
return nullptr;
}

Participant * participant = participant_info->participant;
if (!participant) {
RMW_SET_ERROR_MSG("participant handle is null");
return nullptr;
}

const rosidl_service_type_support_t * type_support = get_service_typesupport_handle(
type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
Expand All @@ -88,12 +89,21 @@ rmw_create_client(
}
}

CustomClientInfo * info = nullptr;
eprosima::fastrtps::SubscriberAttributes subscriberParam;
eprosima::fastrtps::PublisherAttributes publisherParam;
rmw_client_t * rmw_client = nullptr;

info = new CustomClientInfo();
CustomClientInfo * info = new (std::nothrow) CustomClientInfo();
if (!info) {
RMW_SET_ERROR_MSG("failed to allocate client info");
return nullptr;
}
auto cleanup_base_info = rcpputils::make_scope_exit(
[info, participant]() {
if (info->request_type_support_) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->request_type_support_);
}
if (info->response_type_support_) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->response_type_support_);
}
delete info;
});
info->participant_ = participant;
info->typesupport_identifier_ = type_support->typesupport_identifier;
info->request_publisher_matched_count_ = 0;
Expand All @@ -120,7 +130,11 @@ rmw_create_client(
participant, request_type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->request_type_support_)))
{
info->request_type_support_ = new RequestTypeSupport_cpp(service_members);
info->request_type_support_ = new (std::nothrow) RequestTypeSupport_cpp(service_members);
if (!info->request_type_support_) {
RMW_SET_ERROR_MSG("failed to allocate request typesupport");
return nullptr;
}
_register_type(participant, info->request_type_support_);
}

Expand All @@ -129,10 +143,17 @@ rmw_create_client(
participant, response_type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->response_type_support_)))
{
info->response_type_support_ = new ResponseTypeSupport_cpp(service_members);
info->response_type_support_ = new (std::nothrow) ResponseTypeSupport_cpp(service_members);
if (!info->response_type_support_) {
RMW_SET_ERROR_MSG("failed to allocate response typesupport");
return nullptr;
}
_register_type(participant, info->response_type_support_);
}

eprosima::fastrtps::SubscriberAttributes subscriberParam;
eprosima::fastrtps::PublisherAttributes publisherParam;

if (!participant_info->leave_middleware_default_qos) {
subscriberParam.historyMemoryPolicy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
Expand Down Expand Up @@ -167,140 +188,147 @@ rmw_create_client(

// Create Client Subscriber and set QoS
if (!get_datareader_qos(*qos_policies, subscriberParam)) {
RMW_SET_ERROR_MSG("failed to get datareader qos");
goto fail;
return nullptr;
}
auto cleanup_response_subscriber = rcpputils::make_scope_exit(
[info]() {
if (info->response_subscriber_) {
if (!Domain::removeSubscriber(info->response_subscriber_)) {
RMW_SAFE_FWRITE_TO_STDERR(
"Failed to remove response subscriber after '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
}
if (info->listener_) {
delete info->listener_;
}
});
info->listener_ = new (std::nothrow) ClientListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("failed to create client response subscriber listener");
return nullptr;
}
info->listener_ = new ClientListener(info);
info->response_subscriber_ =
Domain::createSubscriber(participant, subscriberParam, info->listener_);
if (!info->response_subscriber_) {
RMW_SET_ERROR_MSG("create_client() could not create subscriber");
goto fail;
RMW_SET_ERROR_MSG("failed to create client response subscriber");
return nullptr;
}

// Create Client Publisher and set QoS
if (!get_datawriter_qos(*qos_policies, publisherParam)) {
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
return nullptr;
}
auto cleanup_request_publisher = rcpputils::make_scope_exit(
[info]() {
if (info->request_publisher_) {
if (!Domain::removePublisher(info->request_publisher_)) {
RMW_SAFE_FWRITE_TO_STDERR(
"Failed to remove request publisher after '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
}
if (info->pub_listener_) {
delete info->pub_listener_;
}
});
info->pub_listener_ = new (std::nothrow) ClientPubListener(info);
if (!info->pub_listener_) {
RMW_SET_ERROR_MSG("failed to create client request publisher listener");
return nullptr;
}
info->pub_listener_ = new ClientPubListener(info);
info->request_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->request_publisher_) {
RMW_SET_ERROR_MSG("create_client() could not create publisher");
goto fail;
RMW_SET_ERROR_MSG("failed to create client request publisher");
return nullptr;
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
rmw_client_t * rmw_client = rmw_client_allocate();
if (!rmw_client) {
RMW_SET_ERROR_MSG("failed to allocate memory for client");
goto fail;
return nullptr;
}
auto cleanup_rmw_client = rcpputils::make_scope_exit(
[rmw_client]() {
rmw_free(const_cast<char *>(rmw_client->service_name));
rmw_free(rmw_client);
});

rmw_client->implementation_identifier = eprosima_fastrtps_identifier;
rmw_client->data = info;
rmw_client->service_name = reinterpret_cast<const char *>(
rmw_allocate(strlen(service_name) + 1));
if (!rmw_client->service_name) {
RMW_SET_ERROR_MSG("failed to allocate memory for client name");
goto fail;
return nullptr;
}
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_publisher_->getGuid());
common_context->graph_cache.associate_writer(
gid,
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_subscriber_->getGuid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
goto fail;
}
}

return rmw_client;

fail:
if (info != nullptr) {
if (info->request_publisher_ != nullptr) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_publisher_->getGuid());
common_context->graph_cache.dissociate_writer(
gid,
if (RMW_RET_OK != ret) {
common_context->graph_cache.dissociate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
Domain::removePublisher(info->request_publisher_);
}

if (info->response_subscriber_ != nullptr) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_subscriber_->getGuid());
common_context->graph_cache.dissociate_reader(
gid,
common_context->graph_cache.dissociate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
Domain::removeSubscriber(info->response_subscriber_);
}

if (info->pub_listener_ != nullptr) {
delete info->pub_listener_;
}

if (info->listener_ != nullptr) {
delete info->listener_;
}

if (participant_info) {
if (info->request_type_support_ != nullptr) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->request_type_support_);
}

if (info->response_type_support_ != nullptr) {
rmw_fastrtps_shared_cpp::_unregister_type(participant, info->response_type_support_);
}
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_fastrtps_cpp",
"leaking type support objects because node impl is null");
}

delete info;
info = nullptr;
}

if (nullptr != rmw_client) {
if (rmw_client->service_name != nullptr) {
rmw_free(const_cast<char *>(rmw_client->service_name));
rmw_client->service_name = nullptr;
return nullptr;
}
rmw_client_free(rmw_client);
}

return nullptr;
cleanup_rmw_client.cancel();
cleanup_response_subscriber.cancel();
cleanup_request_publisher.cancel();
cleanup_base_info.cancel();
return rmw_client;
}

rmw_ret_t
rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
client,
client->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

return rmw_fastrtps_shared_cpp::__rmw_destroy_client(
eprosima_fastrtps_identifier, node, client);
}
Expand Down
Loading

0 comments on commit 7d76937

Please sign in to comment.