diff --git a/CMakeLists.txt b/CMakeLists.txt index 65d22f93131..b3d99162ed3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -281,6 +281,11 @@ if(COMPILE_EXAMPLES) add_subdirectory(examples) endif() +############################################################################### +# SHM as Default transport +############################################################################### +option(SHM_TRANSPORT_DEFAULT "Adds SHM transport to the default transports" OFF) + ############################################################################### # Documentation ############################################################################### diff --git a/examples/C++/HelloWorldExampleSharedMem/HelloWorldPublisher.cpp b/examples/C++/HelloWorldExampleSharedMem/HelloWorldPublisher.cpp index a4e7a564805..d97d4344677 100644 --- a/examples/C++/HelloWorldExampleSharedMem/HelloWorldPublisher.cpp +++ b/examples/C++/HelloWorldExampleSharedMem/HelloWorldPublisher.cpp @@ -56,8 +56,7 @@ bool HelloWorldPublisher::init() PParam.rtps.useBuiltinTransports = false; auto shm_transport = std::make_shared(); - const uint32_t segment_size = 2 * 1024 * 1024; - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(2*1024*1024); PParam.rtps.userTransports.push_back(shm_transport); // UDP diff --git a/examples/C++/HelloWorldExampleSharedMem/HelloWorldSubscriber.cpp b/examples/C++/HelloWorldExampleSharedMem/HelloWorldSubscriber.cpp index 359ca0c8a97..5b54b38e3f4 100644 --- a/examples/C++/HelloWorldExampleSharedMem/HelloWorldSubscriber.cpp +++ b/examples/C++/HelloWorldExampleSharedMem/HelloWorldSubscriber.cpp @@ -50,7 +50,7 @@ bool HelloWorldSubscriber::init() PParam.rtps.useBuiltinTransports = false; auto sm_transport = std::make_shared(); - sm_transport->segment_size(2 * 1024 * 1024, 2 * 1024 * 1024); + sm_transport->segment_size(2*1024*1024); PParam.rtps.userTransports.push_back(sm_transport); // UDP diff --git a/examples/C++/HelloWorldExampleSharedMem/HelloWorld_main.cpp b/examples/C++/HelloWorldExampleSharedMem/HelloWorld_main.cpp index e5522447b58..9c45c7d700c 100644 --- a/examples/C++/HelloWorldExampleSharedMem/HelloWorld_main.cpp +++ b/examples/C++/HelloWorldExampleSharedMem/HelloWorld_main.cpp @@ -31,7 +31,7 @@ int main( char** argv) { Log::SetVerbosity(Log::Warning); - //Log::SetCategoryFilter(std::regex("RTPS_EDP_MATCH|RTPS_PDP_DISCOVERY|RTPS_PARTICIPANT_LISTEN|SHM")); + //Log::SetCategoryFilter(std::regex("RTPS_TRANSPORT_SHM")); std::cout << "Starting "<< std::endl; diff --git a/include/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h b/include/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h index b0b9df81eb1..a6e939443ba 100644 --- a/include/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h +++ b/include/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h @@ -40,13 +40,7 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac { return 0; } - - enum class OverflowPolicy - { - DISCARD, - FAIL - }; - + RTPS_DllAPI SharedMemTransportDescriptor(); RTPS_DllAPI SharedMemTransportDescriptor( @@ -56,30 +50,22 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac { return segment_size_; } - - /** - * Sets the segment_size and the max_message_size. - * max_message must be <= segment_size - * @param [in] segment_size in bytes. - * @param [in] max_message_size in bytes. - */ + RTPS_DllAPI void segment_size( - uint32_t segment_size, - uint32_t max_message_size) + uint32_t segment_size) { segment_size_ = segment_size; - maxMessageSize = max_message_size; } - RTPS_DllAPI OverflowPolicy port_overflow_policy() const - { - return port_overflow_policy_; + virtual uint32_t max_message_size() const override + { + return maxMessageSize; } - RTPS_DllAPI void port_overflow_policy( - OverflowPolicy port_overflow_policy) + RTPS_DllAPI void max_message_size( + uint32_t max_message_size) { - port_overflow_policy_ = port_overflow_policy; + maxMessageSize = max_message_size; } RTPS_DllAPI uint32_t port_queue_capacity() const @@ -93,17 +79,6 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac port_queue_capacity_ = port_queue_capacity; } - RTPS_DllAPI OverflowPolicy segment_overflow_policy() const - { - return segment_overflow_policy_; - } - - RTPS_DllAPI void segment_overflow_policy( - OverflowPolicy segment_overflow_policy) - { - segment_overflow_policy_ = segment_overflow_policy; - } - RTPS_DllAPI uint32_t healthy_check_timeout_ms() const { return healthy_check_timeout_ms_; @@ -130,8 +105,6 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac uint32_t segment_size_; uint32_t port_queue_capacity_; - OverflowPolicy port_overflow_policy_; - OverflowPolicy segment_overflow_policy_; uint32_t healthy_check_timeout_ms_; std::string rtps_dump_file_; diff --git a/resources/xsd/fastRTPS_profiles.xsd b/resources/xsd/fastRTPS_profiles.xsd index 9810cfcaef3..f9c07dff53b 100644 --- a/resources/xsd/fastRTPS_profiles.xsd +++ b/resources/xsd/fastRTPS_profiles.xsd @@ -452,13 +452,6 @@ - - - - - - - @@ -828,8 +821,6 @@ - - diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 810a0d345b3..6b688aaf4a5 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -355,6 +355,7 @@ elseif(NOT EPROSIMA_INSTALLER) $<$,$>,$>>:ASIO_DISABLE_STD_STRING_VIEW> $<$:_ENABLE_ATOMIC_ALIGNMENT_FIX> $<$>:FASTDDS_SHM_TRANSPORT_DISABLED> # Do not compile SHM Transport + $<$:SHM_TRANSPORT_BUILTIN> # Enable SHM as built-in transport ) # Define public headers diff --git a/src/cpp/fastdds/core/policy/ParameterList.cpp b/src/cpp/fastdds/core/policy/ParameterList.cpp index 7cd8f19822a..a65a39a40a6 100644 --- a/src/cpp/fastdds/core/policy/ParameterList.cpp +++ b/src/cpp/fastdds/core/policy/ParameterList.cpp @@ -160,6 +160,14 @@ bool ParameterList::readParameterListfromCDRMsg( bool valid = true; valid &= fastrtps::rtps::CDRMessage::readUInt16(&msg, (uint16_t*)&pid); valid &= fastrtps::rtps::CDRMessage::readUInt16(&msg, &plength); + + if (pid == PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + is_sentinel = true; + } + qos_size += (4 + plength); // Align to 4 byte boundary and prepare for next iteration @@ -169,13 +177,12 @@ bool ParameterList::readParameterListfromCDRMsg( { return false; } - if (pid == PID_SENTINEL) - { - is_sentinel = true; - } - else if (!processor(&msg, pid, plength)) + else if(!is_sentinel) { - return false; + if (!processor(&msg, pid, plength)) + { + return false; + } } } return true; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index d728cd7a77b..f6e7ea4fc48 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -63,6 +63,7 @@ namespace rtps { using UDPv4TransportDescriptor = fastdds::rtps::UDPv4TransportDescriptor; using TCPTransportDescriptor = fastdds::rtps::TCPTransportDescriptor; +using SharedMemTransportDescriptor = fastdds::rtps::SharedMemTransportDescriptor; static EntityId_t TrustedWriter( const EntityId_t& reader) @@ -117,13 +118,25 @@ RTPSParticipantImpl::RTPSParticipantImpl( , is_intraprocess_only_(should_be_intraprocess_only(PParam)) , has_shm_transport_(false) { - // Builtin transport by default + // Builtin transports by default if (PParam.useBuiltinTransports) { UDPv4TransportDescriptor descriptor; descriptor.sendBufferSize = m_att.sendSocketBufferSize; descriptor.receiveBufferSize = m_att.listenSocketBufferSize; m_network_Factory.RegisterTransport(&descriptor); + +#ifdef SHM_TRANSPORT_BUILTIN + SharedMemTransportDescriptor shm_transport; + // We assume (Linux) UDP doubles the user socket buffer size in kernel, so + // the equivalent segment size in SHM would be socket buffer size x 2 + auto segment_size_udp_equivalent = + std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2; + shm_transport.segment_size(segment_size_udp_equivalent); + // Use same default max_message_size on both UDP and SHM + shm_transport.max_message_size(descriptor.max_message_size()); + has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport); +#endif } // BACKUP servers guid is its persistence one diff --git a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp index c812b85a942..2f5b7158380 100644 --- a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp +++ b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp @@ -103,7 +103,7 @@ class MultiProducerConsumerRingBuffer * if the counter reaches 0 the cell becomes dirty * and free_cells are incremented * @return true if the cell ref_counter is 0 after pop - * @throw int if buffer is empty + * @throw std::exception if buffer is empty */ bool pop() { @@ -292,6 +292,36 @@ class MultiProducerConsumerRingBuffer node->pointer_.store({0,total_cells}, std::memory_order_relaxed); } + /** + * Copies the currenty enqueued cells to a vector + * @param [out] enqueued_cells pointers vector to where cells will be copied. + * @remark This is an unsafe operation, that means the caller must assure + * that no write operations are performed on the buffer while executing the copy. + */ + void copy( + std::vector* enqueued_cells) + { + if (node_->registered_listeners_ > 0) + { + auto pointer = node_->pointer_.load(std::memory_order_relaxed); + + uint32_t p = pointer_to_head(pointer); + + while (p != pointer.write_p) + { + auto cell = &cells_[get_pointer_value(p)]; + + // If the cell has not been read by any listener + if (cell->ref_counter() > 0) + { + enqueued_cells->push_back(&cell->data()); + } + + p = inc_pointer(p); + } + } + } + private: Node* node_; @@ -306,7 +336,7 @@ class MultiProducerConsumerRingBuffer } uint32_t inc_pointer( - const uint32_t pointer) + const uint32_t pointer) const { uint32_t value = pointer & 0x7FFFFFFF; uint32_t loop_flag = pointer >> 31; @@ -322,6 +352,27 @@ class MultiProducerConsumerRingBuffer return (loop_flag << 31) | value; } + uint32_t pointer_to_head( + const Pointer& pointer) const + { + // Init the head as write pointer in previous loop + uint32_t head = pointer.write_p ^ 0x80000000; + + uint32_t value = head & 0x7FFFFFFF; + uint32_t loop_flag = head >> 31; + + if (value + pointer.free_cells >= node_->total_cells_) + { + loop_flag ^= 1; + } + + // Skip the free cells + value = (value + pointer.free_cells) % node_->total_cells_; + + // Bit 31 is loop_flag, 0-30 are value + return (loop_flag << 31) | value; + } + /** * Called by the writters to lock listener's registering while writer pushes */ diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index a52c02a19f2..60d0ecb85f0 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -36,11 +36,29 @@ class SharedMemGlobal { public: + struct BufferDescriptor; + typedef std::function&, + const std::string& domain_name)> PortFailureHandler; + + // Long names for SHM files could cause problems on some platforms + static constexpr uint32_t MAX_DOMAIN_NAME_LENGTH = 16; + SharedMemGlobal( - const std::string& domain_name) + const std::string& domain_name, + PortFailureHandler failure_handler) : domain_name_(domain_name) { + if (domain_name.length() > MAX_DOMAIN_NAME_LENGTH) + { + throw std::runtime_error( + domain_name + + " too long for domain name (max " + + std::to_string(MAX_DOMAIN_NAME_LENGTH) + + " characters"); + } + Port::on_failure_buffer_descriptors_handler(failure_handler); } ~SharedMemGlobal() @@ -65,17 +83,36 @@ class SharedMemGlobal { UUID<8> uuid; uint32_t port_id; + SharedMemSegment::condition_variable empty_cv; SharedMemSegment::mutex empty_cv_mutex; + SharedMemSegment::offset buffer; SharedMemSegment::offset buffer_node; std::atomic ref_counter; + uint32_t waiting_count; - uint32_t check_awaken_count; - uint32_t check_id; + + static constexpr size_t LISTENERS_STATUS_SIZE = 1024; + struct ListenerStatus + { + uint8_t is_waiting : 1; + uint8_t counter : 3; + uint8_t last_verified_counter : 3; + uint8_t pad : 1; + }; + ListenerStatus listeners_status[LISTENERS_STATUS_SIZE]; + uint32_t num_listeners; + std::atomic last_listeners_status_check_time_ms; + uint32_t healthy_check_timeout_ms; + uint32_t port_wait_timeout_ms; + uint32_t max_buffer_descriptors; + bool is_port_ok; bool is_opened_read_exclusive; bool is_opened_for_reading; + + char domain_name[MAX_DOMAIN_NAME_LENGTH+1]; }; /** @@ -89,62 +126,265 @@ class SharedMemGlobal private: - std::unique_ptr port_segment_; + std::shared_ptr port_segment_; PortNode* node_; - std::unique_ptr > buffer_; + std::unique_ptr> buffer_; uint64_t overflows_count_; - bool was_check_thread_detached_; + inline void notify_unicast( + bool was_buffer_empty_before_push) + { + if (was_buffer_empty_before_push) + { + node_->empty_cv.notify_one(); + } + } - bool check_all_waiting_threads_alive( - uint32_t time_out_ms) + inline void notify_multicast() { - bool is_check_ok = false; + node_->empty_cv.notify_all(); + } + /** + * Singleton with a thread that periodically checks all opened ports + * to verify if some listener is dead. + */ + class Watchdog + { + public: + + struct PortContext { - std::lock_guard lock_empty(node_->empty_cv_mutex); + std::shared_ptr port_segment; + PortNode* node; + MultiProducerConsumerRingBuffer* buffer; + }; - if (!node_->is_port_ok) + static Watchdog& get() + { + static Watchdog watch_dog; + return watch_dog; + } + + /** + * Sets the on_failure_buffer_descriptors_handler_. + * This is done only the first time the function is called, + * as the handler must be a static inmutable function. + */ + void on_failure_buffer_descriptors_handler( + std::function&, + const std::string& domain_name)> handler) + { + if (!is_on_failure_buffer_descriptors_handler_set_) { - throw std::runtime_error("Previous check failed"); + std::lock_guard lock(watched_ports_mutex_); + + // Checking is_on_failure_buffer_descriptors_handler_set_ twice can be weird but avoid + // the use of a recursive_mutex here. + if (!is_on_failure_buffer_descriptors_handler_set_) + { + on_failure_buffer_descriptors_handler_ = handler; + is_on_failure_buffer_descriptors_handler_set_ = true; + } } + } - node_->check_id++; - node_->check_awaken_count = node_->waiting_count; + /** + * Called by the Port constructor, adds a port to the watching list. + */ + void add_port_to_watch( + std::shared_ptr&& port) + { + std::lock_guard lock(watched_ports_mutex_); + watched_ports_.push_back(port); + } + + /** + * Called by the Port destructor, removes a port from the watching list. + */ + void remove_port_from_watch( + PortNode* port_node) + { + std::lock_guard lock(watched_ports_mutex_); + + auto it = watched_ports_.begin(); + + while (it != watched_ports_.end()) + { + if ((*it)->node == port_node) + { + watched_ports_.erase(it); + break; + } + + ++it; + } - node_->empty_cv.notify_all(); } - auto start = std::chrono::high_resolution_clock::now(); + /** + * Forces Wake-up of the checking thread + */ + void wake_up() + { + { + std::lock_guard lock(wake_run_mutex_); + wake_run_ = true; + } + + wake_run_cv_.notify_one(); + } + + private: + + std::vector > watched_ports_; + std::thread thread_run_; + std::mutex watched_ports_mutex_; + + std::condition_variable wake_run_cv_; + std::mutex wake_run_mutex_; + bool wake_run_; + + bool exit_thread_; - do + std::function&, + const std::string& domain_name)> on_failure_buffer_descriptors_handler_; + + bool is_on_failure_buffer_descriptors_handler_set_; + + Watchdog() + : wake_run_(false) + , exit_thread_(false) + , is_on_failure_buffer_descriptors_handler_set_(false) { - std::this_thread::yield(); - is_check_ok = node_->check_awaken_count == 0; + thread_run_ = std::thread(&Watchdog::run, this); } - while (!is_check_ok && - std::chrono::high_resolution_clock::now() < start + std::chrono::milliseconds(time_out_ms)); - return is_check_ok; - } + ~Watchdog() + { + exit_thread_ = true; + wake_up(); + thread_run_.join(); + } - inline void notify_unicast( - bool was_buffer_empty_before_push) - { - if (was_buffer_empty_before_push) + bool update_status_all_listeners( + PortNode* port_node) { - node_->empty_cv.notify_one(); + for (uint32_t i = 0; i < port_node->num_listeners; i++) + { + auto& status = port_node->listeners_status[i]; + // Check only currently waiting listeners + if (status.is_waiting) + { + if (status.counter != status.last_verified_counter) + { + status.last_verified_counter = status.counter; + } + else // Counter is freeze => this listener is blocked!!! + { + return false; + } + } + } + + port_node->last_listeners_status_check_time_ms.exchange( + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()).count()); + + return true; } - } - inline void notify_multicast() + void run() + { + while (!exit_thread_) + { + { + std::unique_lock lock(wake_run_mutex_); + + wake_run_cv_.wait_for( + lock, + std::chrono::seconds(1), + [&] { + return wake_run_; + }); + + wake_run_ = false; + } + + auto now = std::chrono::high_resolution_clock::now(); + + std::lock_guard lock(watched_ports_mutex_); + + auto port_it = watched_ports_.begin(); + while (port_it != watched_ports_.end()) + { + // If more than 'healthy_check_timeout_ms' milliseconds elapsed since last check + if (std::chrono::duration_cast(now.time_since_epoch()).count() + - (*port_it)->node->last_listeners_status_check_time_ms.load() + > (*port_it)->node->healthy_check_timeout_ms) + { + std::vector descriptors_enqueued; + + try + { + std::unique_lock lock_port((*port_it)->node->empty_cv_mutex); + if (!update_status_all_listeners((*port_it)->node)) + { + if ((*port_it)->node->is_port_ok) + { + (*port_it)->node->is_port_ok = false; + (*port_it)->buffer->copy(&descriptors_enqueued); + assert(on_failure_buffer_descriptors_handler_); + on_failure_buffer_descriptors_handler_(descriptors_enqueued, + (*port_it)->node->domain_name); + } + } + + ++port_it; + } + catch (std::exception& e) + { + (*port_it)->node->is_port_ok = false; + + logWarning(RTPS_TRANSPORT_SHM, "Port " << (*port_it)->node->port_id + << " error: " << e.what()); + + // Remove the port from watch + port_it = watched_ports_.erase(port_it); + } + } + else + { + ++port_it; + } + } + } + } + }; + + bool check_status_all_listeners() const { - node_->empty_cv.notify_all(); - } + for (uint32_t i = 0; i < node_->num_listeners; i++) + { + auto& status = node_->listeners_status[i]; + // Check only currently waiting listeners + if (status.is_waiting) + { + if (status.counter == status.last_verified_counter) + { + return false; + } + } + } + return true; + } + public: /** @@ -163,8 +403,21 @@ class SharedMemGlobal Write }; + static std::string open_mode_to_string( + OpenMode open_mode) + { + switch (open_mode) + { + case OpenMode::ReadShared: return "ReadShared"; + case OpenMode::ReadExclusive: return "ReadExclusive"; + case OpenMode::Write: return "Write"; + } + + return ""; + } + Port( - std::unique_ptr&& port_segment, + std::shared_ptr&& port_segment, PortNode* node) : port_segment_(std::move(port_segment)) , node_(node) @@ -180,12 +433,18 @@ class SharedMemGlobal new MultiProducerConsumerRingBuffer(buffer_base, buffer_node)); node_->ref_counter.fetch_add(1); + + auto port_context = std::make_shared(); + *port_context = {port_segment_, node_, buffer_.get()}; + Watchdog::get().add_port_to_watch(std::move(port_context)); } ~Port() { - if (node_->ref_counter.fetch_sub(1) == 1) - { + Watchdog::get().remove_port_from_watch(node_); + + if (node_->ref_counter.fetch_sub(1) == 1 && node_->is_port_ok) + { auto segment_name = port_segment_->name(); logInfo(RTPS_TRANSPORT_SHM, THREADID << "Port " << node_->port_id @@ -206,6 +465,15 @@ class SharedMemGlobal } } + static void on_failure_buffer_descriptors_handler( + std::function&, + const std::string& domain_name)> handler) + + { + Watchdog::get().on_failure_buffer_descriptors_handler(handler); + } + /** * Try to enqueue a buffer descriptor in the port. * If the port queue is full returns inmediatelly with false value. @@ -219,6 +487,11 @@ class SharedMemGlobal { std::unique_lock lock_empty(node_->empty_cv_mutex); + if (!node_->is_port_ok) + { + throw std::runtime_error("the port is marked as not ok!"); + } + try { bool was_opened_as_unicast_port = node_->is_opened_read_exclusive; @@ -255,41 +528,91 @@ class SharedMemGlobal * Waits while the port is empty and listener is not closed * @param[in] listener reference to the listener that will wait for an incoming buffer descriptor. * @param[in] is_listener_closed this reference can become true in the middle of the waiting process, + * @param[in] listener_index to update the port's listener_status, * if that happens wait is aborted. */ void wait_pop( Listener& listener, - const std::atomic& is_listener_closed) + const std::atomic& is_listener_closed, + uint32_t listener_index) { - std::unique_lock lock(node_->empty_cv_mutex); - - uint32_t check_id = node_->check_id; + try + { + std::unique_lock lock(node_->empty_cv_mutex); - node_->waiting_count++; + if (!node_->is_port_ok) + { + throw std::runtime_error("port marked as not ok"); + } - do - { - node_->empty_cv.wait(lock, [&] { - return is_listener_closed.load() || listener.head() != nullptr || check_id != node_->check_id; - }); + auto& status = node_->listeners_status[listener_index]; + // Update this listener status + status.is_waiting = 1; + status.counter = status.last_verified_counter + 1; + node_->waiting_count++; - if (check_id != node_->check_id) + do { - node_->check_awaken_count--; - check_id = node_->check_id; + boost::system_time const timeout = + boost::get_system_time()+ boost::posix_time::milliseconds(node_->port_wait_timeout_ms); - if (listener.head()) + if (node_->empty_cv.timed_wait(lock, timeout, [&] + { + return is_listener_closed.load() || listener.head() != nullptr; + })) { - break; + break; // Codition met, Break the while } - } - else - { - break; - } - } while (1); + else // Timeout + { + if (!node_->is_port_ok) + { + throw std::runtime_error("port marked as not ok"); + } + + status.counter = status.last_verified_counter + 1; + } + } while (1); + + node_->waiting_count--; + status.is_waiting = 0; + + } + catch (const std::exception&) + { + node_->is_port_ok = false; + throw; + } + } + + inline bool is_port_ok() const + { + return node_->is_port_ok; + } + + inline uint32_t port_id() const + { + return node_->port_id; + } - node_->waiting_count--; + inline OpenMode open_mode() const + { + if(node_->is_opened_for_reading) + { + return node_->is_opened_read_exclusive ? OpenMode::ReadExclusive : OpenMode::ReadShared; + } + + return OpenMode::Write; + } + + inline uint32_t healthy_check_timeout_ms() const + { + return node_->healthy_check_timeout_ms; + } + + inline uint32_t max_buffer_descriptors() const + { + return node_->max_buffer_descriptors; } /** @@ -326,80 +649,72 @@ class SharedMemGlobal /** * Register a new listener * The new listener's read pointer is equal to the ring-buffer write pointer at the registering moment. + * @param [out] listener_index pointer to where the index of the listener is returned. This index is + * used to reference the elements from the listeners_status array. * @return A shared_ptr to the listener. * The listener will be unregistered when shared_ptr is destroyed. */ - std::shared_ptr create_listener() + std::shared_ptr create_listener(uint32_t* listener_index) { + std::lock_guard lock(node_->empty_cv_mutex); + + *listener_index = node_->num_listeners++; + return buffer_->register_listener(); } - bool was_check_thread_detached() + /** + * Decrement the number of listeners by one + */ + void unregister_listener() { - return was_check_thread_detached_; + std::lock_guard lock(node_->empty_cv_mutex); + + node_->num_listeners--; } /** - * Performs a check of the opened port. - * When a process crashes with a port opened the port can be leave inoperative. - * @param [in] healthy_check_timeout_ms max timeout (milliseconds) allowed for the whole - * healthy check operation. + * Performs a check on the opened port. + * When a process crashes with a port opened the port can be left inoperative. * @throw std::exception if the port is inoperative. */ - void healthy_check( - uint32_t healthy_check_timeout_ms) + void healthy_check() { - std::shared_ptr is_check_ok = std::make_shared(false); - - was_check_thread_detached_ = false; - - std::shared_ptr notify_check_done_mutex = std::make_shared(); - - std::shared_ptr notify_check_done_cv = - std::make_shared(); + if (!node_->is_port_ok) + { + throw std::runtime_error("port is marked as not ok"); + } - std::shared_ptr is_check_done_received = std::make_shared(false); + auto t0 = std::chrono::high_resolution_clock::now(); - std::thread check_thread([=] + // If in any moment during the timeout all waiting listeners are OK + // then the port is OK + bool is_check_ok = false; + while ( !is_check_ok && + std::chrono::duration_cast + (std::chrono::high_resolution_clock::now() - t0).count() < node_->healthy_check_timeout_ms) + { { - try - { - *is_check_ok = check_all_waiting_threads_alive(healthy_check_timeout_ms); - - { - std::lock_guard lock_received(*notify_check_done_mutex); - *is_check_done_received = true; - } + std::unique_lock lock(node_->empty_cv_mutex); + is_check_ok = check_status_all_listeners(); - notify_check_done_cv->notify_one(); - } - catch (std::exception&) + if (!node_->is_port_ok) { - *is_check_ok = false; + throw std::runtime_error("port marked as not ok"); } - }); - - std::unique_lock lock(*notify_check_done_mutex); + } - if (!notify_check_done_cv->wait_for(lock, - std::chrono::milliseconds(healthy_check_timeout_ms), - [&] { return *is_check_done_received; })) - { - node_->is_port_ok = false; - was_check_thread_detached_ = true; - check_thread.detach(); - throw std::runtime_error("healthy_check timeout"); + if (!is_check_ok) + { + std::this_thread::sleep_for(std::chrono::milliseconds(node_->port_wait_timeout_ms)); + } } - check_thread.join(); - - if (!(*is_check_ok)) + if (!is_check_ok || !node_->is_port_ok) { - node_->is_port_ok = false; throw std::runtime_error("healthy_check failed"); } } - }; // Port /** @@ -425,7 +740,8 @@ class SharedMemGlobal auto port_segment_name = domain_name_ + "_port" + std::to_string(port_id); - logInfo(RTPS_TRANSPORT_SHM, THREADID << "Opening " << port_segment_name); + logInfo(RTPS_TRANSPORT_SHM, THREADID << "Opening " + << port_segment_name); std::unique_ptr port_mutex = SharedMemSegment::open_or_create_and_lock_named_mutex(port_segment_name + "_mutex"); @@ -435,7 +751,7 @@ class SharedMemGlobal try { // Try to open - auto port_segment = std::unique_ptr( + auto port_segment = std::shared_ptr( new SharedMemSegment(boost::interprocess::open_only, port_segment_name.c_str())); SharedMemGlobal::PortNode* port_node; @@ -448,19 +764,19 @@ class SharedMemGlobal catch (std::exception&) { logWarning(RTPS_TRANSPORT_SHM, THREADID << "Port " - << port_id << " Couldn't find port_node "); + << port_id << " Couldn't find port_node "); SharedMemSegment::remove(port_segment_name.c_str()); logWarning(RTPS_TRANSPORT_SHM, THREADID << "Port " - << port_id << " Removed."); + << port_id << " Removed."); throw; } try - { - port->healthy_check(healthy_check_timeout_ms); + { + port->healthy_check(); if ( (port_node->is_opened_read_exclusive && open_mode != Port::OpenMode::Write) || (port_node->is_opened_for_reading && open_mode == Port::OpenMode::ReadExclusive)) @@ -475,34 +791,20 @@ class SharedMemGlobal } else { - port_node->is_opened_read_exclusive = (open_mode == Port::OpenMode::ReadExclusive); - port_node->is_opened_for_reading |= (open_mode == Port::OpenMode::ReadShared); + port_node->is_opened_read_exclusive |= (open_mode == Port::OpenMode::ReadExclusive); + port_node->is_opened_for_reading |= (open_mode != Port::OpenMode::Write); logInfo(RTPS_TRANSPORT_SHM, THREADID << "Port " << port_node->port_id << " (" << port_node->uuid.to_string() << - ") Opened"); + ") Opened" << Port::open_mode_to_string(open_mode)); } } catch (std::exception&) { auto port_uuid = port_node->uuid.to_string(); - - // Healthy check left a thread blocked at port resources - // So we leave port_segment unmanaged, better to leak memory than a crash - if (port->was_check_thread_detached()) - { - // Release owership - port_segment.release(); - - logWarning(RTPS_TRANSPORT_SHM, THREADID << "Existing Port " - << port_id << " (" << port_uuid << - ") NOT Healthy (check_thread detached)."); - } - else - { - logWarning(RTPS_TRANSPORT_SHM, THREADID << "Existing Port " - << port_id << " (" << port_uuid << ") NOT Healthy."); - } + + logWarning(RTPS_TRANSPORT_SHM, THREADID << "Existing Port " + << port_id << " (" << port_uuid << ") NOT Healthy."); SharedMemSegment::remove(port_segment_name.c_str()); @@ -530,7 +832,7 @@ class SharedMemGlobal memset(payload, 0, segment_size); port_segment->get().deallocate(payload); - port = init_port(port_id, port_segment, max_buffer_descriptors, open_mode); + port = init_port(port_id, port_segment, max_buffer_descriptors, open_mode, healthy_check_timeout_ms); } catch (std::exception& e) { @@ -557,7 +859,8 @@ class SharedMemGlobal uint32_t port_id, std::unique_ptr& segment, uint32_t max_buffer_descriptors, - Port::OpenMode open_mode) + Port::OpenMode open_mode, + uint32_t healthy_check_timeout_ms) { std::shared_ptr port; PortNode* port_node = nullptr; @@ -571,16 +874,28 @@ class SharedMemGlobal port_node->is_port_ok = true; UUID<8>::generate(port_node->uuid); port_node->waiting_count = 0; - port_node->check_awaken_count = 0; - port_node->check_id = 0; port_node->is_opened_read_exclusive = (open_mode == Port::OpenMode::ReadExclusive); port_node->is_opened_for_reading = (open_mode != Port::OpenMode::Write); + port_node->num_listeners = 0; + port_node->healthy_check_timeout_ms = healthy_check_timeout_ms; + port_node->last_listeners_status_check_time_ms = + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + port_node->port_wait_timeout_ms = healthy_check_timeout_ms / 3; + port_node->max_buffer_descriptors = max_buffer_descriptors; + memset(port_node->listeners_status, 0, sizeof(port_node->listeners_status)); +#ifdef _MSC_VER + strncpy_s(port_node->domain_name, sizeof(port_node->domain_name), + domain_name_.c_str(), sizeof(port_node->domain_name)-1); +#else + strncpy(port_node->domain_name, domain_name_.c_str(), sizeof(port_node->domain_name)-1); +#endif + port_node->domain_name[sizeof(port_node->domain_name)-1] = 0; // Buffer cells allocation - port_node->buffer = - segment->get_offset_from_address( - segment->get().construct::Cell>( - boost::interprocess::anonymous_instance)[max_buffer_descriptors]()); + auto buffer = segment->get().construct::Cell>( + boost::interprocess::anonymous_instance)[max_buffer_descriptors](); + port_node->buffer = segment->get_offset_from_address(buffer); // Buffer node allocation buffer_node = segment->get().construct::Node>( @@ -593,8 +908,8 @@ class SharedMemGlobal port = std::make_shared(std::move(segment), port_node); logInfo(RTPS_TRANSPORT_SHM, THREADID << "Port " - << port_node->port_id << " (" << port_node->uuid.to_string() << - ") Created."); + << port_node->port_id << " (" << port_node->uuid.to_string() + << Port::open_mode_to_string(open_mode) << ") Created."); } catch (const std::exception&) { diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp index 92529ab828e..107d4894854 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp @@ -41,7 +41,7 @@ class SharedMemManager { struct { - std::atomic ref_count; + std::atomic ref_count; uint32_t data_size; }header; uint8_t data[1]; @@ -57,8 +57,31 @@ class SharedMemManager SharedMemManager( const std::string& domain_name) - : global_segment_(domain_name) + : global_segment_( + domain_name, + []( const std::vector& buffer_descriptors, + const std::string& domain_name) + { + on_failure_buffer_descriptor_handler(buffer_descriptors, domain_name); + }) { + if (domain_name.length() > SharedMemGlobal::MAX_DOMAIN_NAME_LENGTH) + { + throw std::runtime_error( + domain_name + + " too long for domain name (max " + + std::to_string(SharedMemGlobal::MAX_DOMAIN_NAME_LENGTH) + + " characters"); + } + + SharedMemGlobal::Port::on_failure_buffer_descriptors_handler( + []( const std::vector& buffer_descriptors, + const std::string& domain_name) + { + on_failure_buffer_descriptor_handler(buffer_descriptors, domain_name); + } + ); + per_allocation_extra_size_ = SharedMemSegment::compute_per_allocation_extra_size(std::alignment_of::value); } @@ -155,7 +178,9 @@ class SharedMemManager uint32_t payload_size) : segment_id_() , overflows_count_(0) +#ifdef SHM_SEGMENT_OVERFLOW_TIMEOUT , max_payload_size_(payload_size) +#endif { segment_id_.generate(); @@ -262,7 +287,10 @@ class SharedMemManager std::shared_ptr segment_; SharedMemSegment::Id segment_id_; uint64_t overflows_count_; + +#ifdef SHM_SEGMENT_OVERFLOW_TIMEOUT uint32_t max_payload_size_; +#endif void release_buffer( BufferNode* buffer_node) @@ -275,7 +303,9 @@ class SharedMemManager auto node_it = allocated_nodes_.begin(); while (node_it != allocated_nodes_.end()) { - if ((*node_it)->header.ref_count.load() == 0) + // This shouldn't normally be negative, but when processes crashes + // due to fault-tolerance mecanishms it could happen. + if ((*node_it)->header.ref_count.load() <= 0) { release_buffer(*node_it); @@ -292,6 +322,7 @@ class SharedMemManager uint32_t size, const std::chrono::steady_clock::time_point& max_blocking_time_point) { +#ifdef SHM_SEGMENT_OVERFLOW_TIMEOUT SharedMemSegment::spin_wait spin_wait; // Not enough avaible space @@ -308,7 +339,9 @@ class SharedMemManager // vs interprocess_mutex + interprocess_cv. spin_wait.yield(); } - +#else + (void)max_blocking_time_point; +#endif if (segment_node_->free_bytes.load(std::memory_order_relaxed) < size) { throw std::runtime_error("allocation timeout"); @@ -329,61 +362,122 @@ class SharedMemManager public: Listener( - SharedMemManager& shared_mem_manager, + SharedMemManager* shared_mem_manager, std::shared_ptr port) : global_port_(port) , shared_mem_manager_(shared_mem_manager) , is_closed_(false) { - global_listener_ = global_port_->create_listener(); + global_listener_ = global_port_->create_listener(&listener_index_); + } + + ~Listener() + { + global_listener_.reset(); + if (global_port_) + { + global_port_->unregister_listener(); + } } + Listener& operator = ( + Listener&& other) + { + global_listener_ = other.global_listener_; + other.global_listener_.reset(); + global_port_ = other.global_port_; + other.global_port_.reset(); + shared_mem_manager_ = other.shared_mem_manager_; + is_closed_.exchange(other.is_closed_); + + return *this; + } + /** * Extract the first buffer enqued in the port. * If the queue is empty, blocks until a buffer is pushed * to the port. + * @return A shared_ptr to the buffer, this shared_ptr can be nullptr if the + * wait was interrupted because errors or close operations. * @remark Multithread not supported. */ std::shared_ptr pop() { - bool was_cell_freed; std::shared_ptr buffer_ref; - SharedMemGlobal::PortCell* head_cell = nullptr; - - while ( !is_closed_.load() && nullptr == (head_cell = global_listener_->head()) ) + try { - // Wait until threre's data to pop - global_port_->wait_pop(*global_listener_, is_closed_); - } + bool was_cell_freed; + + SharedMemGlobal::PortCell* head_cell = nullptr; - if (!head_cell) - { - return nullptr; - } + while ( !is_closed_.load() && nullptr == (head_cell = global_listener_->head()) ) + { + // Wait until there's data to pop + global_port_->wait_pop(*global_listener_, is_closed_, listener_index_); + } - SharedMemGlobal::BufferDescriptor buffer_descriptor = head_cell->data(); + if (!head_cell) + { + return nullptr; + } - SegmentNode* segment_node; - auto segment = shared_mem_manager_.find_segment(buffer_descriptor.source_segment_id, &segment_node); - auto buffer_node = - static_cast(segment->get_address_from_offset(buffer_descriptor.buffer_node_offset)); + if (!global_port_->is_port_ok()) + { + throw std::runtime_error(""); + } - // TODO(Adolfo) : Dynamic allocation. Use foonathan to convert it to static allocation - buffer_ref = std::make_shared(segment, buffer_descriptor.source_segment_id, buffer_node, - segment_node); + SharedMemGlobal::BufferDescriptor buffer_descriptor = head_cell->data(); - // If the cell has been read by all listeners - global_port_->pop(*global_listener_, was_cell_freed); + SegmentNode* segment_node; + auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id, &segment_node); + auto buffer_node = + static_cast(segment->get_address_from_offset(buffer_descriptor.buffer_node_offset)); - if (was_cell_freed) - { - buffer_node->header.ref_count.fetch_sub(1); + // TODO(Adolfo) : Dynamic allocation. Use foonathan to convert it to static allocation + buffer_ref = std::make_shared(segment, buffer_descriptor.source_segment_id, buffer_node, + segment_node); + + // If the cell has been read by all listeners + global_port_->pop(*global_listener_, was_cell_freed); + + if (was_cell_freed) + { + buffer_node->header.ref_count.fetch_sub(1); + } + } + catch(const std::exception& e) + { + if (global_port_->is_port_ok()) + { + throw; + } + else + { + logWarning(RTPS_TRANSPORT_SHM, "SHM Listener on port " << global_port_->port_id() << " failure: " + << e.what()); + + regenerate_port(); + } } return buffer_ref; } + void regenerate_port() + { + auto new_port = shared_mem_manager_->open_port( + global_port_->port_id(), + global_port_->max_buffer_descriptors(), + global_port_->healthy_check_timeout_ms(), + global_port_->open_mode() + ); + + auto new_listener = new_port->create_listener(); + + *this = std::move(*new_listener); + } + /** * Unblock a thread blocked in pop() call, not allowing pop() to block again, */ @@ -398,10 +492,12 @@ class SharedMemManager std::shared_ptr global_port_; std::shared_ptr global_listener_; + uint32_t listener_index_; - SharedMemManager& shared_mem_manager_; + SharedMemManager* shared_mem_manager_; std::atomic is_closed_; + }; // Listener /** @@ -412,13 +508,26 @@ class SharedMemManager public: Port( - SharedMemManager& shared_mem_manager, - std::shared_ptr port) + SharedMemManager* shared_mem_manager, + std::shared_ptr port, + SharedMemGlobal::Port::OpenMode open_mode) : shared_mem_manager_(shared_mem_manager) , global_port_(port) + , open_mode_(open_mode) { } + Port& operator = ( + Port&& other) + { + shared_mem_manager_ = other.shared_mem_manager_; + open_mode_ = other.open_mode_; + global_port_ = other.global_port_; + other.global_port_.reset(); + + return *this; + } + /** * Try to enqueue a buffer in the port. * @returns false If the port's queue is full so buffer couldn't be enqueued. @@ -444,10 +553,22 @@ class SharedMemManager shared_mem_buffer->decrease_ref(); } } - catch (std::exception&) + catch (std::exception& e) { shared_mem_buffer->decrease_ref(); - throw; + + if (!global_port_->is_port_ok()) + { + logWarning(RTPS_TRANSPORT_SHM, "SHM Port " << global_port_->port_id() << " failure: " + << e.what()); + + regenerate_port(); + ret = false; + } + else + { + throw; + } } return ret; @@ -460,10 +581,23 @@ class SharedMemManager private: - SharedMemManager& shared_mem_manager_; + void regenerate_port() + { + auto new_port = shared_mem_manager_->open_port( + global_port_->port_id(), + global_port_->max_buffer_descriptors(), + global_port_->healthy_check_timeout_ms(), + open_mode_); + + *this = std::move(*new_port); + } + + SharedMemManager* shared_mem_manager_; std::shared_ptr global_port_; + SharedMemGlobal::Port::OpenMode open_mode_; + }; // Port /** @@ -490,8 +624,18 @@ class SharedMemManager uint32_t healthy_check_timeout_ms, SharedMemGlobal::Port::OpenMode open_mode = SharedMemGlobal::Port::OpenMode::ReadShared) { - return std::make_shared(*this, - global_segment_.open_port(port_id, max_descriptors, healthy_check_timeout_ms, open_mode)); + return std::make_shared(this, + global_segment_.open_port(port_id, max_descriptors, healthy_check_timeout_ms, open_mode), + open_mode); + } + + /** + * @return Pointer to the underlying global segment. The pointer is only valid + * while this SharedMemManager is alive. + */ + SharedMemGlobal* global_segment() + { + return &global_segment_; } private: @@ -584,6 +728,43 @@ class SharedMemManager return segment; } + + /** + * Called by PortWatchdog when a dead listener has been detected. + * At this point the port is marked as not OK, and a vector of + * the recovered descriptors, from the port, are passed to + * this function that performs their release. + */ + static void on_failure_buffer_descriptor_handler( + const std::vector& buffer_descriptors, + const std::string& domain_name) + { + try + { + SharedMemManager shared_mem_manager(domain_name); + + for (auto buffer_descriptor : buffer_descriptors) + { + SegmentNode* segment_node; + auto segment = shared_mem_manager.find_segment(buffer_descriptor->source_segment_id, &segment_node); + auto buffer_node = + static_cast(segment->get_address_from_offset(buffer_descriptor->buffer_node_offset)); + + int32_t buffer_size = buffer_node->header.data_size; + + // Last reference to the buffer + if (buffer_node->header.ref_count.fetch_sub(1) == 1) + { + // Anotate the new free space + segment_node->free_bytes.fetch_add(buffer_size); + } + } + } + catch (const std::exception& e) + { + logError(RTPS_TRANSPORT_SHM, e.what()); + } + } }; } // namespace rtps diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp index 5251639994b..605fea106ff 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp @@ -208,29 +208,22 @@ bool SharedMemTransport::DoInputLocatorsMatch( bool SharedMemTransport::init() { - try + // TODO(Adolfo): Calculate this value from UDP sockets buffers size. + static constexpr uint32_t shm_default_segment_size = 512 * 1024; + + if(configuration_.segment_size() == 0) { - switch (configuration_.port_overflow_policy()) - { - case SharedMemTransportDescriptor::OverflowPolicy::DISCARD: - push_lambda_ = &SharedMemTransport::push_discard; - break; - case SharedMemTransportDescriptor::OverflowPolicy::FAIL: - push_lambda_ = &SharedMemTransport::push_fail; - break; - default: - throw std::runtime_error("unknown port_overflow_policy"); - } + configuration_.segment_size(shm_default_segment_size); + } - switch (configuration_.segment_overflow_policy()) - { - case SharedMemTransportDescriptor::OverflowPolicy::DISCARD: - case SharedMemTransportDescriptor::OverflowPolicy::FAIL: - break; - default: - throw std::runtime_error("unknown port_overflow_policy"); - } + if(configuration_.segment_size() < configuration_.max_message_size()) + { + logError(RTPS_MSG_OUT, "max_message_size cannot be greater than segment_size"); + return false; + } + try + { shared_mem_manager_ = std::make_shared(SHM_MANAGER_DOMAIN); shared_mem_segment_ = shared_mem_manager_->create_segment(configuration_.segment_size(), configuration_.port_queue_capacity()); @@ -420,8 +413,7 @@ bool SharedMemTransport::send( logWarning(RTPS_MSG_OUT, e.what()); // Segment overflow with discard policy doesn't return error. - if (!shared_buffer && - configuration_.segment_overflow_policy() == SharedMemTransportDescriptor::OverflowPolicy::DISCARD) + if (!shared_buffer) { ret = true; } @@ -474,26 +466,11 @@ bool SharedMemTransport::push_discard( return true; } -bool SharedMemTransport::push_fail( - const std::shared_ptr& buffer, - const Locator_t& remote_locator) -{ - try - { - return find_port(remote_locator.port)->try_push(buffer); - } - catch (const std::exception& error) - { - logWarning(RTPS_MSG_OUT, error.what()); - return false; - } -} - bool SharedMemTransport::send( const std::shared_ptr& buffer, const Locator_t& remote_locator) { - if (!push_lambda_(*this, buffer, remote_locator)) + if (!push_discard(buffer, remote_locator)) { return false; } diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h index 3b8ee3ab622..3f012d5e085 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h @@ -240,19 +240,10 @@ class SharedMemTransport : public TransportInterface std::shared_ptr find_port( uint32_t port_id); - - bool push_fail( - const std::shared_ptr& buffer, - const fastrtps::rtps::Locator_t& remote_locator); - + bool push_discard( const std::shared_ptr& buffer, const fastrtps::rtps::Locator_t& remote_locator); - - std::function& buffer, - const fastrtps::rtps::Locator_t& remote_locator)> push_lambda_; }; } // namespace rtps diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransportDescriptor.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransportDescriptor.cpp index ef9b10be138..24b754c8311 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransportDescriptor.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransportDescriptor.cpp @@ -21,10 +21,8 @@ namespace eprosima { namespace fastdds { namespace rtps { -static constexpr uint32_t shm_default_segment_size = 262144; +static constexpr uint32_t shm_default_segment_size = 0; static constexpr uint32_t shm_default_port_queue_capacity = 512; -static constexpr SharedMemTransportDescriptor::OverflowPolicy shm_default_overflow_policy = - SharedMemTransportDescriptor::OverflowPolicy::DISCARD; static constexpr uint32_t shm_default_healthy_check_timeout_ms = 1000; } // rtps @@ -38,12 +36,10 @@ SharedMemTransportDescriptor::SharedMemTransportDescriptor() : TransportDescriptorInterface(shm_default_segment_size, s_maximumInitialPeersRange) , segment_size_(shm_default_segment_size) , port_queue_capacity_(shm_default_port_queue_capacity) - , port_overflow_policy_(shm_default_overflow_policy) - , segment_overflow_policy_(shm_default_overflow_policy) , healthy_check_timeout_ms_(shm_default_healthy_check_timeout_ms) , rtps_dump_file_("") { - maxMessageSize = segment_size_; + maxMessageSize = s_maximumMessageSize; } SharedMemTransportDescriptor::SharedMemTransportDescriptor( @@ -51,12 +47,10 @@ SharedMemTransportDescriptor::SharedMemTransportDescriptor( : TransportDescriptorInterface(t.segment_size_, s_maximumInitialPeersRange) , segment_size_(t.segment_size_) , port_queue_capacity_(t.port_queue_capacity_) - , port_overflow_policy_(t.port_overflow_policy_) - , segment_overflow_policy_(t.segment_overflow_policy_) , healthy_check_timeout_ms_(t.healthy_check_timeout_ms_) , rtps_dump_file_(t.rtps_dump_file_) { - maxMessageSize = t.segment_size_; + maxMessageSize = t.max_message_size(); } #ifdef FASTDDS_SHM_TRANSPORT_DISABLED diff --git a/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.cpp index aa1064fdf28..7b68d6fc327 100644 --- a/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.cpp @@ -26,7 +26,8 @@ test_SharedMemTransportDescriptor::test_SharedMemTransportDescriptor() : SharedMemTransportDescriptor() { big_buffer_size_ = std::numeric_limits::max(); - big_buffer_size_count_ = nullptr; + big_buffer_size_send_count_ = nullptr; + big_buffer_size_recv_count_ = nullptr; } test_SharedMemTransportDescriptor::test_SharedMemTransportDescriptor( @@ -40,7 +41,8 @@ test_SharedMemTransport::test_SharedMemTransport( : SharedMemTransport(t) { big_buffer_size_ = t.big_buffer_size_; - big_buffer_size_count_ = t.big_buffer_size_count_; + big_buffer_size_send_count_ = t.big_buffer_size_send_count_; + big_buffer_size_recv_count_ = t.big_buffer_size_recv_count_; } TransportInterface* test_SharedMemTransportDescriptor::create_transport() const @@ -57,7 +59,7 @@ bool test_SharedMemTransport::send( { if (send_buffer_size >= big_buffer_size_) { - (*big_buffer_size_count_)++; + (*big_buffer_size_send_count_)++; } return SharedMemTransport::send(send_buffer, send_buffer_size, destination_locators_begin, @@ -84,5 +86,5 @@ SharedMemChannelResource* test_SharedMemTransport::CreateInputChannelResource( locator, receiver, big_buffer_size_, - big_buffer_size_count_); + big_buffer_size_recv_count_); } \ No newline at end of file diff --git a/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.h b/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.h index f3a2efdfa8e..2650c96e941 100644 --- a/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.h +++ b/src/cpp/rtps/transport/shared_mem/test_SharedMemTransport.h @@ -44,7 +44,8 @@ class test_SharedMemTransport : public SharedMemTransport private: uint32_t big_buffer_size_; - uint32_t* big_buffer_size_count_; + uint32_t* big_buffer_size_send_count_; + uint32_t* big_buffer_size_recv_count_; }; diff --git a/src/cpp/rtps/transport/shared_mem/test_SharedMemTransportDescriptor.h b/src/cpp/rtps/transport/shared_mem/test_SharedMemTransportDescriptor.h index 04df12d305d..1e1cfe03b8d 100644 --- a/src/cpp/rtps/transport/shared_mem/test_SharedMemTransportDescriptor.h +++ b/src/cpp/rtps/transport/shared_mem/test_SharedMemTransportDescriptor.h @@ -37,7 +37,8 @@ typedef struct test_SharedMemTransportDescriptor : public SharedMemTransportDesc virtual TransportInterface* create_transport() const override; uint32_t big_buffer_size_; - uint32_t* big_buffer_size_count_; + uint32_t* big_buffer_size_send_count_; + uint32_t* big_buffer_size_recv_count_; }test_SharedMemTransportDescriptor; diff --git a/src/cpp/rtps/xmlparser/XMLParser.cpp b/src/cpp/rtps/xmlparser/XMLParser.cpp index a4cb572a04f..3b5d6596d62 100644 --- a/src/cpp/rtps/xmlparser/XMLParser.cpp +++ b/src/cpp/rtps/xmlparser/XMLParser.cpp @@ -398,12 +398,16 @@ XMLP_ret XMLParser::parseXMLTransportData( else { logError(XMLPARSER, "Invalid transport type: '" << sType << "'"); + return XMLP_ret::XML_ERROR; } - ret = parseXMLCommonTransportData(p_root, pDescriptor); - if (ret != XMLP_ret::XML_OK) + if (sType != SHM) { - return ret; + ret = parseXMLCommonTransportData(p_root, pDescriptor); + if (ret != XMLP_ret::XML_OK) + { + return ret; + } } XMLProfileManager::insertTransportById(sId, pDescriptor); @@ -705,10 +709,10 @@ XMLP_ret XMLParser::parseXMLCommonSharedMemTransportData( /* + + - - - + @@ -732,7 +736,7 @@ XMLP_ret XMLParser::parseXMLCommonSharedMemTransportData( { return XMLP_ret::XML_ERROR; } - transport_descriptor->segment_size(static_cast(aux), static_cast(aux)); + transport_descriptor->segment_size(static_cast(aux)); } else if (strcmp(name, PORT_QUEUE_CAPACITY) == 0) { @@ -750,60 +754,36 @@ XMLP_ret XMLParser::parseXMLCommonSharedMemTransportData( } transport_descriptor->healthy_check_timeout_ms(static_cast(aux)); } - else if (strcmp(name, PORT_OVERFLOW_POLICY) == 0) + else if (strcmp(name, RTPS_DUMP_FILE) == 0) { std::string str; if (XMLP_ret::XML_OK != getXMLString(p_aux0, &str, 0)) { return XMLP_ret::XML_ERROR; } - if (str == DISCARD) - { - transport_descriptor->port_overflow_policy( - fastdds::rtps::SharedMemTransportDescriptor::OverflowPolicy::DISCARD); - } - else if (str == FAIL) - { - transport_descriptor->port_overflow_policy( - fastdds::rtps::SharedMemTransportDescriptor::OverflowPolicy::FAIL); - } - else - { - return XMLP_ret::XML_ERROR; - } + transport_descriptor->rtps_dump_file(str); } - else if (strcmp(name, SEGMENT_OVERFLOW_POLICY) == 0) + else if (strcmp(name, MAX_MESSAGE_SIZE) == 0) { - std::string str; - if (XMLP_ret::XML_OK != getXMLString(p_aux0, &str, 0)) - { - return XMLP_ret::XML_ERROR; - } - if (str == DISCARD) - { - transport_descriptor->segment_overflow_policy( - fastdds::rtps::SharedMemTransportDescriptor::OverflowPolicy::DISCARD); - } - else if (str == FAIL) - { - transport_descriptor->segment_overflow_policy( - fastdds::rtps::SharedMemTransportDescriptor::OverflowPolicy::FAIL); - } - else + // maxMessageSize - uint32Type + uint32_t uSize = 0; + if (XMLP_ret::XML_OK != getXMLUint(p_aux0, &uSize, 0)) { return XMLP_ret::XML_ERROR; } + transport_descriptor->max_message_size(uSize); } - else if (strcmp(name, RTPS_DUMP_FILE) == 0) + else if (strcmp(name, MAX_INITIAL_PEERS_RANGE) == 0) { - std::string str; - if (XMLP_ret::XML_OK != getXMLString(p_aux0, &str, 0)) + // maxInitialPeersRange - uint32Type + uint32_t uRange = 0; + if (XMLP_ret::XML_OK != getXMLUint(p_aux0, &uRange, 0)) { return XMLP_ret::XML_ERROR; } - transport_descriptor->rtps_dump_file(str); + transport_descriptor->maxInitialPeersRange = uRange; } - else if (strcmp(name, TRANSPORT_ID) == 0 || strcmp(name, TYPE) == 0 || strcmp(name, MAX_MESSAGE_SIZE) == 0) + else if (strcmp(name, TRANSPORT_ID) == 0 || strcmp(name, TYPE) == 0) { // Parsed Outside of this method } diff --git a/src/cpp/utils/Host.hpp b/src/cpp/utils/Host.hpp index a261bfaf465..990d4c92f97 100644 --- a/src/cpp/utils/Host.hpp +++ b/src/cpp/utils/Host.hpp @@ -50,8 +50,8 @@ class Host } else { - reinterpret_cast(id_)[0] = 127; - reinterpret_cast(id_)[1] = 1; + reinterpret_cast(&id_)[0] = 127; + reinterpret_cast(&id_)[1] = 1; } } } diff --git a/test/blackbox/BlackboxTestsSecurity.cpp b/test/blackbox/BlackboxTestsSecurity.cpp index 3d238477f04..ed357680e59 100644 --- a/test/blackbox/BlackboxTestsSecurity.cpp +++ b/test/blackbox/BlackboxTestsSecurity.cpp @@ -331,7 +331,8 @@ TEST_P(Security, BuiltinAuthenticationAndCryptoPlugin_shm_transport_ok) auto shm_transport = std::make_shared(); auto udp_transport = std::make_shared(); const uint32_t segment_size = 1024 * 1024; - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(segment_size); + shm_transport->max_message_size(segment_size); reader.disable_builtin_transport(); reader.add_user_transport_to_pparams(shm_transport); writer.disable_builtin_transport(); @@ -402,7 +403,8 @@ TEST_P(Security, BuiltinAuthenticationAndCryptoPlugin_shm_udp_transport_ok) auto shm_transport = std::make_shared(); auto udp_transport = std::make_shared(); const uint32_t segment_size = 1024 * 1024; - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(segment_size); + shm_transport->max_message_size(segment_size); reader.disable_builtin_transport(); reader.add_user_transport_to_pparams(shm_transport); reader.add_user_transport_to_pparams(udp_transport); diff --git a/test/blackbox/BlackboxTestsTransportSHM.cpp b/test/blackbox/BlackboxTestsTransportSHM.cpp index 9cd2907f9bc..133914b7fd7 100644 --- a/test/blackbox/BlackboxTestsTransportSHM.cpp +++ b/test/blackbox/BlackboxTestsTransportSHM.cpp @@ -80,21 +80,24 @@ TEST(SHM, Test300KFragmentation) auto shm_transport = std::make_shared(); const uint32_t segment_size = static_cast(data_size * 3 / 4); - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(segment_size); + shm_transport->max_message_size(segment_size); + + uint32_t big_buffers_send_count = 0; + uint32_t big_buffers_recv_count = 0; + shm_transport->big_buffer_size_ = shm_transport->segment_size() / 3; + shm_transport->big_buffer_size_send_count_= &big_buffers_send_count; + shm_transport->big_buffer_size_recv_count_ = &big_buffers_recv_count; writer - .asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE) - .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE) + .reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) .disable_builtin_transport() .add_user_transport_to_pparams(shm_transport) .init(); - uint32_t big_buffers_count = 0; - shm_transport->big_buffer_size_ = shm_transport->segment_size()/3; - shm_transport->big_buffer_size_count_ = &big_buffers_count; - reader - .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) .disable_builtin_transport() .add_user_transport_to_pparams(shm_transport) .init(); @@ -110,12 +113,7 @@ TEST(SHM, Test300KFragmentation) // Send data with some interval, to let async writer thread send samples writer.send(data, 300); - // In this test all data should be sent. - ASSERT_TRUE(data.empty()); - // Block reader until reception finished or timeout. - reader.block_for_all(); - - ASSERT_EQ(big_buffers_count, 2u); + ASSERT_EQ(big_buffers_send_count, 2u); // Destroy the writer participant. writer.destroy(); @@ -134,21 +132,24 @@ TEST(SHM, Test300KNoFragmentation) auto shm_transport = std::make_shared(); const uint32_t segment_size = 1024 * 1024; - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(segment_size); + shm_transport->max_message_size(segment_size); + + uint32_t big_buffers_send_count = 0; + uint32_t big_buffers_recv_count = 0; + shm_transport->big_buffer_size_ = static_cast(data_size); + shm_transport->big_buffer_size_send_count_ = &big_buffers_send_count; + shm_transport->big_buffer_size_recv_count_ = &big_buffers_recv_count; writer - .asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE) - .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE) + .reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) .disable_builtin_transport() .add_user_transport_to_pparams(shm_transport) - .init(); - - uint32_t big_buffers_count = 0; - shm_transport->big_buffer_size_ = static_cast(data_size); - shm_transport->big_buffer_size_count_ = &big_buffers_count; + .init(); reader - .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) .disable_builtin_transport() .add_user_transport_to_pparams(shm_transport) .init(); @@ -169,7 +170,8 @@ TEST(SHM, Test300KNoFragmentation) // Block reader until reception finished or timeout. reader.block_for_all(); - ASSERT_EQ(big_buffers_count, 1u); + ASSERT_EQ(big_buffers_send_count, 1u); + ASSERT_EQ(big_buffers_recv_count, 1u); // Destroy the writer participant. writer.destroy(); @@ -188,10 +190,17 @@ TEST(SHM, SHM_UDP_300KFragmentation) auto shm_transport = std::make_shared(); const uint32_t segment_size = 1024 * 1024; - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(segment_size); + shm_transport->max_message_size(segment_size); auto udp_transport = std::make_shared(); + uint32_t big_buffers_send_count = 0; + uint32_t big_buffers_recv_count = 0; + shm_transport->big_buffer_size_ = 32 * 1024; // 32K + shm_transport->big_buffer_size_send_count_ = &big_buffers_send_count; + shm_transport->big_buffer_size_recv_count_ = &big_buffers_recv_count; + writer.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE); writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); writer @@ -200,10 +209,6 @@ TEST(SHM, SHM_UDP_300KFragmentation) .add_user_transport_to_pparams(udp_transport) .init(); - uint32_t big_buffers_count = 0; - shm_transport->big_buffer_size_ = 32*1024; // 32K - shm_transport->big_buffer_size_count_ = &big_buffers_count; - reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); reader .disable_builtin_transport() @@ -229,7 +234,8 @@ TEST(SHM, SHM_UDP_300KFragmentation) // Block reader until reception finished or timeout. reader.block_for_all(); - ASSERT_EQ(big_buffers_count, std::ceil(data_size / (float)udp_transport->maxMessageSize)); + ASSERT_EQ(big_buffers_send_count, std::ceil(data_size / (float)udp_transport->maxMessageSize)); + ASSERT_EQ(big_buffers_recv_count, std::ceil(data_size / (float)udp_transport->maxMessageSize)); // Destroy the writer participant. writer.destroy(); @@ -248,10 +254,17 @@ TEST(SHM, UDPvsSHM_UDP) auto shm_transport = std::make_shared(); const uint32_t segment_size = 1024 * 1024; - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(segment_size); + shm_transport->max_message_size(segment_size); auto udp_transport = std::make_shared(); + uint32_t big_buffers_send_count = 0; + uint32_t big_buffers_recv_count = 0; + shm_transport->big_buffer_size_ = 32 * 1024; // 32K + shm_transport->big_buffer_size_send_count_ = &big_buffers_send_count; + shm_transport->big_buffer_size_recv_count_ = &big_buffers_recv_count; + writer.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE); writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); writer @@ -259,10 +272,6 @@ TEST(SHM, UDPvsSHM_UDP) .add_user_transport_to_pparams(udp_transport) .init(); - uint32_t big_buffers_count = 0; - shm_transport->big_buffer_size_ = 32*1024; // 32K - shm_transport->big_buffer_size_count_ = &big_buffers_count; - reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); reader .disable_builtin_transport() @@ -287,7 +296,8 @@ TEST(SHM, UDPvsSHM_UDP) // Block reader until reception finished or timeout. reader.block_for_all(); - ASSERT_EQ(big_buffers_count, 0u); + ASSERT_EQ(big_buffers_send_count, 0u); + ASSERT_EQ(big_buffers_recv_count, 0u); // Destroy the writer participant. writer.destroy(); @@ -306,7 +316,8 @@ TEST(SHM, SHM_UDPvsUDP) auto shm_transport = std::make_shared(); const uint32_t segment_size = 1024 * 1024; - shm_transport->segment_size(segment_size, segment_size); + shm_transport->segment_size(segment_size); + shm_transport->max_message_size(segment_size); auto udp_transport = std::make_shared(); diff --git a/test/communication/liveliness_assertion.360.xml b/test/communication/liveliness_assertion.360.xml index aa392000a60..36d0b7da2a4 100644 --- a/test/communication/liveliness_assertion.360.xml +++ b/test/communication/liveliness_assertion.360.xml @@ -4,6 +4,12 @@ FULL + + + disable_shm + UDPv4 + + @@ -13,6 +19,10 @@ + + disable_shm + + false diff --git a/test/communication/liveliness_assertion.xml b/test/communication/liveliness_assertion.xml index 936b0715532..1d948fa353f 100644 --- a/test/communication/liveliness_assertion.xml +++ b/test/communication/liveliness_assertion.xml @@ -4,6 +4,12 @@ FULL + + + disable_shm + UDPv4 + + @@ -16,6 +22,10 @@ + + disable_shm + + false diff --git a/test/mock/rtps/SharedMemTransportDescriptor/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h b/test/mock/rtps/SharedMemTransportDescriptor/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h index 64d25fce1d8..f6d927a1b28 100644 --- a/test/mock/rtps/SharedMemTransportDescriptor/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h +++ b/test/mock/rtps/SharedMemTransportDescriptor/fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h @@ -31,13 +31,12 @@ class TransportInterface; typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterface { virtual ~SharedMemTransportDescriptor() - { - - } - + { + + } RTPS_DllAPI SharedMemTransportDescriptor() - : TransportDescriptorInterface(0, 0) + : TransportDescriptorInterface(0, 0) { } @@ -55,41 +54,27 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac } uint32_t min_send_buffer_size() const override { return 0; } - - enum class OverflowPolicy - { - DISCARD, - FAIL - }; - + RTPS_DllAPI uint32_t segment_size() const { return segment_size_; } - - /** - * Sets the segment_size and the max_message_size. - * max_message must be <= segment_size - * @param [in] segment_size in bytes. - * @param [in] max_message_size in bytes. - */ + RTPS_DllAPI void segment_size( - uint32_t segment_size, - uint32_t max_message_size) + uint32_t segment_size) { segment_size_ = segment_size; - maxMessageSize = std::min(segment_size, max_message_size); } - RTPS_DllAPI OverflowPolicy port_overflow_policy() const - { - return port_overflow_policy_; + virtual uint32_t max_message_size() const override + { + return maxMessageSize; } - RTPS_DllAPI void port_overflow_policy( - OverflowPolicy port_overflow_policy) + RTPS_DllAPI void max_message_size( + uint32_t max_message_size) { - port_overflow_policy_ = port_overflow_policy; + maxMessageSize = max_message_size; } RTPS_DllAPI uint32_t port_queue_capacity() const @@ -103,17 +88,6 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac port_queue_capacity_ = port_queue_capacity; } - RTPS_DllAPI OverflowPolicy segment_overflow_policy() const - { - return segment_overflow_policy_; - } - - RTPS_DllAPI void segment_overflow_policy( - OverflowPolicy segment_overflow_policy) - { - segment_overflow_policy_ = segment_overflow_policy; - } - RTPS_DllAPI uint32_t healthy_check_timeout_ms() const { return healthy_check_timeout_ms_; @@ -140,8 +114,6 @@ typedef struct SharedMemTransportDescriptor : public TransportDescriptorInterfac uint32_t segment_size_; uint32_t port_queue_capacity_; - OverflowPolicy port_overflow_policy_; - OverflowPolicy segment_overflow_policy_; uint32_t healthy_check_timeout_ms_; std::string rtps_dump_file_; diff --git a/test/unittest/transport/SharedMemTests.cpp b/test/unittest/transport/SharedMemTests.cpp index 6e74bb5f3a5..dfe596bfcee 100644 --- a/test/unittest/transport/SharedMemTests.cpp +++ b/test/unittest/transport/SharedMemTests.cpp @@ -193,6 +193,68 @@ TEST_F(SHMRingBuffer, one_listener_reads_all) ASSERT_EQ(listener1->head(), nullptr); } +TEST_F(SHMRingBuffer, copy) +{ + std::unique_ptr > ring_buffer; + + std::unique_ptr::Cell[]> cells + = std::unique_ptr::Cell[]>( + new MultiProducerConsumerRingBuffer::Cell[2]); + + ring_buffer = + std::unique_ptr >(new MultiProducerConsumerRingBuffer( + cells_.get(), 2)); + + auto listener = ring_buffer->register_listener(); + + std::vector enqueued_data; + + ring_buffer->copy(&enqueued_data); + ASSERT_EQ(0u, enqueued_data.size()); + + ring_buffer->push({ 0,0 }); + ring_buffer->copy(&enqueued_data); + ASSERT_EQ(1u, enqueued_data.size()); + enqueued_data.clear(); + + ring_buffer->push({ 0,1 }); + ring_buffer->copy(&enqueued_data); + ASSERT_EQ(2u, enqueued_data.size()); + + ASSERT_EQ(0u, enqueued_data[0]->counter); + ASSERT_EQ(1u, enqueued_data[1]->counter); + + listener->pop(); + + enqueued_data.clear(); + ring_buffer->push({ 0,2 }); + ring_buffer->copy(&enqueued_data); + ASSERT_EQ(2u, enqueued_data.size()); + + ASSERT_EQ(1u, enqueued_data[0]->counter); + ASSERT_EQ(2u, enqueued_data[1]->counter); + + listener->pop(); + + enqueued_data.clear(); + ring_buffer->push({ 0,3 }); + ring_buffer->copy(&enqueued_data); + ASSERT_EQ(2u, enqueued_data.size()); + + ASSERT_EQ(2u, enqueued_data[0]->counter); + ASSERT_EQ(3u, enqueued_data[1]->counter); + + listener->pop(); + enqueued_data.clear(); + ring_buffer->copy(&enqueued_data); + ASSERT_EQ(1u, enqueued_data.size()); + + listener->pop(); + enqueued_data.clear(); + ring_buffer->copy(&enqueued_data); + ASSERT_EQ(0u, enqueued_data.size()); +} + TEST_F(SHMRingBuffer, listeners_register_unregister) { // 0 Must be discarted because no listeners @@ -522,113 +584,12 @@ TEST_F(SHMTransportTests, send_and_receive_between_ports) sender_thread->join(); } -TEST_F(SHMTransportTests, port_and_segment_overflow_fail) -{ - SharedMemTransportDescriptor my_descriptor; - - my_descriptor.port_overflow_policy(SharedMemTransportDescriptor::OverflowPolicy::FAIL); - my_descriptor.segment_overflow_policy(SharedMemTransportDescriptor::OverflowPolicy::FAIL); - my_descriptor.segment_size(16, 16); - my_descriptor.port_queue_capacity(4); - - SharedMemTransport transportUnderTest(my_descriptor); - ASSERT_TRUE(transportUnderTest.init()); - - Locator_t unicastLocator; - unicastLocator.kind = LOCATOR_KIND_SHM; - unicastLocator.port = g_default_port; - - Semaphore sem; - MockReceiverResource receiver(transportUnderTest, unicastLocator); - MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); - - uint32_t messages_received = 0; - std::function recCallback = [&]() - { - // At the second message block - if (messages_received > 0) - { - messages_received++; - sem.wait(); - } - else - { - messages_received++; - } - }; - msg_recv->setCallback(recCallback); - - Locator_t outputChannelLocator; - outputChannelLocator.kind = LOCATOR_KIND_SHM; - outputChannelLocator.port = g_default_port + 1; - - eprosima::fastrtps::rtps::SendResourceList send_resource_list; - ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, outputChannelLocator)); - ASSERT_FALSE(send_resource_list.empty()); - octet message[4] = { 'H','e','l','l'}; - - LocatorList_t locator_list; - locator_list.push_back(unicastLocator); - - { - Locators locators_begin(locator_list.begin()); - Locators locators_end(locator_list.end()); - // Internally the segment is bigger than "my_descriptor.segment_size" so a bigger buffer is tried - // to cause segment overflow - octet message_big[4096] = { 'H','e','l','l'}; - - ASSERT_FALSE(send_resource_list.at(0)->send(message_big, sizeof(message_big), &locators_begin, &locators_end, - (std::chrono::steady_clock::now()+ std::chrono::microseconds(100)))); - } - - // At least 4 msgs of 4 bytes are allowed - for (int i=0; i<4; i++) - { - Locators locators_begin(locator_list.begin()); - Locators locators_end(locator_list.end()); - - // At least 4 msgs of 4 bytes are allowed - ASSERT_TRUE(send_resource_list.at(0)->send(message, sizeof(message), &locators_begin, &locators_end, - (std::chrono::steady_clock::now() + std::chrono::microseconds(100)))); - } - - // Wait until the receiver get the first message - while (messages_received == 0) - { - std::this_thread::yield(); - } - - // The receiver has poped a message so now 3 messages are in the - // port's queue - - // Push a 4th should go well - { - Locators locators_begin(locator_list.begin()); - Locators locators_end(locator_list.end()); - - ASSERT_TRUE(send_resource_list.at(0)->send(message, sizeof(message), &locators_begin, &locators_end, - (std::chrono::steady_clock::now()+ std::chrono::microseconds(100)))); - } - - // Push a 5th will cause port overflow - { - Locators locators_begin(locator_list.begin()); - Locators locators_end(locator_list.end()); - - ASSERT_FALSE(send_resource_list.at(0)->send(message, sizeof(message), &locators_begin, &locators_end, - (std::chrono::steady_clock::now()+ std::chrono::microseconds(100)))); - } - - sem.disable(); -} - TEST_F(SHMTransportTests, port_and_segment_overflow_discard) { SharedMemTransportDescriptor my_descriptor; - my_descriptor.port_overflow_policy(SharedMemTransportDescriptor::OverflowPolicy::DISCARD); - my_descriptor.segment_overflow_policy(SharedMemTransportDescriptor::OverflowPolicy::DISCARD); - my_descriptor.segment_size(16, 16); + my_descriptor.segment_size(16); + my_descriptor.max_message_size(16); my_descriptor.port_queue_capacity(4); SharedMemTransport transportUnderTest(my_descriptor); @@ -716,14 +677,15 @@ TEST_F(SHMTransportTests, port_and_segment_overflow_discard) TEST_F(SHMTransportTests, port_mutex_deadlock_recover) { - const std::string domain_name("SHMTransportTests"); + const std::string domain_name("SHMTests"); - SharedMemGlobal shared_mem_global(domain_name); + SharedMemManager shared_mem_manager(domain_name); + SharedMemGlobal* shared_mem_global = shared_mem_manager.global_segment(); MockPortSharedMemGlobal port_mocker; port_mocker.remove_port_mutex(domain_name, 0); - auto global_port = shared_mem_global.open_port(0, 1, 1000); + auto global_port = shared_mem_global->open_port(0, 1, 1000); Semaphore sem_lock_done; Semaphore sem_end_thread_locker; @@ -743,22 +705,204 @@ TEST_F(SHMTransportTests, port_mutex_deadlock_recover) auto port_mutex = port_mocker.get_port_mutex(domain_name, 0); ASSERT_FALSE(port_mutex->try_lock()); - auto global_port2 = shared_mem_global.open_port(0, 1, 1000); + auto global_port2 = shared_mem_global->open_port(0, 1, 1000); - ASSERT_NO_THROW(global_port2->healthy_check(1000)); + ASSERT_NO_THROW(global_port2->healthy_check()); sem_end_thread_locker.post(); thread_locker.join(); } +TEST_F(SHMTransportTests, port_listener_dead_recover) +{ + const std::string domain_name("SHMTests"); + + SharedMemManager shared_mem_manager(domain_name); + SharedMemGlobal* shared_mem_global = shared_mem_manager.global_segment(); + + uint32_t listener1_index; + auto port1 = shared_mem_global->open_port(0, 1, 1000); + auto listener1 = port1->create_listener(&listener1_index); + + auto listener2 = shared_mem_manager.open_port(0, 1, 1000)->create_listener(); + + std::atomic thread_listener2_state(0); + std::thread thread_listener2([&] + { + // lock has to be done in another thread because + // boost::inteprocess_named_mutex and interprocess_mutex are recursive in Win32 + auto buf = listener2->pop(); + ASSERT_TRUE(*static_cast(buf->data()) == 1); + + thread_listener2_state = 1; + + buf = listener2->pop(); + // The pop is broken by port regeneration + ASSERT_TRUE(buf == nullptr); + + thread_listener2_state = 2; + + buf = listener2->pop(); + // 2 is received in the new regenerated port + ASSERT_TRUE(*static_cast(buf->data()) == 2); + + thread_listener2_state = 3; + } + ); + + auto port_sender = shared_mem_manager.open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::Write); + auto segment = shared_mem_manager.create_segment(1024, 16); + auto buf = segment->alloc_buffer(1, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + ASSERT_TRUE(buf != nullptr); + memset(buf->data(), 0, buf->size()); + *static_cast(buf->data()) = 1u; + ASSERT_TRUE(port_sender->try_push(buf)); + + // Wait until message received + while (thread_listener2_state.load() < 1u) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + MockPortSharedMemGlobal port_mocker; + std::atomic_bool is_listener1_closed(false); + std::thread thread_listener1([&] + { + // Deadlock the listener. + port_mocker.wait_pop_deadlock(*port1, *listener1, is_listener1_closed, listener1_index); + } + ); + + // Wait until port is regenerated + while (thread_listener2_state.load() < 2u) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + *static_cast(buf->data()) = 2u; + // This push must fail because port is not OK + ASSERT_FALSE(port_sender->try_push(buf)); + + // This push must success because port was regenerated in the last try_push call. + ASSERT_TRUE(port_sender->try_push(buf)); + + // Wait until port is regenerated + while (thread_listener2_state.load() < 3u) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + thread_listener2.join(); + + // Unblocks thread_listener1 + port_mocker.unblock_wait_pop(*port1, is_listener1_closed); + + thread_listener1.join(); +} + +TEST_F(SHMTransportTests, on_port_failure_free_enqueued_descriptors) +{ + const std::string domain_name("SHMTests"); + + SharedMemManager shared_mem_manager(domain_name); + SharedMemGlobal* shared_mem_global = shared_mem_manager.global_segment(); + + uint32_t listener1_index; + auto port1 = shared_mem_global->open_port(0, 4, 1000); + auto listener1 = port1->create_listener(&listener1_index); + + auto listener2 = shared_mem_manager.open_port(0, 1, 1000)->create_listener(); + + auto segment = shared_mem_manager.create_segment(16, 4); + std::vector > buffers; + + // Alloc 4 buffers x 4 bytes + for (int i=0; i<4; i++) + { + buffers.push_back(segment->alloc_buffer(4, std::chrono::steady_clock::time_point())); + ASSERT_FALSE(nullptr == buffers.back()); + memset(buffers.back()->data(), 0, buffers.back()->size()); + *static_cast(buffers.back()->data()) = static_cast(i+1); + } + + // Not enough space for more allocations + ASSERT_THROW(buffers.push_back(segment->alloc_buffer(4, std::chrono::steady_clock::time_point())), std::exception); + + auto port_sender = shared_mem_manager.open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::Write); + + // Enqueued all buffers in the port + for (auto& buffer : buffers) + { + ASSERT_TRUE(port_sender->try_push(buffer)); + } + + buffers.clear(); + + // Not enough space for more allocations + ASSERT_THROW(buffers.push_back(segment->alloc_buffer(4, std::chrono::steady_clock::time_point())), std::exception); + + std::atomic thread_listener2_state(0); + std::thread thread_listener2([&] + { + // Read all the buffers + for (int i = 0; i < 4; i++) + { + // Pops the first buffer + auto buf = listener2->pop(); + ASSERT_TRUE(*static_cast(buf->data()) == static_cast(i+1)); + } + + thread_listener2_state = 1; + + // The pop is broken by port regeneration + auto buf_null = listener2->pop(); + ASSERT_TRUE(buf_null == nullptr); + } + ); + + // Wait until messages are popped + while (thread_listener2_state.load() < 1u) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + MockPortSharedMemGlobal port_mocker; + std::atomic_bool is_listener1_closed(false); + std::thread thread_listener1([&] + { + // Deadlock the listener. + port_mocker.wait_pop_deadlock(*port1, *listener1, is_listener1_closed, listener1_index); + }); + + // Wait until port is regenerated + thread_listener2.join(); + + // Port regeneration must have freed enqueued descriptors + // so allocation now should be possible again + // Alloc 4 buffers x 4 bytes + for (int i=0; i<4; i++) + { + buffers.push_back(segment->alloc_buffer(4, std::chrono::steady_clock::time_point())); + ASSERT_FALSE(nullptr == buffers.back()); + memset(buffers.back()->data(), 0, buffers.back()->size()); + *static_cast(buffers.back()->data()) = static_cast(i+1); + } + + // Unblocks thread_listener1 + port_mocker.unblock_wait_pop(*port1, is_listener1_closed); + + thread_listener1.join(); +} + TEST_F(SHMTransportTests, empty_cv_mutex_deadlocked_try_push) { - const std::string domain_name("SHMTransportTests"); + const std::string domain_name("SHMTests"); - SharedMemGlobal shared_mem_global(domain_name); + SharedMemManager shared_mem_manager(domain_name); + SharedMemGlobal* shared_mem_global = shared_mem_manager.global_segment(); MockPortSharedMemGlobal port_mocker; - auto global_port = shared_mem_global.open_port(0, 1, 1000); + auto global_port = shared_mem_global->open_port(0, 1, 1000); Semaphore sem_lock_done; Semaphore sem_end_thread_locker; @@ -780,7 +924,7 @@ TEST_F(SHMTransportTests, empty_cv_mutex_deadlocked_try_push) SharedMemGlobal::BufferDescriptor foo; ASSERT_THROW(global_port->try_push(foo, &listerner_active), std::exception); - ASSERT_THROW(global_port->healthy_check(1000), std::exception); + ASSERT_THROW(global_port->healthy_check(), std::exception); sem_end_thread_locker.post(); thread_locker.join(); @@ -788,18 +932,22 @@ TEST_F(SHMTransportTests, empty_cv_mutex_deadlocked_try_push) TEST_F(SHMTransportTests, dead_listener_port_recover) { - const std::string domain_name("SHMTransportTests"); + const std::string domain_name("SHMTests"); - SharedMemGlobal shared_mem_global(domain_name); - auto deadlocked_port = shared_mem_global.open_port(0, 1, 1000); - auto deadlocked_listener = deadlocked_port->create_listener(); + SharedMemManager shared_mem_manager(domain_name); + SharedMemGlobal* shared_mem_global = shared_mem_manager.global_segment(); + + auto deadlocked_port = shared_mem_global->open_port(0, 1, 1000); + uint32_t listener_index; + auto deadlocked_listener = deadlocked_port->create_listener(&listener_index); // Simulates a deadlocked wait_pop std::atomic_bool is_listener_closed(false); std::thread thread_wait_deadlock([&] { MockPortSharedMemGlobal port_mocker; - port_mocker.wait_pop_deadlock(*deadlocked_port, *deadlocked_listener, is_listener_closed); + port_mocker.wait_pop_deadlock(*deadlocked_port, *deadlocked_listener, + is_listener_closed, listener_index); (void)port_mocker; // Removes an inexplicable warning when compiling with VC(v140 toolset) }); @@ -807,8 +955,8 @@ TEST_F(SHMTransportTests, dead_listener_port_recover) std::this_thread::sleep_for(std::chrono::seconds(1)); // Open the deadlocked port - auto port = shared_mem_global.open_port(0, 1, 1000); - auto listener = port->create_listener(); + auto port = shared_mem_global->open_port(0, 1, 1000); + auto listener = port->create_listener(&listener_index); bool listerners_active; SharedMemSegment::Id random_id; random_id.generate(); diff --git a/test/unittest/transport/mock/SharedMemGlobalMock.hpp b/test/unittest/transport/mock/SharedMemGlobalMock.hpp index cb5fbe29e89..4ed122ac5a5 100644 --- a/test/unittest/transport/mock/SharedMemGlobalMock.hpp +++ b/test/unittest/transport/mock/SharedMemGlobalMock.hpp @@ -54,20 +54,32 @@ class MockPortSharedMemGlobal static void wait_pop_deadlock( SharedMemGlobal::Port& port, SharedMemGlobal::Listener& listener, - const std::atomic& is_listener_closed) + const std::atomic& is_listener_closed, + uint32_t listener_index) { (void)listener; std::unique_lock lock(port.node_->empty_cv_mutex); port.node_->waiting_count++; + auto& status = port.node_->listeners_status[listener_index]; + status.is_waiting = 1; port.node_->empty_cv.wait(lock, [&] { return is_listener_closed.load(); }); + status.is_waiting = 0; port.node_->waiting_count--; } + + static void unblock_wait_pop( + SharedMemGlobal::Port& port, + std::atomic& is_listener_closed) + { + is_listener_closed.exchange(true); + port.node_->empty_cv.notify_all(); + } }; } // namespace rtps diff --git a/test/unittest/xmlparser/SHM_transport_descriptors_config.xml b/test/unittest/xmlparser/SHM_transport_descriptors_config.xml index 391bfba9d8b..c73fda0779d 100644 --- a/test/unittest/xmlparser/SHM_transport_descriptors_config.xml +++ b/test/unittest/xmlparser/SHM_transport_descriptors_config.xml @@ -7,8 +7,6 @@ SHM 4294967295 4294967295 - DISCARD - FAIL 4294967295 test_file.dump 128000 diff --git a/test/unittest/xmlparser/XMLProfileParserTests.cpp b/test/unittest/xmlparser/XMLProfileParserTests.cpp index 73605a4d40a..7e4edcbdb8d 100644 --- a/test/unittest/xmlparser/XMLProfileParserTests.cpp +++ b/test/unittest/xmlparser/XMLProfileParserTests.cpp @@ -785,10 +785,6 @@ TEST_F(XMLProfileParserTests, SHM_transport_descriptors_config) ASSERT_NE(descriptor, nullptr); ASSERT_EQ(descriptor->segment_size(), std::numeric_limits::max()); ASSERT_EQ(descriptor->port_queue_capacity(), std::numeric_limits::max()); - ASSERT_EQ(descriptor->port_overflow_policy(), - eprosima::fastdds::rtps::SharedMemTransportDescriptor::OverflowPolicy::DISCARD); - ASSERT_EQ(descriptor->segment_overflow_policy(), - eprosima::fastdds::rtps::SharedMemTransportDescriptor::OverflowPolicy::FAIL); ASSERT_EQ(descriptor->healthy_check_timeout_ms(), std::numeric_limits::max()); ASSERT_EQ(descriptor->rtps_dump_file(), "test_file.dump"); ASSERT_EQ(descriptor->maxMessageSize, 128000u);