Skip to content

Commit

Permalink
Updatable disable_positive_acks period (#3879)
Browse files Browse the repository at this point in the history
* Refs #19576: Test Update positive_acks period on RTPS Layer

Signed-off-by: cferreiragonz <[email protected]>

* Refs #19567: Fix Update positive_acks period on RTPS Layer

Signed-off-by: cferreiragonz <[email protected]>

* Refs #19576: Test Update positive_acks period on DDS Layer

Signed-off-by: cferreiragonz <[email protected]>

* Refs #19576: Fix ack_timer and Updatability of positive_acks on DDS Layer

Signed-off-by: cferreiragonz <[email protected]>

* Refs #19576: Uncrustify fix

Signed-off-by: cferreiragonz <[email protected]>

* Refs #19576: Fix linux ci

Signed-off-by: cferreiragonz <[email protected]>

* Ref #19576: DataReaderImpl update and updateAttributes call

Signed-off-by: cferreiragonz <[email protected]>

* Refs #19576: Fix mac-ci

Signed-off-by: cferreiragonz <[email protected]>

* Refs #19576: move if clause

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz authored Oct 2, 2023
1 parent 5974eb5 commit b84825a
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 14 deletions.
1 change: 1 addition & 0 deletions include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2264,6 +2264,7 @@ class DisablePositiveACKsQosPolicy : public Parameter_t, public QosPolicy
const DisablePositiveACKsQosPolicy& b) const
{
return enabled == b.enabled &&
duration == b.duration &&
Parameter_t::operator ==(b) &&
QosPolicy::operator ==(b);
}
Expand Down
3 changes: 2 additions & 1 deletion include/fastdds/dds/publisher/qos/DataWriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class RTPSReliableWriterQos
const RTPSReliableWriterQos& b) const
{
return (this->times == b.times) &&
(this->disable_positive_acks == b.disable_positive_acks);
(this->disable_positive_acks == b.disable_positive_acks) &&
(this->disable_heartbeat_piggyback == b.disable_heartbeat_piggyback);
}

//!Writer Timing Attributes
Expand Down
7 changes: 7 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,13 @@ class StatefulWriter : public RTPSWriter
void updateTimes(
const WriterTimes& times);

/**
* Update the period of the disable positive ACKs policy.
* @param att WriterAttributes parameter.
*/
void updatePositiveAcks(
const WriterAttributes& att);

SequenceNumber_t next_sequence_number() const;

/**
Expand Down
21 changes: 20 additions & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1143,10 +1143,22 @@ ReturnCode_t DataWriterImpl::set_qos(
{
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
}
set_qos(qos_, qos_to_set, !enabled);

set_qos(qos_, qos_to_set, enabled);

if (enabled)
{
if (qos_.reliability().kind == eprosima::fastrtps::RELIABLE_RELIABILITY_QOS &&
qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos())
{
// Update times and positive_acks attributes on RTPS Layer
WriterAttributes w_att;
w_att.times = qos_.reliable_writer_qos().times;
w_att.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks.enabled;
w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration;
writer_->updateAttributes(w_att);
}

//Notify the participant that a Writer has changed its QOS
fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_);
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
Expand Down Expand Up @@ -1884,6 +1896,13 @@ bool DataWriterImpl::can_qos_be_updated(
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Data sharing configuration cannot be changed after the creation of a DataWriter.");
}
if (to.reliable_writer_qos().disable_positive_acks.enabled !=
from.reliable_writer_qos().disable_positive_acks.enabled)
{
updatable = false;
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Only the period of Positive ACKs can be changed after the creation of a DataWriter.");
}
return updatable;
}

Expand Down
7 changes: 7 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,13 @@ bool DataReaderImpl::can_qos_be_updated(
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Unique network flows request cannot be changed after the creation of a DataReader.");
}
if (to.reliable_reader_qos().disable_positive_ACKs.enabled !=
from.reliable_reader_qos().disable_positive_ACKs.enabled)
{
updatable = false;
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"Positive ACKs QoS cannot be changed after the creation of a DataReader.");
}
return updatable;
}

Expand Down
51 changes: 47 additions & 4 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace rtps {
/**
* Loops over all the readers in the vector, applying the given routine.
* The loop continues until the result of the routine is true for any reader
* or all readers have been processes.
* or all readers have been processed.
* The returned value is true if the routine returned true at any point,
* or false otherwise.
*/
Expand Down Expand Up @@ -953,6 +953,17 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t())
{
last_sequence_number_ = change->sequenceNumber;
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->restart_timer(max_blocking_time);
}
}

// Restore in case a exception was launched by RTPSMessageGroup.
Expand Down Expand Up @@ -1612,6 +1623,24 @@ void StatefulWriter::updateAttributes(
const WriterAttributes& att)
{
this->updateTimes(att.times);
if (this->get_disable_positive_acks())
{
this->updatePositiveAcks(att);
}
}

void StatefulWriter::updatePositiveAcks(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
}
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->restart_timer();
}

void StatefulWriter::updateTimes(
Expand Down Expand Up @@ -2030,26 +2059,40 @@ bool StatefulWriter::ack_timer_expired()

while (interval.count() < 0)
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
[this](ReaderProxy* reader)
[this, &acks_flag](ReaderProxy* reader)
{
if (reader->disable_positive_acks())
{
reader->acked_changes_set(last_sequence_number_ + 1);
acks_flag = true;
}
return false;
}
);
last_sequence_number_++;
if (acks_flag)
{
check_acked_status();
}

// Get the next cache change from the history
CacheChange_t* change;

// Skip removed changes until reaching the last change
do
{
last_sequence_number_++;
} while (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change) && last_sequence_number_ < next_sequence_number());

if (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change))
{
// Stop ack_timer
return false;
}

Expand Down
11 changes: 11 additions & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,17 @@ class PubSubWriter
return (ReturnCode_t::RETCODE_OK == datawriter_->set_qos(datawriter_qos_));
}

bool set_qos(
const eprosima::fastdds::dds::DataWriterQos& att)
{
return (ReturnCode_t::RETCODE_OK == datawriter_->set_qos(att));
}

eprosima::fastdds::dds::DataWriterQos get_qos()
{
return (datawriter_->get_qos());
}

bool remove_all_changes(
size_t* number_of_changes_removed)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,23 +12,138 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "BlackboxTests.hpp"

#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"
#include "ReqRepAsReliableHelloWorldRequester.hpp"
#include "ReqRepAsReliableHelloWorldReplier.hpp"
#include <atomic>
#include <condition_variable>
#include <gmock/gmock-matchers.h>
#include <mutex>
#include <thread>

#include <gtest/gtest.h>

#include <gmock/gmock.h>

#include <fastdds/core/policy/ParameterSerializer.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantFactoryQos.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/qos/SubscriberQos.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/topic/qos/TopicQos.hpp>
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.h>
#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/transport/test_UDPv4TransportDescriptor.h>
#include <rtps/transport/test_UDPv4Transport.h>
#include <fastrtps/types/TypesBase.h>

#include "BlackboxTests.hpp"
#include "../api/dds-pim/CustomPayloadPool.hpp"
#include "../api/dds-pim/PubSubReader.hpp"
#include "../api/dds-pim/PubSubWriter.hpp"
#include "../api/dds-pim/ReqRepAsReliableHelloWorldRequester.hpp"
#include "../api/dds-pim/ReqRepAsReliableHelloWorldReplier.hpp"
#include "../types/FixedSized.h"
#include "../types/FixedSizedPubSubTypes.h"
#include "../types/HelloWorldPubSubTypes.h"

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport;
using test_UDPv4TransportDescriptor = eprosima::fastdds::rtps::test_UDPv4TransportDescriptor;


TEST(AcknackQos, DDSEnableUpdatabilityOfPositiveAcksPeriodDDSLayer)
{
// This test checks the behaviour of disabling positive ACKs.
// It also checks that only the positive ACKs
// period is updatable at runtime through set_qos.

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

// Configure datapublisher_qos
writer.keep_duration({1, 0});
writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
writer.durability_kind(eprosima::fastrtps::VOLATILE_DURABILITY_QOS);
writer.init();

ASSERT_TRUE(writer.isInitialized());

// Configure datasubscriber_qos
reader.keep_duration({1, 0});
reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
reader.init();

ASSERT_TRUE(reader.isInitialized());

// Check correct initialitation
eprosima::fastdds::dds::DataWriterQos get_att = writer.get_qos();
EXPECT_TRUE(get_att.reliable_writer_qos().disable_positive_acks.enabled);
EXPECT_EQ(get_att.reliable_writer_qos().disable_positive_acks.duration, eprosima::fastrtps::Duration_t({1, 0}));

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator();

reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();
// Wait for all acked msgs
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));

// Wait to disable timer because no new messages are sent
std::this_thread::sleep_for(std::chrono::milliseconds(1200));
// Send a new message to check that timer is restarted correctly
data = default_helloworld_data_generator(1);
reader.startReception(data);
writer.send(data);
ASSERT_TRUE(data.empty());
reader.block_for_all();
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));

// Update attributes on DDS layer
eprosima::fastdds::dds::DataWriterQos w_att = writer.get_qos();
w_att.reliable_writer_qos().disable_positive_acks.enabled = true;
w_att.reliable_writer_qos().disable_positive_acks.duration = eprosima::fastrtps::Duration_t({2, 0});

EXPECT_TRUE(writer.set_qos(w_att));

// Check that period has been changed in DataWriterQos
get_att = writer.get_qos();
EXPECT_TRUE(get_att.reliable_writer_qos().disable_positive_acks.enabled);
EXPECT_EQ(get_att.reliable_writer_qos().disable_positive_acks.duration, eprosima::fastrtps::Duration_t({2, 0}));

data = default_helloworld_data_generator();

reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();
// Check that period has been correctly updated
EXPECT_FALSE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200)));

// Try to disable positive_acks
w_att.reliable_writer_qos().disable_positive_acks.enabled = false;

// Check that is not possible to change disable_positive_acks at runtime
EXPECT_FALSE(writer.set_qos(w_att));
}

TEST(AcknackQos, RecoverAfterLosingCommunicationWithDisablePositiveAck)
{
// This test makes the writer send a few samples
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/common/RTPSAsSocketReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,13 @@ class RTPSAsSocketReader
}
}

RTPSAsSocketReader& disable_positive_acks(
bool disable)
{
reader_attr_.disable_positive_acks = disable;
return *this;
}

private:

void receive_one(
Expand Down
Loading

0 comments on commit b84825a

Please sign in to comment.