Skip to content

Commit

Permalink
Feature/available capacity of ipm (ros2#2173)
Browse files Browse the repository at this point in the history
* added available_capacity to get the lowest number of free capacity for intra-process communication for a publisher

Signed-off-by: Joshua Hampp <[email protected]>

* added unit tests for available_capacity

Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>

* fixed typos in comments

Signed-off-by: Joshua Hampp <[email protected]>

* Updated warning

Co-authored-by: Alberto Soragna <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>

* returning 0 if ipm is disabled in lowest_available_ipm_capacity

Signed-off-by: Joshua Hampp <[email protected]>

* return 0 if no subscribers are present in lowest_available_capacity

Signed-off-by: Joshua Hampp <[email protected]>

* updated unit test

Signed-off-by: Joshua Hampp <[email protected]>

* update unit test

Signed-off-by: Joshua Hampp <[email protected]>

* moved available_capacity to a lambda function to be able to handle subscriptions which went out of scope

Signed-off-by: Joshua Hampp <[email protected]>

* updated unit test to check subscriptions which went out of scope

Signed-off-by: Joshua Hampp <[email protected]>

---------

Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>
Co-authored-by: Joshua Hampp <[email protected]>
Co-authored-by: Joshua Hampp <[email protected]>
Co-authored-by: Alberto Soragna <[email protected]>
(cherry picked from commit fce021b)
  • Loading branch information
DensoADAS authored and jplapp committed May 17, 2024
1 parent d599f9e commit 61b383a
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class BufferImplementationBase

virtual void clear() = 0;
virtual bool has_data() const = 0;
virtual size_t available_capacity() const = 0;
};

} // namespace buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class IntraProcessBufferBase

virtual bool has_data() const = 0;
virtual bool use_take_shared_method() const = 0;
virtual size_t available_capacity() const = 0;
};

template<
Expand Down Expand Up @@ -143,6 +144,11 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
return std::is_same<BufferT, MessageSharedPtr>::value;
}

size_t available_capacity() const override
{
return buffer_->available_capacity();
}

private:
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return is_full_();
}

/// Get the remaining capacity to store messages
/**
* This member function is thread-safe.
*
* \return the number of free capacity for new messages
*/
size_t available_capacity() const
{
std::lock_guard<std::mutex> lock(mutex_);
return available_capacity_();
}

void clear()
{
TRACEPOINT(rclcpp_ring_buffer_clear, static_cast<const void *>(this));
Expand Down Expand Up @@ -189,6 +201,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return size_ == capacity_;
}

/// Get the remaining capacity to store messages
/**
* This member function is not thread-safe.
*
* \return the number of free capacity for new messages
*/
inline size_t available_capacity_() const
{
return capacity_ - size_;
}

size_t capacity_;

std::vector<BufferT> ring_buffer_;
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ class IntraProcessManager
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
get_subscription_intra_process(uint64_t intra_process_subscription_id);

/// Return the lowest available capacity for all subscription buffers for a publisher id.
RCLCPP_PUBLIC
size_t
lowest_available_capacity(const uint64_t intra_process_publisher_id) const;

private:
struct SplittedSubscriptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;

RCLCPP_PUBLIC
virtual
size_t
available_capacity() const = 0;

bool
is_ready(rcl_wait_set_t * wait_set) override = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff
return buffer_->use_take_shared_method();
}

size_t available_capacity() const override
{
return buffer_->available_capacity();
}

protected:
void
trigger_guard_condition() override
Expand Down
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

/// Return the lowest available capacity for all subscription buffers.
/**
* For intraprocess communication return the lowest buffer capacity for all subscriptions.
* If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t.
* On failure return 0.
* \return lowest buffer capacity for all subscriptions
*/
RCLCPP_PUBLIC
size_t
lowest_available_ipm_capacity() const;

/// Wait until all published messages are acknowledged or until the specified timeout elapses.
/**
* This method waits until all published messages are acknowledged by all matching
Expand Down
47 changes: 47 additions & 0 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,52 @@ IntraProcessManager::can_communicate(
return true;
}

size_t
IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const
{
size_t capacity = std::numeric_limits<size_t>::max();

auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling lowest_available_capacity for invalid or no longer existing publisher id");
return 0u;
}

if (publisher_it->second.take_shared_subscriptions.empty() &&
publisher_it->second.take_ownership_subscriptions.empty())
{
// no subscriptions available
return 0u;
}

auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id)
{
auto subscription_it = subscriptions_.find(intra_process_subscription_id);
if (subscription_it != subscriptions_.end()) {
auto subscription = subscription_it->second.lock();
if (subscription) {
capacity = std::min(capacity, subscription->available_capacity());
}
} else {
// Subscription is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling available_capacity for invalid or no longer existing subscription id");
}
};

for (const auto sub_id : publisher_it->second.take_shared_subscriptions) {
available_capacity(sub_id);
}

for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
available_capacity(sub_id);
}

return capacity;
}
} // namespace experimental
} // namespace rclcpp
19 changes: 19 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,22 @@ std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoin

return network_flow_endpoint_vector;
}

size_t PublisherBase::lowest_available_ipm_capacity() const
{
if (!intra_process_is_enabled_) {
return 0u;
}

auto ipm = weak_ipm_.lock();

if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died for a publisher.");
return 0u;
}

return ipm->lowest_available_capacity(intra_process_publisher_id_);
}
72 changes: 72 additions & 0 deletions rclcpp/test/rclcpp/test_intra_process_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) {
EXPECT_EQ(original_value, *popped_unique_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);
}

/*
Check the available buffer capacity while storing and consuming data from an intra-process
buffer.
The initial available buffer capacity should equal the buffer size.
Inserting a message should decrease the available buffer capacity by 1.
Consuming a message should increase the available buffer capacity by 1.
*/
TEST(TestIntraProcessBuffer, available_capacity) {
using MessageT = char;
using Alloc = std::allocator<void>;
using Deleter = std::default_delete<MessageT>;
using SharedMessageT = std::shared_ptr<const MessageT>;
using UniqueMessageT = std::unique_ptr<MessageT, Deleter>;
using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer<
MessageT, Alloc, Deleter, UniqueMessageT>;

constexpr auto history_depth = 5u;

auto buffer_impl =
std::make_unique<rclcpp::experimental::buffers::RingBufferImplementation<UniqueMessageT>>(
history_depth);

UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl));

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());

auto original_unique_msg = std::make_unique<char>('a');
auto original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
auto original_value = *original_unique_msg;

intra_process_buffer.add_unique(std::move(original_unique_msg));

EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());

SharedMessageT popped_shared_msg;
popped_shared_msg = intra_process_buffer.consume_shared();
auto popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_shared_msg.get());

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_shared_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);

original_unique_msg = std::make_unique<char>('b');
original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
original_value = *original_unique_msg;

intra_process_buffer.add_unique(std::move(original_unique_msg));

auto second_unique_msg = std::make_unique<char>('c');
auto second_message_pointer = reinterpret_cast<std::uintptr_t>(second_unique_msg.get());
auto second_value = *second_unique_msg;

intra_process_buffer.add_unique(std::move(second_unique_msg));

EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity());

UniqueMessageT popped_unique_msg;
popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());

EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_unique_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);

popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(second_value, *popped_unique_msg);
EXPECT_EQ(second_message_pointer, popped_message_pointer);
}
Loading

0 comments on commit 61b383a

Please sign in to comment.