diff --git a/rclpy/rclpy/qos_overriding_options.py b/rclpy/rclpy/qos_overriding_options.py index 1db48cfd9..316b59433 100644 --- a/rclpy/rclpy/qos_overriding_options.py +++ b/rclpy/rclpy/qos_overriding_options.py @@ -162,7 +162,7 @@ def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[s if policy in ( QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION ): - value = value.nanoseconds() + value = value.nanoseconds return value diff --git a/rclpy/test/test_qos_overriding_options.py b/rclpy/test/test_qos_overriding_options.py index 933dc6be7..d55b3ae2b 100644 --- a/rclpy/test/test_qos_overriding_options.py +++ b/rclpy/test/test_qos_overriding_options.py @@ -15,14 +15,21 @@ import pytest import rclpy +from rclpy.duration import Duration from rclpy.node import Node from rclpy.parameter import Parameter from rclpy.publisher import Publisher +from rclpy.qos import QoSDurabilityPolicy +from rclpy.qos import QoSHistoryPolicy +from rclpy.qos import QoSLivelinessPolicy from rclpy.qos import QoSProfile +from rclpy.qos import QoSReliabilityPolicy from rclpy.qos_overriding_options import _declare_qos_parameters +from rclpy.qos_overriding_options import _get_qos_policy_parameter from rclpy.qos_overriding_options import InvalidQosOverridesError from rclpy.qos_overriding_options import QosCallbackResult from rclpy.qos_overriding_options import QoSOverridingOptions +from rclpy.qos_overriding_options import QoSPolicyKind @pytest.fixture(autouse=True) @@ -32,6 +39,35 @@ def init_shutdown(): rclpy.shutdown() +def test_get_qos_policy_parameter(): + qos = QoSProfile( + history=QoSHistoryPolicy.KEEP_LAST, + depth=10, + reliability=QoSReliabilityPolicy.RELIABLE, + durability=QoSDurabilityPolicy.VOLATILE, + lifespan=Duration(nanoseconds=1e3), + deadline=Duration(nanoseconds=1e6), + liveliness=QoSLivelinessPolicy.SYSTEM_DEFAULT, + liveliness_lease_duration=Duration(nanoseconds=1e9) + ) + value = _get_qos_policy_parameter(qos, QoSPolicyKind.HISTORY) + assert value == 'keep_last' + value = _get_qos_policy_parameter(qos, QoSPolicyKind.DEPTH) + assert value == qos.depth + value = _get_qos_policy_parameter(qos, QoSPolicyKind.RELIABILITY) + assert value == 'reliable' + value = _get_qos_policy_parameter(qos, QoSPolicyKind.DURABILITY) + assert value == 'volatile' + value = _get_qos_policy_parameter(qos, QoSPolicyKind.LIFESPAN) + assert value == qos.lifespan.nanoseconds + value = _get_qos_policy_parameter(qos, QoSPolicyKind.DEADLINE) + assert value == qos.deadline.nanoseconds + value = _get_qos_policy_parameter(qos, QoSPolicyKind.LIVELINESS) + assert value == 'system_default' + value = _get_qos_policy_parameter(qos, QoSPolicyKind.LIVELINESS_LEASE_DURATION) + assert value == qos.liveliness_lease_duration.nanoseconds + + def test_declare_qos_parameters(): node = Node('my_node') _declare_qos_parameters(