Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync master with 1.10.x #1129

Merged
merged 5 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ if(COMPILE_EXAMPLES)
add_subdirectory(examples)
endif()

###############################################################################
# SHM as Default transport
###############################################################################
option(SHM_TRANSPORT_DEFAULT "Adds SHM transport to the default transports" OFF)

###############################################################################
# Documentation
###############################################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ bool HelloWorldPublisher::init()
PParam.rtps.useBuiltinTransports = false;

auto shm_transport = std::make_shared<SharedMemTransportDescriptor>();
const uint32_t segment_size = 2 * 1024 * 1024;
shm_transport->segment_size(segment_size, segment_size);
shm_transport->segment_size(2*1024*1024);
PParam.rtps.userTransports.push_back(shm_transport);

// UDP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ bool HelloWorldSubscriber::init()
PParam.rtps.useBuiltinTransports = false;

auto sm_transport = std::make_shared<SharedMemTransportDescriptor>();
sm_transport->segment_size(2 * 1024 * 1024, 2 * 1024 * 1024);
sm_transport->segment_size(2*1024*1024);
PParam.rtps.userTransports.push_back(sm_transport);

// UDP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(
char** argv)
{
Log::SetVerbosity(Log::Warning);
//Log::SetCategoryFilter(std::regex("RTPS_EDP_MATCH|RTPS_PDP_DISCOVERY|RTPS_PARTICIPANT_LISTEN|SHM"));
//Log::SetCategoryFilter(std::regex("RTPS_TRANSPORT_SHM"));


std::cout << "Starting "<< std::endl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,7 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac
{
return 0;
}

enum class OverflowPolicy
{
DISCARD,
FAIL
};


RTPS_DllAPI SharedMemTransportDescriptor();

RTPS_DllAPI SharedMemTransportDescriptor(
Expand All @@ -56,30 +50,22 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac
{
return segment_size_;
}

/**
* Sets the segment_size and the max_message_size.
* max_message must be <= segment_size
* @param [in] segment_size in bytes.
* @param [in] max_message_size in bytes.
*/

RTPS_DllAPI void segment_size(
uint32_t segment_size,
uint32_t max_message_size)
uint32_t segment_size)
{
segment_size_ = segment_size;
maxMessageSize = max_message_size;
}

RTPS_DllAPI OverflowPolicy port_overflow_policy() const
{
return port_overflow_policy_;
virtual uint32_t max_message_size() const override
{
return maxMessageSize;
}

RTPS_DllAPI void port_overflow_policy(
OverflowPolicy port_overflow_policy)
RTPS_DllAPI void max_message_size(
uint32_t max_message_size)
{
port_overflow_policy_ = port_overflow_policy;
maxMessageSize = max_message_size;
}

RTPS_DllAPI uint32_t port_queue_capacity() const
Expand All @@ -93,17 +79,6 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac
port_queue_capacity_ = port_queue_capacity;
}

RTPS_DllAPI OverflowPolicy segment_overflow_policy() const
{
return segment_overflow_policy_;
}

RTPS_DllAPI void segment_overflow_policy(
OverflowPolicy segment_overflow_policy)
{
segment_overflow_policy_ = segment_overflow_policy;
}

RTPS_DllAPI uint32_t healthy_check_timeout_ms() const
{
return healthy_check_timeout_ms_;
Expand All @@ -130,8 +105,6 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac

uint32_t segment_size_;
uint32_t port_queue_capacity_;
OverflowPolicy port_overflow_policy_;
OverflowPolicy segment_overflow_policy_;
uint32_t healthy_check_timeout_ms_;
std::string rtps_dump_file_;

Expand Down
9 changes: 0 additions & 9 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,6 @@
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="OverflowPolicy">
<xs:restriction base="xs:string">
<xs:enumeration value="DISCARD"/>
<xs:enumeration value="FAIL"/>
</xs:restriction>
</xs:simpleType>

<xs:complexType name="topicAttributesType">
<xs:all minOccurs="0">
<xs:element name="kind" type="topicKindType" minOccurs="0"/>
Expand Down Expand Up @@ -828,8 +821,6 @@
<xs:element name="tls" type="tlsConfigType" minOccurs="0" maxOccurs="1"/>
<xs:element name="segment_size" type="uint32Type" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_queue_capacity" type="uint32Type" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_overflow_policy" type="OverflowPolicy" minOccurs="0" maxOccurs="1"/>
<xs:element name="segment_overflow_policy" type="OverflowPolicy" minOccurs="0" maxOccurs="1"/>
<xs:element name="healthy_check_timeout_ms" type="uint32Type" minOccurs="0" maxOccurs="1"/>
<xs:element name="rtps_dump_file" type="stringType" minOccurs="0" maxOccurs="1"/>
</xs:all>
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ elseif(NOT EPROSIMA_INSTALLER)
$<$<AND:$<BOOL:${ANDROID}>,$<NOT:$<BOOL:${HAVE_CXX14}>>,$<NOT:$<BOOL:${HAVE_CXX1Y}>>>:ASIO_DISABLE_STD_STRING_VIEW>
$<$<BOOL:${WIN32}>:_ENABLE_ATOMIC_ALIGNMENT_FIX>
$<$<NOT:$<BOOL:${IS_THIRDPARTY_BOOST_SUPPORTED}>>:FASTDDS_SHM_TRANSPORT_DISABLED> # Do not compile SHM Transport
$<$<BOOL:${SHM_TRANSPORT_DEFAULT}>:SHM_TRANSPORT_BUILTIN> # Enable SHM as built-in transport
)

# Define public headers
Expand Down
19 changes: 13 additions & 6 deletions src/cpp/fastdds/core/policy/ParameterList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ bool ParameterList::readParameterListfromCDRMsg(
bool valid = true;
valid &= fastrtps::rtps::CDRMessage::readUInt16(&msg, (uint16_t*)&pid);
valid &= fastrtps::rtps::CDRMessage::readUInt16(&msg, &plength);

if (pid == PID_SENTINEL)
{
// PID_SENTINEL is always considered of length 0
plength = 0;
is_sentinel = true;
}

qos_size += (4 + plength);

// Align to 4 byte boundary and prepare for next iteration
Expand All @@ -169,13 +177,12 @@ bool ParameterList::readParameterListfromCDRMsg(
{
return false;
}
if (pid == PID_SENTINEL)
{
is_sentinel = true;
}
else if (!processor(&msg, pid, plength))
else if(!is_sentinel)
{
return false;
if (!processor(&msg, pid, plength))
{
return false;
}
}
}
return true;
Expand Down
15 changes: 14 additions & 1 deletion src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ namespace rtps {

using UDPv4TransportDescriptor = fastdds::rtps::UDPv4TransportDescriptor;
using TCPTransportDescriptor = fastdds::rtps::TCPTransportDescriptor;
using SharedMemTransportDescriptor = fastdds::rtps::SharedMemTransportDescriptor;

static EntityId_t TrustedWriter(
const EntityId_t& reader)
Expand Down Expand Up @@ -117,13 +118,25 @@ RTPSParticipantImpl::RTPSParticipantImpl(
, is_intraprocess_only_(should_be_intraprocess_only(PParam))
, has_shm_transport_(false)
{
// Builtin transport by default
// Builtin transports by default
if (PParam.useBuiltinTransports)
{
UDPv4TransportDescriptor descriptor;
descriptor.sendBufferSize = m_att.sendSocketBufferSize;
descriptor.receiveBufferSize = m_att.listenSocketBufferSize;
m_network_Factory.RegisterTransport(&descriptor);

#ifdef SHM_TRANSPORT_BUILTIN
SharedMemTransportDescriptor shm_transport;
// We assume (Linux) UDP doubles the user socket buffer size in kernel, so
// the equivalent segment size in SHM would be socket buffer size x 2
auto segment_size_udp_equivalent =
std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
shm_transport.segment_size(segment_size_udp_equivalent);
// Use same default max_message_size on both UDP and SHM
shm_transport.max_message_size(descriptor.max_message_size());
has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
#endif
}

// BACKUP servers guid is its persistence one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class MultiProducerConsumerRingBuffer
* if the counter reaches 0 the cell becomes dirty
* and free_cells are incremented
* @return true if the cell ref_counter is 0 after pop
* @throw int if buffer is empty
* @throw std::exception if buffer is empty
*/
bool pop()
{
Expand Down Expand Up @@ -292,6 +292,36 @@ class MultiProducerConsumerRingBuffer
node->pointer_.store({0,total_cells}, std::memory_order_relaxed);
}

/**
* Copies the currenty enqueued cells to a vector
* @param [out] enqueued_cells pointers vector to where cells will be copied.
* @remark This is an unsafe operation, that means the caller must assure
* that no write operations are performed on the buffer while executing the copy.
*/
void copy(
std::vector<const T*>* enqueued_cells)
{
if (node_->registered_listeners_ > 0)
{
auto pointer = node_->pointer_.load(std::memory_order_relaxed);

uint32_t p = pointer_to_head(pointer);

while (p != pointer.write_p)
{
auto cell = &cells_[get_pointer_value(p)];

// If the cell has not been read by any listener
if (cell->ref_counter() > 0)
{
enqueued_cells->push_back(&cell->data());
}

p = inc_pointer(p);
}
}
}

private:

Node* node_;
Expand All @@ -306,7 +336,7 @@ class MultiProducerConsumerRingBuffer
}

uint32_t inc_pointer(
const uint32_t pointer)
const uint32_t pointer) const
{
uint32_t value = pointer & 0x7FFFFFFF;
uint32_t loop_flag = pointer >> 31;
Expand All @@ -322,6 +352,27 @@ class MultiProducerConsumerRingBuffer
return (loop_flag << 31) | value;
}

uint32_t pointer_to_head(
const Pointer& pointer) const
{
// Init the head as write pointer in previous loop
uint32_t head = pointer.write_p ^ 0x80000000;

uint32_t value = head & 0x7FFFFFFF;
uint32_t loop_flag = head >> 31;

if (value + pointer.free_cells >= node_->total_cells_)
{
loop_flag ^= 1;
}

// Skip the free cells
value = (value + pointer.free_cells) % node_->total_cells_;

// Bit 31 is loop_flag, 0-30 are value
return (loop_flag << 31) | value;
}

/**
* Called by the writters to lock listener's registering while writer pushes
*/
Expand Down
Loading