Skip to content

Commit

Permalink
Fix communication with asymmetric ignoreParticipantFlags (#3105)
Browse files Browse the repository at this point in the history
* Refs #16253. Regression test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Ignoring participants from other processes when they have no locators.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Avoid announcing locators for intraprocess-only participants.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Set TTL to 0 for intraprocess-only participants.

Signed-off-by: Miguel Company <[email protected]>

* Refs #16253. Avoid SHM transport for intraprocess-only participants.

Signed-off-by: Miguel Company <[email protected]>

Signed-off-by: Miguel Company <[email protected]>
(cherry picked from commit 865702b)

# Conflicts:
#	src/cpp/rtps/builtin/discovery/participant/PDP.cpp
#	src/cpp/rtps/participant/RTPSParticipantImpl.cpp
  • Loading branch information
MiguelCompany authored and mergify[bot] committed Nov 24, 2022
1 parent acfd618 commit 114e4c3
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 22 deletions.
44 changes: 38 additions & 6 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,14 @@ ParticipantProxyData* PDP::add_participant_proxy_data(
void PDP::initializeParticipantProxyData(
ParticipantProxyData* participant_data)
{
<<<<<<< HEAD
participant_data->m_leaseDuration = mp_RTPSParticipant->getAttributes().builtin.discovery_config.leaseDuration;
=======
RTPSParticipantAttributes& attributes = mp_RTPSParticipant->getAttributes();
bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only();

participant_data->m_leaseDuration = attributes.builtin.discovery_config.leaseDuration;
>>>>>>> 865702b44 (Fix communication with asymmetric ignoreParticipantFlags (#3105))
//set_VendorId_eProsima(participant_data->m_VendorId);
participant_data->m_VendorId = c_VendorId_eProsima;

Expand Down Expand Up @@ -282,13 +289,26 @@ void PDP::initializeParticipantProxyData(
participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints();
#endif // if HAVE_SECURITY

<<<<<<< HEAD
for (const Locator_t& loc : mp_RTPSParticipant->getAttributes().defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : mp_RTPSParticipant->getAttributes().defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
=======
if (announce_locators)
{
for (const Locator_t& loc : attributes.defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : attributes.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
}
>>>>>>> 865702b44 (Fix communication with asymmetric ignoreParticipantFlags (#3105))
}
participant_data->m_expectsInlineQos = false;
participant_data->m_guid = mp_RTPSParticipant->getGuid();
Expand Down Expand Up @@ -317,21 +337,33 @@ void PDP::initializeParticipantProxyData(
}

participant_data->metatraffic_locators.unicast.clear();
for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList)
if (announce_locators)
{
participant_data->metatraffic_locators.add_unicast_locator(loc);
for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList)
{
participant_data->metatraffic_locators.add_unicast_locator(loc);
}
}

participant_data->metatraffic_locators.multicast.clear();
if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
if (announce_locators)
{
for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList)
if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
{
participant_data->metatraffic_locators.add_multicast_locator(loc);
for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList)
{
participant_data->metatraffic_locators.add_multicast_locator(loc);
}
}
}

<<<<<<< HEAD
participant_data->m_participantName = std::string(mp_RTPSParticipant->getAttributes().getName());
=======
fastdds::rtps::ExternalLocatorsProcessor::add_external_locators(*participant_data,
attributes.builtin.metatraffic_external_unicast_locators,
attributes.default_external_unicast_locators);
}
>>>>>>> 865702b44 (Fix communication with asymmetric ignoreParticipantFlags (#3105))

participant_data->m_userData = mp_RTPSParticipant->getAttributes().userData;

Expand Down
21 changes: 14 additions & 7 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,22 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData(

// decide if we dismiss the participant using the ParticipantFilteringFlags
const ParticipantFilteringFlags_t& flags = m_discovery.discovery_config.ignoreParticipantFlags;
const GUID_t& remote = participant_data.m_guid;
const GUID_t& local = getLocalParticipantProxyData()->m_guid;
bool is_same_host = local.is_on_same_host_as(remote);
bool is_same_process = local.is_on_same_process_as(remote);

// Discard participants on different process when they don't have metatraffic locators
if (participant_data.metatraffic_locators.multicast.empty() &&
participant_data.metatraffic_locators.unicast.empty() &&
!is_same_process)
{
return nullptr;
}

if (flags != ParticipantFilteringFlags_t::NO_FILTER)
{
const GUID_t& remote = participant_data.m_guid;
const GUID_t& local = getLocalParticipantProxyData()->m_guid;

if (!local.is_on_same_host_as(remote))
if (!is_same_host)
{
if (flags & ParticipantFilteringFlags::FILTER_DIFFERENT_HOST)
{
Expand All @@ -181,9 +190,7 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData(
return nullptr;
}

bool is_same = local.is_on_same_process_as(remote);

if ((filter_same && is_same) || (filter_different && !is_same))
if ((filter_same && is_same_process) || (filter_different && !is_same_process))
{
return nullptr;
}
Expand Down
30 changes: 21 additions & 9 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,30 @@ RTPSParticipantImpl::RTPSParticipantImpl(
UDPv4TransportDescriptor descriptor;
descriptor.sendBufferSize = m_att.sendSocketBufferSize;
descriptor.receiveBufferSize = m_att.listenSocketBufferSize;
<<<<<<< HEAD
m_network_Factory.RegisterTransport(&descriptor);
=======
if (is_intraprocess_only())
{
// Avoid multicast leaving the host for intraprocess-only participants
descriptor.TTL = 0;
}
m_network_Factory.RegisterTransport(&descriptor, &m_att.properties);
>>>>>>> 865702b44 (Fix communication with asymmetric ignoreParticipantFlags (#3105))

#ifdef SHM_TRANSPORT_BUILTIN
SharedMemTransportDescriptor shm_transport;
// We assume (Linux) UDP doubles the user socket buffer size in kernel, so
// the equivalent segment size in SHM would be socket buffer size x 2
auto segment_size_udp_equivalent =
std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
shm_transport.segment_size(segment_size_udp_equivalent);
// Use same default max_message_size on both UDP and SHM
shm_transport.max_message_size(descriptor.max_message_size());
has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
if (!is_intraprocess_only())
{
SharedMemTransportDescriptor shm_transport;
// We assume (Linux) UDP doubles the user socket buffer size in kernel, so
// the equivalent segment size in SHM would be socket buffer size x 2
auto segment_size_udp_equivalent =
std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
shm_transport.segment_size(segment_size_udp_equivalent);
// Use same default max_message_size on both UDP and SHM
shm_transport.max_message_size(descriptor.max_message_size());
has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
}
#endif // ifdef SHM_TRANSPORT_BUILTIN
}

Expand Down
68 changes: 68 additions & 0 deletions test/blackbox/common/BlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,74 @@ TEST_P(Discovery, EndpointCreationMultithreaded)
endpoint_thr.join();
}

// Regression test for redmine issue 16253
TEST_P(Discovery, AsymmeticIgnoreParticipantFlags)
{
if (INTRAPROCESS != GetParam())
{
GTEST_SKIP() << "Only makes sense on INTRAPROCESS";
return;
}

// This participant is created with flags to ignore participants which are not on the same process.
// When the announcements of this participant arrive to p2, a single DATA(p) should be sent back.
// No other traffic is expected, since it will take place through intraprocess.
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
p1.ignore_participant_flags(static_cast<eprosima::fastrtps::rtps::ParticipantFilteringFlags_t>(
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST |
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS));
p1.init();
EXPECT_TRUE(p1.isInitialized());

// This participant is created with the test transport to check that nothing unexpected is sent to the
// multicast metatraffic locators.
// Setting localhost in the interface whitelist ensures that the traffic will not leave the host, and also
// that multicast datagrams are sent only once.
// A very long period for the participant announcement is set, along with 0 initial announcements, so we can
// have a exact expectation on the number of datagrams sent to multicast.
PubSubWriter<HelloWorldPubSubType> p2(TEST_TOPIC_NAME);

// This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
// its value when the first multicast datagram is sent.
std::atomic<uint32_t> multicast_port{ 0 };
// Only two multicast datagrams are allowed: the initial DATA(p) and the DATA(p) sent in response of the discovery
// of p1.
constexpr uint32_t allowed_messages_on_port = 2;

auto test_transport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();

std::atomic<uint32_t> messages_on_port{ 0 };
test_transport->interfaceWhiteList.push_back("127.0.0.1");
test_transport->locator_filter_ = [&multicast_port, &messages_on_port](
const eprosima::fastdds::rtps::Locator& destination)
{
if (IPLocator::isMulticast(destination))
{
uint32_t port = 0;
multicast_port.compare_exchange_strong(port, destination.port);
if (destination.port == multicast_port)
{
++messages_on_port;
}
}
return false;
};

p2.disable_builtin_transport().
add_user_transport_to_pparams(test_transport).
lease_duration({ 60 * 60, 0 }, { 50 * 60, 0 }).
initial_announcements(0, {});
p2.init();
EXPECT_TRUE(p2.isInitialized());

// Wait for participants and endpoints to discover each other
p1.wait_discovery();
p2.wait_discovery();

// Check expectation on the number of multicast datagrams sent by p2
EXPECT_EQ(messages_on_port, allowed_messages_on_port);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 114e4c3

Please sign in to comment.