Skip to content

Commit

Permalink
Feature/available capacity of ipm (ros2#2173)
Browse files Browse the repository at this point in the history
* added available_capacity to get the lowest number of free capacity for intra-process communication for a publisher

Signed-off-by: Joshua Hampp <[email protected]>

* added unit tests for available_capacity

Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>

* fixed typos in comments

Signed-off-by: Joshua Hampp <[email protected]>

* Updated warning

Co-authored-by: Alberto Soragna <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>

* returning 0 if ipm is disabled in lowest_available_ipm_capacity

Signed-off-by: Joshua Hampp <[email protected]>

* return 0 if no subscribers are present in lowest_available_capacity

Signed-off-by: Joshua Hampp <[email protected]>

* updated unit test

Signed-off-by: Joshua Hampp <[email protected]>

* update unit test

Signed-off-by: Joshua Hampp <[email protected]>

* moved available_capacity to a lambda function to be able to handle subscriptions which went out of scope

Signed-off-by: Joshua Hampp <[email protected]>

* updated unit test to check subscriptions which went out of scope

Signed-off-by: Joshua Hampp <[email protected]>

---------

Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>
Co-authored-by: Joshua Hampp <[email protected]>
Co-authored-by: Joshua Hampp <[email protected]>
Co-authored-by: Alberto Soragna <[email protected]>
  • Loading branch information
4 people authored and Alexis Pojomovsky committed Jun 26, 2024
1 parent 8b6a033 commit a838046
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class BufferImplementationBase

virtual void clear() = 0;
virtual bool has_data() const = 0;
virtual size_t available_capacity() const = 0;
};

} // namespace buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class IntraProcessBufferBase

virtual bool has_data() const = 0;
virtual bool use_take_shared_method() const = 0;
virtual size_t available_capacity() const = 0;
};

template<
Expand Down Expand Up @@ -157,6 +158,11 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
return std::is_same<BufferT, MessageSharedPtr>::value;
}

size_t available_capacity() const override
{
return buffer_->available_capacity();
}

private:
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;

Expand Down Expand Up @@ -348,6 +354,11 @@ class ServiceIntraProcessBuffer : public IntraProcessBufferBase
buffer_->clear();
}

size_t available_capacity() const override
{
return buffer_->available_capacity();
}

void add(BufferT && msg)
{
buffer_->enqueue(std::move(msg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,18 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return is_full_();
}

/// Get the remaining capacity to store messages
/**
* This member function is thread-safe.
*
* \return the number of free capacity for new messages
*/
size_t available_capacity() const
{
std::lock_guard<std::mutex> lock(mutex_);
return available_capacity_();
}

void clear() override
{
TRACEPOINT(rclcpp_ring_buffer_clear, static_cast<const void *>(this));
Expand Down Expand Up @@ -266,6 +278,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return {};
}

/// Get the remaining capacity to store messages
/**
* This member function is not thread-safe.
*
* \return the number of free capacity for new messages
*/
inline size_t available_capacity_() const
{
return capacity_ - size_;
}

size_t capacity_;

std::vector<BufferT> ring_buffer_;
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,11 @@ class IntraProcessManager
bool
action_server_is_available(uint64_t ipc_action_client_id);

/// Return the lowest available capacity for all subscription buffers for a publisher id.
RCLCPP_PUBLIC
size_t
lowest_available_capacity(const uint64_t intra_process_publisher_id) const;

private:
struct SplittedSubscriptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
bool
is_durability_transient_local() const;

virtual
size_t
available_capacity() const = 0;

bool
is_ready(rcl_wait_set_t * wait_set) override = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff
return buffer_->use_take_shared_method();
}

size_t available_capacity() const override
{
return buffer_->available_capacity();
}

protected:
void
trigger_guard_condition() override
Expand Down
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

/// Return the lowest available capacity for all subscription buffers.
/**
* For intraprocess communication return the lowest buffer capacity for all subscriptions.
* If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t.
* On failure return 0.
* \return lowest buffer capacity for all subscriptions
*/
RCLCPP_PUBLIC
size_t
lowest_available_ipm_capacity() const;

/// Wait until all published messages are acknowledged or until the specified timeout elapses.
/**
* This method waits until all published messages are acknowledged by all matching
Expand Down
48 changes: 48 additions & 0 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,5 +571,53 @@ IntraProcessManager::can_communicate(
return true;
}

size_t
IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const
{
size_t capacity = std::numeric_limits<size_t>::max();

auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling lowest_available_capacity for invalid or no longer existing publisher id");
return 0u;
}

if (publisher_it->second.take_shared_subscriptions.empty() &&
publisher_it->second.take_ownership_subscriptions.empty())
{
// no subscriptions available
return 0u;
}

auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id)
{
auto subscription_it = subscriptions_.find(intra_process_subscription_id);
if (subscription_it != subscriptions_.end()) {
auto subscription = subscription_it->second.lock();
if (subscription) {
capacity = std::min(capacity, subscription->available_capacity());
}
} else {
// Subscription is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling available_capacity for invalid or no longer existing subscription id");
}
};

for (const auto sub_id : publisher_it->second.take_shared_subscriptions) {
available_capacity(sub_id);
}

for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
available_capacity(sub_id);
}

return capacity;
}

} // namespace experimental
} // namespace rclcpp
19 changes: 19 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,3 +416,22 @@ std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoin

return network_flow_endpoint_vector;
}

size_t PublisherBase::lowest_available_ipm_capacity() const
{
if (!intra_process_is_enabled_) {
return 0u;
}

auto ipm = weak_ipm_.lock();

if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died for a publisher.");
return 0u;
}

return ipm->lowest_available_capacity(intra_process_publisher_id_);
}
72 changes: 72 additions & 0 deletions rclcpp/test/rclcpp/test_intra_process_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) {
EXPECT_EQ(original_value_2, *unique_data_vec[1]);
EXPECT_NE(original_message_pointer_2, reinterpret_cast<std::uintptr_t>(unique_data_vec[1].get()));
}

/*
Check the available buffer capacity while storing and consuming data from an intra-process
buffer.
The initial available buffer capacity should equal the buffer size.
Inserting a message should decrease the available buffer capacity by 1.
Consuming a message should increase the available buffer capacity by 1.
*/
TEST(TestIntraProcessBuffer, available_capacity) {
using MessageT = char;
using Alloc = std::allocator<void>;
using Deleter = std::default_delete<MessageT>;
using SharedMessageT = std::shared_ptr<const MessageT>;
using UniqueMessageT = std::unique_ptr<MessageT, Deleter>;
using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer<
MessageT, Alloc, Deleter, UniqueMessageT>;

constexpr auto history_depth = 5u;

auto buffer_impl =
std::make_unique<rclcpp::experimental::buffers::RingBufferImplementation<UniqueMessageT>>(
history_depth);

UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl));

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());

auto original_unique_msg = std::make_unique<char>('a');
auto original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
auto original_value = *original_unique_msg;

intra_process_buffer.add_unique(std::move(original_unique_msg));

EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());

SharedMessageT popped_shared_msg;
popped_shared_msg = intra_process_buffer.consume_shared();
auto popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_shared_msg.get());

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_shared_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);

original_unique_msg = std::make_unique<char>('b');
original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
original_value = *original_unique_msg;

intra_process_buffer.add_unique(std::move(original_unique_msg));

auto second_unique_msg = std::make_unique<char>('c');
auto second_message_pointer = reinterpret_cast<std::uintptr_t>(second_unique_msg.get());
auto second_value = *second_unique_msg;

intra_process_buffer.add_unique(std::move(second_unique_msg));

EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity());

UniqueMessageT popped_unique_msg;
popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());

EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_unique_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);

popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(second_value, *popped_unique_msg);
EXPECT_EQ(second_message_pointer, popped_message_pointer);
}
82 changes: 82 additions & 0 deletions rclcpp/test/rclcpp/test_intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ class SubscriptionIntraProcessBase
return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal;
}

virtual
size_t
available_capacity() const = 0;

rclcpp::QoS qos_profile;
std::string topic_name;
};
Expand Down Expand Up @@ -350,6 +354,12 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
return take_shared_method;
}

size_t
available_capacity() const override
{
return qos_profile.depth() - buffer->size();
}

bool take_shared_method;

typename rclcpp::experimental::buffers::mock::IntraProcessBuffer<MessageT>::UniquePtr buffer;
Expand Down Expand Up @@ -1033,3 +1043,75 @@ TEST(TestIntraProcessManager, transient_local) {
reinterpret_cast<MessageT *>(received_message_pointer_3)->msg);
ASSERT_EQ("Test", reinterpret_cast<MessageT *>(received_message_pointer_1)->msg);
}

/*
* This tests the method "lowest_available_capacity":
* - Creates 1 publisher.
* - The available buffer capacity should be at least history size.
* - Add 2 subscribers.
* - Add everything to the intra-process manager.
* - All the entities are expected to have different ids.
* - Check the subscriptions count for the publisher.
* - The available buffer capacity should be the history size.
* - Publish one message (without receiving it).
* - The available buffer capacity should decrease by 1.
* - Publish another message (without receiving it).
* - The available buffer capacity should decrease by 1.
* - One subscriber receives one message.
* - The available buffer capacity should stay the same,
* as the other subscriber still has not freed its buffer.
* - The other subscriber receives one message.
* - The available buffer capacity should increase by 1.
* - One subscription goes out of scope.
* - The available buffer capacity should not change.
*/
TEST(TestIntraProcessManager, lowest_available_capacity) {
using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager;
using MessageT = rcl_interfaces::msg::Log;
using PublisherT = rclcpp::mock::Publisher<MessageT>;
using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess<MessageT>;

constexpr auto history_depth = 10u;

auto ipm = std::make_shared<IntraProcessManagerT>();

auto p1 = std::make_shared<PublisherT>(rclcpp::QoS(history_depth).transient_local());

auto s1 =
std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS(history_depth).transient_local());
auto s2 =
std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS(history_depth).transient_local());
auto s3 =
std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS(history_depth).transient_local());

s1->take_shared_method = false;
s2->take_shared_method = true;
s3->take_shared_method = true;

auto p1_id = ipm->add_publisher(p1, p1->buffer);

p1->set_intra_process_manager(p1_id, ipm);

auto unique_msg = std::make_unique<MessageT>();
unique_msg->msg = "Test";
p1->publish(std::move(unique_msg));

ipm->template add_subscription<MessageT>(s1);
ipm->template add_subscription<MessageT>(s2);
ipm->template add_subscription<MessageT>(s3);

auto received_message_pointer_1 = s1->pop();
auto received_message_pointer_2 = s2->pop();
auto received_message_pointer_3 = s3->pop();
ASSERT_NE(0u, received_message_pointer_1);
ASSERT_NE(0u, received_message_pointer_2);
ASSERT_NE(0u, received_message_pointer_3);
ASSERT_EQ(received_message_pointer_3, received_message_pointer_2);
ASSERT_EQ(
reinterpret_cast<MessageT *>(received_message_pointer_1)->msg,
reinterpret_cast<MessageT *>(received_message_pointer_2)->msg);
ASSERT_EQ(
reinterpret_cast<MessageT *>(received_message_pointer_1)->msg,
reinterpret_cast<MessageT *>(received_message_pointer_3)->msg);
ASSERT_EQ("Test", reinterpret_cast<MessageT *>(received_message_pointer_1)->msg);
}
Loading

0 comments on commit a838046

Please sign in to comment.