Skip to content

Commit

Permalink
Unlock reader's mutex before stopping WriterProxy
Browse files Browse the repository at this point in the history
Signed-off-by: Juan López Fernández <[email protected]>
  • Loading branch information
juanlofer-eprosima authored and jparisu committed Dec 7, 2022
1 parent 31927e9 commit 8f7bc26
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
16 changes: 14 additions & 2 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ bool StatefulReader::matched_writer_add(
EPROSIMA_LOG_ERROR(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()
<< " to " << this->m_guid.entityId
<< " with data sharing.");
wp->stop();
{
// Release reader's lock to avoid deadlock when waiting for event (requiring mutex) to finish
guard.unlock();
assert(!guard.owns_lock());
wp->stop();
guard.lock();
}
matched_writers_pool_.push_back(wp);
return false;
}
Expand Down Expand Up @@ -389,7 +395,13 @@ bool StatefulReader::matched_writer_remove(
(void)removed_from_listener;
remove_changes_from(writer_guid, true);
}
wproxy->stop();
{
// Release reader's lock to avoid deadlock when waiting for event (requiring mutex) to finish
lock.unlock();
assert(!lock.owns_lock());
wproxy->stop();
lock.lock();
}
matched_writers_pool_.push_back(wproxy);
if (nullptr != mp_listener)
{
Expand Down
16 changes: 14 additions & 2 deletions src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ void WriterProxy::stop()
StateCode prev_code;
if ((prev_code = state_.exchange(StateCode::STOPPED)) == StateCode::BUSY)
{
// Initial ack_nack being performed, wait for it to finish
// TimedEvent being performed, wait for it to finish.
// It does not matter which of the two events is the one on execution, but we must wait on initial_acknack_ as
// it could be restarted if only cancelled while its callback is being triggered.
initial_acknack_->recreate_timer();
}
else
Expand Down Expand Up @@ -526,7 +528,17 @@ bool WriterProxy::perform_initial_ack_nack()

void WriterProxy::perform_heartbeat_response()
{
StateCode expected = StateCode::IDLE;
if (!state_.compare_exchange_strong(expected, StateCode::BUSY))
{
// Stopped from another thread -> abort
return;
}

reader_->send_acknack(this, this, heartbeat_final_flag_.load());

expected = StateCode::BUSY;
state_.compare_exchange_strong(expected, StateCode::IDLE);
}

bool WriterProxy::process_heartbeat(
Expand All @@ -544,7 +556,7 @@ bool WriterProxy::process_heartbeat(
#endif // SHOULD_DEBUG_LINUX

assert_liveliness = false;
if (last_heartbeat_count_ < count)
if (state_ != StateCode::STOPPED && last_heartbeat_count_ < count)
{
// If it is the first heartbeat message, we can try to cancel initial ack.
// TODO: This timer cancelling should be checked if needed with the liveliness implementation.
Expand Down

0 comments on commit 8f7bc26

Please sign in to comment.