Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16277] Fix communication with asymmetric ignoreParticipantFlags #3105

Merged
merged 5 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ void PDP::initializeParticipantProxyData(
ParticipantProxyData* participant_data)
{
RTPSParticipantAttributes& attributes = mp_RTPSParticipant->getAttributes();
bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only();

participant_data->m_leaseDuration = attributes.builtin.discovery_config.leaseDuration;
//set_VendorId_eProsima(participant_data->m_VendorId);
Expand Down Expand Up @@ -282,13 +283,16 @@ void PDP::initializeParticipantProxyData(
participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints();
#endif // if HAVE_SECURITY

for (const Locator_t& loc : attributes.defaultUnicastLocatorList)
if (announce_locators)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : attributes.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
for (const Locator_t& loc : attributes.defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : attributes.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
}
}
participant_data->m_expectsInlineQos = false;
participant_data->m_guid = mp_RTPSParticipant->getGuid();
Expand Down Expand Up @@ -317,23 +321,29 @@ void PDP::initializeParticipantProxyData(
}

participant_data->metatraffic_locators.unicast.clear();
for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList)
if (announce_locators)
{
participant_data->metatraffic_locators.add_unicast_locator(loc);
for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList)
{
participant_data->metatraffic_locators.add_unicast_locator(loc);
}
}

participant_data->metatraffic_locators.multicast.clear();
if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
if (announce_locators)
{
for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList)
if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
{
participant_data->metatraffic_locators.add_multicast_locator(loc);
for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList)
{
participant_data->metatraffic_locators.add_multicast_locator(loc);
}
}
}

fastdds::rtps::ExternalLocatorsProcessor::add_external_locators(*participant_data,
attributes.builtin.metatraffic_external_unicast_locators,
attributes.default_external_unicast_locators);
fastdds::rtps::ExternalLocatorsProcessor::add_external_locators(*participant_data,
attributes.builtin.metatraffic_external_unicast_locators,
attributes.default_external_unicast_locators);
}

participant_data->m_participantName = std::string(attributes.getName());

Expand Down
21 changes: 14 additions & 7 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,22 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData(

// decide if we dismiss the participant using the ParticipantFilteringFlags
const ParticipantFilteringFlags_t& flags = m_discovery.discovery_config.ignoreParticipantFlags;
const GUID_t& remote = participant_data.m_guid;
const GUID_t& local = getLocalParticipantProxyData()->m_guid;
bool is_same_host = local.is_on_same_host_as(remote);
bool is_same_process = local.is_on_same_process_as(remote);

// Discard participants on different process when they don't have metatraffic locators
if (participant_data.metatraffic_locators.multicast.empty() &&
participant_data.metatraffic_locators.unicast.empty() &&
!is_same_process)
{
return nullptr;
}

if (flags != ParticipantFilteringFlags_t::NO_FILTER)
{
const GUID_t& remote = participant_data.m_guid;
const GUID_t& local = getLocalParticipantProxyData()->m_guid;

if (!local.is_on_same_host_as(remote))
if (!is_same_host)
{
if (flags & ParticipantFilteringFlags::FILTER_DIFFERENT_HOST)
{
Expand All @@ -184,9 +193,7 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData(
return nullptr;
}

bool is_same = local.is_on_same_process_as(remote);

if ((filter_same && is_same) || (filter_different && !is_same))
if ((filter_same && is_same_process) || (filter_different && !is_same_process))
{
return nullptr;
}
Expand Down
26 changes: 17 additions & 9 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,26 @@ RTPSParticipantImpl::RTPSParticipantImpl(
UDPv4TransportDescriptor descriptor;
descriptor.sendBufferSize = m_att.sendSocketBufferSize;
descriptor.receiveBufferSize = m_att.listenSocketBufferSize;
if (is_intraprocess_only())
{
// Avoid multicast leaving the host for intraprocess-only participants
descriptor.TTL = 0;
}
m_network_Factory.RegisterTransport(&descriptor, &m_att.properties);

#ifdef SHM_TRANSPORT_BUILTIN
SharedMemTransportDescriptor shm_transport;
// We assume (Linux) UDP doubles the user socket buffer size in kernel, so
// the equivalent segment size in SHM would be socket buffer size x 2
auto segment_size_udp_equivalent =
std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
shm_transport.segment_size(segment_size_udp_equivalent);
// Use same default max_message_size on both UDP and SHM
shm_transport.max_message_size(descriptor.max_message_size());
has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
if (!is_intraprocess_only())
{
SharedMemTransportDescriptor shm_transport;
// We assume (Linux) UDP doubles the user socket buffer size in kernel, so
// the equivalent segment size in SHM would be socket buffer size x 2
auto segment_size_udp_equivalent =
std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
shm_transport.segment_size(segment_size_udp_equivalent);
// Use same default max_message_size on both UDP and SHM
shm_transport.max_message_size(descriptor.max_message_size());
has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
}
#endif // ifdef SHM_TRANSPORT_BUILTIN
}

Expand Down
68 changes: 68 additions & 0 deletions test/blackbox/common/BlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,74 @@ TEST_P(Discovery, EndpointCreationMultithreaded)
endpoint_thr.join();
}

// Regression test for redmine issue 16253
TEST_P(Discovery, AsymmeticIgnoreParticipantFlags)
{
if (INTRAPROCESS != GetParam())
{
GTEST_SKIP() << "Only makes sense on INTRAPROCESS";
return;
}

// This participant is created with flags to ignore participants which are not on the same process.
// When the announcements of this participant arrive to p2, a single DATA(p) should be sent back.
// No other traffic is expected, since it will take place through intraprocess.
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
p1.ignore_participant_flags(static_cast<eprosima::fastrtps::rtps::ParticipantFilteringFlags_t>(
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST |
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS));
p1.init();
EXPECT_TRUE(p1.isInitialized());

// This participant is created with the test transport to check that nothing unexpected is sent to the
// multicast metatraffic locators.
// Setting localhost in the interface whitelist ensures that the traffic will not leave the host, and also
// that multicast datagrams are sent only once.
// A very long period for the participant announcement is set, along with 0 initial announcements, so we can
// have a exact expectation on the number of datagrams sent to multicast.
PubSubWriter<HelloWorldPubSubType> p2(TEST_TOPIC_NAME);

// This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
// its value when the first multicast datagram is sent.
std::atomic<uint32_t> multicast_port{ 0 };
// Only two multicast datagrams are allowed: the initial DATA(p) and the DATA(p) sent in response of the discovery
// of p1.
constexpr uint32_t allowed_messages_on_port = 2;

auto test_transport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();

std::atomic<uint32_t> messages_on_port{ 0 };
test_transport->interfaceWhiteList.push_back("127.0.0.1");
test_transport->locator_filter_ = [&multicast_port, &messages_on_port](
const eprosima::fastdds::rtps::Locator& destination)
{
if (IPLocator::isMulticast(destination))
{
uint32_t port = 0;
multicast_port.compare_exchange_strong(port, destination.port);
if (destination.port == multicast_port)
{
++messages_on_port;
}
}
return false;
};

p2.disable_builtin_transport().
add_user_transport_to_pparams(test_transport).
lease_duration({ 60 * 60, 0 }, { 50 * 60, 0 }).
initial_announcements(0, {});
p2.init();
EXPECT_TRUE(p2.isInitialized());

// Wait for participants and endpoints to discover each other
p1.wait_discovery();
p2.wait_discovery();

// Check expectation on the number of multicast datagrams sent by p2
EXPECT_EQ(messages_on_port, allowed_messages_on_port);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down