diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 18af5831e19..f6778fdd750 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -242,6 +242,7 @@ void PDP::initializeParticipantProxyData( ParticipantProxyData* participant_data) { RTPSParticipantAttributes& attributes = mp_RTPSParticipant->getAttributes(); + bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only(); participant_data->m_leaseDuration = attributes.builtin.discovery_config.leaseDuration; //set_VendorId_eProsima(participant_data->m_VendorId); @@ -282,13 +283,16 @@ void PDP::initializeParticipantProxyData( participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints(); #endif // if HAVE_SECURITY - for (const Locator_t& loc : attributes.defaultUnicastLocatorList) + if (announce_locators) { - participant_data->default_locators.add_unicast_locator(loc); - } - for (const Locator_t& loc : attributes.defaultMulticastLocatorList) - { - participant_data->default_locators.add_multicast_locator(loc); + for (const Locator_t& loc : attributes.defaultUnicastLocatorList) + { + participant_data->default_locators.add_unicast_locator(loc); + } + for (const Locator_t& loc : attributes.defaultMulticastLocatorList) + { + participant_data->default_locators.add_multicast_locator(loc); + } } participant_data->m_expectsInlineQos = false; participant_data->m_guid = mp_RTPSParticipant->getGuid(); @@ -317,23 +321,29 @@ void PDP::initializeParticipantProxyData( } participant_data->metatraffic_locators.unicast.clear(); - for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList) + if (announce_locators) { - participant_data->metatraffic_locators.add_unicast_locator(loc); + for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList) + { + participant_data->metatraffic_locators.add_unicast_locator(loc); + } } participant_data->metatraffic_locators.multicast.clear(); - if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty()) + if (announce_locators) { - for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList) + if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty()) { - participant_data->metatraffic_locators.add_multicast_locator(loc); + for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList) + { + participant_data->metatraffic_locators.add_multicast_locator(loc); + } } - } - fastdds::rtps::ExternalLocatorsProcessor::add_external_locators(*participant_data, - attributes.builtin.metatraffic_external_unicast_locators, - attributes.default_external_unicast_locators); + fastdds::rtps::ExternalLocatorsProcessor::add_external_locators(*participant_data, + attributes.builtin.metatraffic_external_unicast_locators, + attributes.default_external_unicast_locators); + } participant_data->m_participantName = std::string(attributes.getName()); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index 924931e8de4..6d5f30d67d2 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -161,13 +161,22 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData( // decide if we dismiss the participant using the ParticipantFilteringFlags const ParticipantFilteringFlags_t& flags = m_discovery.discovery_config.ignoreParticipantFlags; + const GUID_t& remote = participant_data.m_guid; + const GUID_t& local = getLocalParticipantProxyData()->m_guid; + bool is_same_host = local.is_on_same_host_as(remote); + bool is_same_process = local.is_on_same_process_as(remote); + + // Discard participants on different process when they don't have metatraffic locators + if (participant_data.metatraffic_locators.multicast.empty() && + participant_data.metatraffic_locators.unicast.empty() && + !is_same_process) + { + return nullptr; + } if (flags != ParticipantFilteringFlags_t::NO_FILTER) { - const GUID_t& remote = participant_data.m_guid; - const GUID_t& local = getLocalParticipantProxyData()->m_guid; - - if (!local.is_on_same_host_as(remote)) + if (!is_same_host) { if (flags & ParticipantFilteringFlags::FILTER_DIFFERENT_HOST) { @@ -184,9 +193,7 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData( return nullptr; } - bool is_same = local.is_on_same_process_as(remote); - - if ((filter_same && is_same) || (filter_different && !is_same)) + if ((filter_same && is_same_process) || (filter_different && !is_same_process)) { return nullptr; } diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 2a879dab1fb..00c1d3e865d 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -153,18 +153,26 @@ RTPSParticipantImpl::RTPSParticipantImpl( UDPv4TransportDescriptor descriptor; descriptor.sendBufferSize = m_att.sendSocketBufferSize; descriptor.receiveBufferSize = m_att.listenSocketBufferSize; + if (is_intraprocess_only()) + { + // Avoid multicast leaving the host for intraprocess-only participants + descriptor.TTL = 0; + } m_network_Factory.RegisterTransport(&descriptor, &m_att.properties); #ifdef SHM_TRANSPORT_BUILTIN - SharedMemTransportDescriptor shm_transport; - // We assume (Linux) UDP doubles the user socket buffer size in kernel, so - // the equivalent segment size in SHM would be socket buffer size x 2 - auto segment_size_udp_equivalent = - std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2; - shm_transport.segment_size(segment_size_udp_equivalent); - // Use same default max_message_size on both UDP and SHM - shm_transport.max_message_size(descriptor.max_message_size()); - has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport); + if (!is_intraprocess_only()) + { + SharedMemTransportDescriptor shm_transport; + // We assume (Linux) UDP doubles the user socket buffer size in kernel, so + // the equivalent segment size in SHM would be socket buffer size x 2 + auto segment_size_udp_equivalent = + std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2; + shm_transport.segment_size(segment_size_udp_equivalent); + // Use same default max_message_size on both UDP and SHM + shm_transport.max_message_size(descriptor.max_message_size()); + has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport); + } #endif // ifdef SHM_TRANSPORT_BUILTIN } diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 539e9c7edaa..f2b6aa32aeb 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -1282,6 +1282,74 @@ TEST_P(Discovery, EndpointCreationMultithreaded) endpoint_thr.join(); } +// Regression test for redmine issue 16253 +TEST_P(Discovery, AsymmeticIgnoreParticipantFlags) +{ + if (INTRAPROCESS != GetParam()) + { + GTEST_SKIP() << "Only makes sense on INTRAPROCESS"; + return; + } + + // This participant is created with flags to ignore participants which are not on the same process. + // When the announcements of this participant arrive to p2, a single DATA(p) should be sent back. + // No other traffic is expected, since it will take place through intraprocess. + PubSubReader p1(TEST_TOPIC_NAME); + p1.ignore_participant_flags(static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS)); + p1.init(); + EXPECT_TRUE(p1.isInitialized()); + + // This participant is created with the test transport to check that nothing unexpected is sent to the + // multicast metatraffic locators. + // Setting localhost in the interface whitelist ensures that the traffic will not leave the host, and also + // that multicast datagrams are sent only once. + // A very long period for the participant announcement is set, along with 0 initial announcements, so we can + // have a exact expectation on the number of datagrams sent to multicast. + PubSubWriter p2(TEST_TOPIC_NAME); + + // This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set + // its value when the first multicast datagram is sent. + std::atomic multicast_port{ 0 }; + // Only two multicast datagrams are allowed: the initial DATA(p) and the DATA(p) sent in response of the discovery + // of p1. + constexpr uint32_t allowed_messages_on_port = 2; + + auto test_transport = std::make_shared(); + + std::atomic messages_on_port{ 0 }; + test_transport->interfaceWhiteList.push_back("127.0.0.1"); + test_transport->locator_filter_ = [&multicast_port, &messages_on_port]( + const eprosima::fastdds::rtps::Locator& destination) + { + if (IPLocator::isMulticast(destination)) + { + uint32_t port = 0; + multicast_port.compare_exchange_strong(port, destination.port); + if (destination.port == multicast_port) + { + ++messages_on_port; + } + } + return false; + }; + + p2.disable_builtin_transport(). + add_user_transport_to_pparams(test_transport). + lease_duration({ 60 * 60, 0 }, { 50 * 60, 0 }). + initial_announcements(0, {}); + p2.init(); + EXPECT_TRUE(p2.isInitialized()); + + // Wait for participants and endpoints to discover each other + p1.wait_discovery(); + p2.wait_discovery(); + + // Check expectation on the number of multicast datagrams sent by p2 + EXPECT_EQ(messages_on_port, allowed_messages_on_port); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else