Skip to content

Commit

Permalink
Fix communication with asymmetric ignoreParticipantFlags (#3128)
Browse files Browse the repository at this point in the history
* Fix communication with asymmetric ignoreParticipantFlags (#3105)

* Refs #16253. Regression test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Ignoring participants from other processes when they have no locators.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Avoid announcing locators for intraprocess-only participants.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Set TTL to 0 for intraprocess-only participants.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Avoid SHM transport for intraprocess-only participants.

Signed-off-by: Miguel Company <[email protected]>

Signed-off-by: Miguel Company <[email protected]>
(cherry picked from commit 865702b)

# Conflicts:
#	src/cpp/rtps/builtin/discovery/participant/PDP.cpp

* Fix conflicts

Signed-off-by: Miguel Company <[email protected]>

* Added ignore_participant_flags() to Blackbox_FastRTPS PubSubReader. (#3114)

Signed-off-by: Miguel Company <[email protected]>

Signed-off-by: Miguel Company <[email protected]>

Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Miguel Company <[email protected]>
(cherry picked from commit 6de9842)

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
mergify[bot] authored Dec 2, 2022
1 parent a5a9867 commit 5e4ed10
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 29 deletions.
38 changes: 25 additions & 13 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ ParticipantProxyData* PDP::add_participant_proxy_data(
void PDP::initializeParticipantProxyData(
ParticipantProxyData* participant_data)
{
participant_data->m_leaseDuration = mp_RTPSParticipant->getAttributes().builtin.discovery_config.leaseDuration;
RTPSParticipantAttributes& attributes = mp_RTPSParticipant->getAttributes();
bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only();

participant_data->m_leaseDuration = attributes.builtin.discovery_config.leaseDuration;
//set_VendorId_eProsima(participant_data->m_VendorId);
participant_data->m_VendorId = c_VendorId_eProsima;

Expand Down Expand Up @@ -278,13 +281,16 @@ void PDP::initializeParticipantProxyData(
participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints();
#endif // if HAVE_SECURITY

for (const Locator_t& loc : mp_RTPSParticipant->getAttributes().defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : mp_RTPSParticipant->getAttributes().defaultMulticastLocatorList)
if (announce_locators)
{
participant_data->default_locators.add_multicast_locator(loc);
for (const Locator_t& loc : attributes.defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : attributes.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
}
}
participant_data->m_expectsInlineQos = false;
participant_data->m_guid = mp_RTPSParticipant->getGuid();
Expand Down Expand Up @@ -313,21 +319,27 @@ void PDP::initializeParticipantProxyData(
}

participant_data->metatraffic_locators.unicast.clear();
for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList)
if (announce_locators)
{
participant_data->metatraffic_locators.add_unicast_locator(loc);
for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList)
{
participant_data->metatraffic_locators.add_unicast_locator(loc);
}
}

participant_data->metatraffic_locators.multicast.clear();
if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
if (announce_locators)
{
for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList)
if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
{
participant_data->metatraffic_locators.add_multicast_locator(loc);
for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList)
{
participant_data->metatraffic_locators.add_multicast_locator(loc);
}
}
}

participant_data->m_participantName = std::string(mp_RTPSParticipant->getAttributes().getName());
participant_data->m_participantName = std::string(attributes.getName());

participant_data->m_userData = mp_RTPSParticipant->getAttributes().userData;

Expand Down
21 changes: 14 additions & 7 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,22 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData(

// decide if we dismiss the participant using the ParticipantFilteringFlags
const ParticipantFilteringFlags_t& flags = m_discovery.discovery_config.ignoreParticipantFlags;
const GUID_t& remote = participant_data.m_guid;
const GUID_t& local = getLocalParticipantProxyData()->m_guid;
bool is_same_host = local.is_on_same_host_as(remote);
bool is_same_process = local.is_on_same_process_as(remote);

// Discard participants on different process when they don't have metatraffic locators
if (participant_data.metatraffic_locators.multicast.empty() &&
participant_data.metatraffic_locators.unicast.empty() &&
!is_same_process)
{
return nullptr;
}

if (flags != ParticipantFilteringFlags_t::NO_FILTER)
{
const GUID_t& remote = participant_data.m_guid;
const GUID_t& local = getLocalParticipantProxyData()->m_guid;

if (!local.is_on_same_host_as(remote))
if (!is_same_host)
{
if (flags & ParticipantFilteringFlags::FILTER_DIFFERENT_HOST)
{
Expand All @@ -184,9 +193,7 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData(
return nullptr;
}

bool is_same = local.is_on_same_process_as(remote);

if ((filter_same && is_same) || (filter_different && !is_same))
if ((filter_same && is_same_process) || (filter_different && !is_same_process))
{
return nullptr;
}
Expand Down
26 changes: 17 additions & 9 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,26 @@ RTPSParticipantImpl::RTPSParticipantImpl(
UDPv4TransportDescriptor descriptor;
descriptor.sendBufferSize = m_att.sendSocketBufferSize;
descriptor.receiveBufferSize = m_att.listenSocketBufferSize;
if (is_intraprocess_only())
{
// Avoid multicast leaving the host for intraprocess-only participants
descriptor.TTL = 0;
}
m_network_Factory.RegisterTransport(&descriptor, &m_att.properties);

#ifdef SHM_TRANSPORT_BUILTIN
SharedMemTransportDescriptor shm_transport;
// We assume (Linux) UDP doubles the user socket buffer size in kernel, so
// the equivalent segment size in SHM would be socket buffer size x 2
auto segment_size_udp_equivalent =
std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
shm_transport.segment_size(segment_size_udp_equivalent);
// Use same default max_message_size on both UDP and SHM
shm_transport.max_message_size(descriptor.max_message_size());
has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
if (!is_intraprocess_only())
{
SharedMemTransportDescriptor shm_transport;
// We assume (Linux) UDP doubles the user socket buffer size in kernel, so
// the equivalent segment size in SHM would be socket buffer size x 2
auto segment_size_udp_equivalent =
std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
shm_transport.segment_size(segment_size_udp_equivalent);
// Use same default max_message_size on both UDP and SHM
shm_transport.max_message_size(descriptor.max_message_size());
has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
}
#endif // ifdef SHM_TRANSPORT_BUILTIN
}

Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,13 @@ class PubSubReader
return *this;
}

PubSubReader& ignore_participant_flags(
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t flags)
{
participant_attr_.rtps.builtin.discovery_config.ignoreParticipantFlags = flags;
return *this;
}

PubSubReader& socket_buffer_size(
uint32_t sockerBufferSize)
{
Expand Down
68 changes: 68 additions & 0 deletions test/blackbox/common/BlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,74 @@ TEST_P(Discovery, EndpointCreationMultithreaded)
endpoint_thr.join();
}

// Regression test for redmine issue 16253
TEST_P(Discovery, AsymmeticIgnoreParticipantFlags)
{
if (INTRAPROCESS != GetParam())
{
GTEST_SKIP() << "Only makes sense on INTRAPROCESS";
return;
}

// This participant is created with flags to ignore participants which are not on the same process.
// When the announcements of this participant arrive to p2, a single DATA(p) should be sent back.
// No other traffic is expected, since it will take place through intraprocess.
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
p1.ignore_participant_flags(static_cast<eprosima::fastrtps::rtps::ParticipantFilteringFlags_t>(
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST |
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS));
p1.init();
EXPECT_TRUE(p1.isInitialized());

// This participant is created with the test transport to check that nothing unexpected is sent to the
// multicast metatraffic locators.
// Setting localhost in the interface whitelist ensures that the traffic will not leave the host, and also
// that multicast datagrams are sent only once.
// A very long period for the participant announcement is set, along with 0 initial announcements, so we can
// have a exact expectation on the number of datagrams sent to multicast.
PubSubWriter<HelloWorldPubSubType> p2(TEST_TOPIC_NAME);

// This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
// its value when the first multicast datagram is sent.
std::atomic<uint32_t> multicast_port{ 0 };
// Only two multicast datagrams are allowed: the initial DATA(p) and the DATA(p) sent in response of the discovery
// of p1.
constexpr uint32_t allowed_messages_on_port = 2;

auto test_transport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();

std::atomic<uint32_t> messages_on_port{ 0 };
test_transport->interfaceWhiteList.push_back("127.0.0.1");
test_transport->locator_filter_ = [&multicast_port, &messages_on_port](
const eprosima::fastdds::rtps::Locator& destination)
{
if (IPLocator::isMulticast(destination))
{
uint32_t port = 0;
multicast_port.compare_exchange_strong(port, destination.port);
if (destination.port == multicast_port)
{
++messages_on_port;
}
}
return false;
};

p2.disable_builtin_transport().
add_user_transport_to_pparams(test_transport).
lease_duration({ 60 * 60, 0 }, { 50 * 60, 0 }).
initial_announcements(0, {});
p2.init();
EXPECT_TRUE(p2.isInitialized());

// Wait for participants and endpoints to discover each other
p1.wait_discovery();
p2.wait_discovery();

// Check expectation on the number of multicast datagrams sent by p2
EXPECT_EQ(messages_on_port, allowed_messages_on_port);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 5e4ed10

Please sign in to comment.