Skip to content

Commit

Permalink
Fix trying to get topic data that was already removed. (#417)
Browse files Browse the repository at this point in the history
* Fix trying to get topic data that was already removed.

Signed-off-by: Chen.Lihui <[email protected]>

* Revert "Fix trying to get topic data that was already removed."

This reverts commit 08632e8.

Signed-off-by: Chen.Lihui <[email protected]>

* Updated based on suggestions

Signed-off-by: Chen.Lihui <[email protected]>
Signed-off-by: Miguel Company <[email protected]>
  • Loading branch information
Chen Lihui authored Aug 6, 2020
1 parent 4d6a45a commit 857d19b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,34 +67,23 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
// SubscriberListener implementation
void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * /*sub*/, eprosima::fastrtps::rtps::MatchingInfo & info) final
eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) final
{
std::lock_guard<std::mutex> lock(internalMutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
publishers_.insert(info.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == info.status) {
publishers_.erase(info.remoteEndpointGuid);
{
std::lock_guard<std::mutex> lock(internalMutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
publishers_.insert(info.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == info.status) {
publishers_.erase(info.remoteEndpointGuid);
}
}
update_unread_count(sub);
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final
{
// Make sure to call into Fast-RTPS before taking the lock to avoid an
// ABBA deadlock between internalMutex_ and mutexes inside of Fast-RTPS.
#if FASTRTPS_VERSION_MAJOR == 1 && FASTRTPS_VERSION_MINOR < 9
uint64_t unread_count = sub->getUnreadCount();
#else
uint64_t unread_count = sub->get_unread_count();
#endif

std::lock_guard<std::mutex> lock(internalMutex_);

// the change to liveliness_lost_count_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

data_.store(unread_count, std::memory_order_relaxed);
update_unread_count(sub);
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
Expand Down Expand Up @@ -142,7 +131,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
}

void
data_taken(eprosima::fastrtps::Subscriber * sub)
update_unread_count(eprosima::fastrtps::Subscriber * sub)
{
// Make sure to call into Fast-RTPS before taking the lock to avoid an
// ABBA deadlock between internalMutex_ and mutexes inside of Fast-RTPS.
Expand All @@ -153,7 +142,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
#endif

std::lock_guard<std::mutex> lock(internalMutex_);
ConditionalScopedLock clock(conditionMutex_);
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);
data_.store(unread_count, std::memory_order_relaxed);
}

Expand Down
4 changes: 2 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ _take(
data.data = ros_message;
data.impl = info->type_support_impl_;
if (info->subscriber_->takeNextData(&data, &sinfo)) {
info->listener_->data_taken(info->subscriber_);
info->listener_->update_unread_count(info->subscriber_);

if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
if (message_info) {
Expand Down Expand Up @@ -267,7 +267,7 @@ _take_serialized_message(
data.data = &buffer;
data.impl = nullptr; // not used when is_cdr_buffer is true
if (info->subscriber_->takeNextData(&data, &sinfo)) {
info->listener_->data_taken(info->subscriber_);
info->listener_->update_unread_count(info->subscriber_);

if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
auto buffer_size = static_cast<size_t>(buffer.getBufferSize());
Expand Down

0 comments on commit 857d19b

Please sign in to comment.