Skip to content

Commit

Permalink
Add topic cache object for managing topic relations
Browse files Browse the repository at this point in the history
* Add topic cache object to maintain participant relationship
* Separate responsibility of maintaining cached data from listener
* Use lock_guard over manual mutex lock
* Use c++14 functions to reduce code
* Add debug statements
* Add lookup of topics for input node
* Remove unnecessary logging
* Add namespace to graph functions
  • Loading branch information
ross-desmond authored and jacobperron committed Dec 6, 2018
1 parent 3577298 commit 222c6e3
Show file tree
Hide file tree
Showing 12 changed files with 964 additions and 59 deletions.
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ add_library(rmw_fastrtps_cpp
src/rmw_guard_condition.cpp
src/rmw_init.cpp
src/rmw_node.cpp
src/rmw_node_info_and_types.cpp
src/rmw_node_names.cpp
src/rmw_publish.cpp
src/rmw_publisher.cpp
Expand Down
68 changes: 68 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_node_info_and_types.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/allocators.h"
#include "rmw/convert_rcutils_ret_to_rmw_ret.h"
#include "rmw/error_handling.h"
#include "rmw/get_node_info_and_types.h"
#include "rmw/names_and_types.h"
#include "rmw/rmw.h"

#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"

#include "rmw_fastrtps_cpp/identifier.hpp"

extern "C"
{
rmw_ret_t
rmw_get_subscriber_names_and_types_by_node(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types)
{
return rmw_fastrtps_shared_cpp::__rmw_get_subscriber_names_and_types_by_node(
eprosima_fastrtps_identifier, node, allocator, node_name, node_namespace, no_demangle,
topic_names_and_types);
}

rmw_ret_t
rmw_get_publisher_names_and_types_by_node(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types)
{
return rmw_fastrtps_shared_cpp::__rmw_get_publisher_names_and_types_by_node(
eprosima_fastrtps_identifier, node, allocator, node_name, node_namespace, no_demangle,
topic_names_and_types);
}

rmw_ret_t
rmw_get_service_names_and_types_by_node(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
rmw_names_and_types_t * service_names_and_types)
{
return rmw_fastrtps_shared_cpp::__rmw_get_service_names_and_types_by_node(
eprosima_fastrtps_identifier, node, allocator, node_name, node_namespace,
service_names_and_types);
}
} // extern "C"
1 change: 1 addition & 0 deletions rmw_fastrtps_dynamic_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add_library(rmw_fastrtps_dynamic_cpp
src/rmw_guard_condition.cpp
src/rmw_init.cpp
src/rmw_node.cpp
src/rmw_node_info_and_types.cpp
src/rmw_node_names.cpp
src/rmw_publish.cpp
src/rmw_publisher.cpp
Expand Down
83 changes: 83 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_node_info_and_types.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/allocators.h"
#include "rmw/convert_rcutils_ret_to_rmw_ret.h"
#include "rmw/error_handling.h"
#include "rmw/get_node_info_and_types.h"
#include "rmw/names_and_types.h"
#include "rmw/rmw.h"

#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"

#include "rmw_fastrtps_dynamic_cpp/identifier.hpp"

// The extern "C" here enforces that overloading is not used.
extern "C"
{
rmw_ret_t
rmw_get_subscriber_names_and_types_by_node(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types)
{
return rmw_fastrtps_shared_cpp::__rmw_get_subscriber_names_and_types_by_node(
eprosima_fastrtps_identifier,
node,
allocator,
node_name,
node_namespace,
no_demangle,
topic_names_and_types);
}

rmw_ret_t
rmw_get_publisher_names_and_types_by_node(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types)
{
return rmw_fastrtps_shared_cpp::__rmw_get_publisher_names_and_types_by_node(
eprosima_fastrtps_identifier,
node,
allocator,
node_name,
node_namespace,
no_demangle,
topic_names_and_types);
}

rmw_ret_t
rmw_get_service_names_and_types_by_node(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
rmw_names_and_types_t * service_names_and_types)
{
return rmw_fastrtps_shared_cpp::__rmw_get_service_names_and_types_by_node(
eprosima_fastrtps_identifier,
node,
allocator,
node_name,
node_namespace,
service_names_and_types);
}
} // extern "C"
5 changes: 4 additions & 1 deletion rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ add_library(rmw_fastrtps_shared_cpp
src/rmw_guard_condition.cpp
src/rmw_logging.cpp
src/rmw_node.cpp
src/rmw_node_info_and_types.cpp
src/rmw_node_names.cpp
src/rmw_publish.cpp
src/rmw_publisher.cpp
Expand All @@ -68,8 +69,10 @@ add_library(rmw_fastrtps_shared_cpp
src/rmw_wait_set.cpp
src/TypeSupport_impl.cpp
)

target_link_libraries(rmw_fastrtps_shared_cpp
fastcdr fastrtps)
fastcdr fastrtps
)

# specific order: dependents before dependencies
ament_target_dependencies(rmw_fastrtps_shared_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@

#include "rmw_common.hpp"

#include "topic_cache.hpp"

class ParticipantListener;
class ReaderInfo;
class WriterInfo;

typedef struct CustomParticipantInfo
{
Expand Down Expand Up @@ -158,33 +158,21 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener
template<class T>
void process_discovery_info(T & proxyData, bool is_alive, bool is_reader)
{
std::map<std::string, std::vector<std::string>> & topicNtypes =
is_reader ? reader_topic_and_types : writer_topic_and_types;
auto & topic_cache =
is_reader ? reader_topic_cache : writer_topic_cache;

auto fqdn = proxyData.topicName();
bool trigger = false;
mapmutex.lock();
if (is_alive) {
topicNtypes[fqdn].push_back(proxyData.typeName());
trigger = true;
} else {
auto it = topicNtypes.find(fqdn);
if (it != topicNtypes.end()) {
const auto & loc =
std::find(std::begin(it->second), std::end(it->second), proxyData.typeName());
if (loc != std::end(it->second)) {
topicNtypes[fqdn].erase(loc, loc + 1);
trigger = true;
} else {
RCUTILS_LOG_DEBUG_NAMED(
"rmw_fastrtps_shared_cpp",
"unexpected removal of subscription on topic '%s' with type '%s'",
fqdn.c_str(), proxyData.typeName().c_str());
}
bool trigger;
{
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
if (is_alive) {
trigger = topic_cache.addTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName(), proxyData.typeName());
} else {
trigger = topic_cache.removeTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName(), proxyData.typeName());
}
}
mapmutex.unlock();

if (trigger) {
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
graph_guard_condition_->implementation_identifier,
Expand All @@ -194,9 +182,8 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener

std::map<eprosima::fastrtps::rtps::GUID_t, std::string> discovered_names;
std::map<eprosima::fastrtps::rtps::GUID_t, std::string> discovered_namespaces;
std::map<std::string, std::vector<std::string>> reader_topic_and_types;
std::map<std::string, std::vector<std::string>> writer_topic_and_types;
std::mutex mapmutex;
LockedObject<TopicCache> reader_topic_cache;
LockedObject<TopicCache> writer_topic_cache;
rmw_guard_condition_t * graph_guard_condition_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,38 @@ __rmw_get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_get_publisher_names_and_types_by_node(
const char * identifier,
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_get_service_names_and_types_by_node(
const char * identifier,
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
rmw_names_and_types_t * service_names_and_types);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_get_subscriber_names_and_types_by_node(
const char * identifier,
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * node_name,
const char * node_namespace,
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_service_server_is_available(
Expand Down
Loading

0 comments on commit 222c6e3

Please sign in to comment.