diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index a63523e2c..5c96925c1 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -14,7 +14,7 @@ from enum import Enum from enum import IntEnum -from typing import Union +from typing import TypedDict, Union import warnings @@ -54,6 +54,18 @@ def __init__(self, *args): Exception.__init__(self, 'Invalid QoSProfile', *args) +class QoSProfileDictionary(TypedDict): + history: 'QoSHistoryPolicy' + depth: int + reliability: 'QoSReliabilityPolicy' + durability: 'QoSDurabilityPolicy' + lifespan: Duration + deadline: Duration + liveliness: 'QoSLivelinessPolicy' + liveliness_lease_duration: Duration + avoid_ros_namespace_conventions: bool + + class QoSProfile: """Define Quality of Service policies.""" diff --git a/rclpy/rclpy/topic_endpoint_info.py b/rclpy/rclpy/topic_endpoint_info.py index 110080a5b..40301ef91 100644 --- a/rclpy/rclpy/topic_endpoint_info.py +++ b/rclpy/rclpy/topic_endpoint_info.py @@ -13,9 +13,10 @@ # limitations under the License. from enum import IntEnum +from typing import List, Union -from rclpy.qos import QoSHistoryPolicy, QoSPresetProfiles, QoSProfile -from rclpy.type_hash import TypeHash +from rclpy.qos import QoSHistoryPolicy, QoSPresetProfiles, QoSProfile, QoSProfileDictionary +from rclpy.type_hash import TypeHash, TypeHashDictionary class TopicEndpointTypeEnum(IntEnum): @@ -43,75 +44,72 @@ class TopicEndpointInfo: '_qos_profile' ] - def __init__(self, **kwargs): - assert all('_' + key in self.__slots__ for key in kwargs.keys()), \ - 'Invalid arguments passed to constructor: %r' % kwargs.keys() - - self.node_name = kwargs.get('node_name', '') - self.node_namespace = kwargs.get('node_namespace', '') - self.topic_type = kwargs.get('topic_type', '') - self.topic_type_hash = kwargs.get('topic_type_hash', TypeHash()) - self.endpoint_type = kwargs.get('endpoint_type', TopicEndpointTypeEnum.INVALID) - self.endpoint_gid = kwargs.get('endpoint_gid', []) - self.qos_profile = kwargs.get('qos_profile', QoSPresetProfiles.UNKNOWN.value) + def __init__(self, node_name: str = '', node_namespace: str = '', + topic_type: str = '', topic_type_hash: TypeHash = TypeHash(), + endpoint_type: TopicEndpointTypeEnum = TopicEndpointTypeEnum.INVALID, + endpoint_gid: List[int] = [], + qos_profile: QoSProfile = QoSPresetProfiles.UNKNOWN.value) -> None: + self.node_name = node_name + self.node_namespace = node_namespace + self.topic_type = topic_type + self.topic_type_hash = topic_type_hash + self.endpoint_type = endpoint_type + self.endpoint_gid = endpoint_gid + self.qos_profile = qos_profile @property - def node_name(self): + def node_name(self) -> str: """ Get field 'node_name'. :returns: node_name attribute - :rtype: str """ return self._node_name @node_name.setter - def node_name(self, value): + def node_name(self, value: str) -> None: assert isinstance(value, str) self._node_name = value @property - def node_namespace(self): + def node_namespace(self) -> str: """ Get field 'node_namespace'. :returns: node_namespace attribute - :rtype: str """ return self._node_namespace @node_namespace.setter - def node_namespace(self, value): + def node_namespace(self, value: str) -> None: assert isinstance(value, str) self._node_namespace = value @property - def topic_type(self): + def topic_type(self) -> str: """ Get field 'topic_type'. :returns: topic_type attribute - :rtype: str """ return self._topic_type @topic_type.setter - def topic_type(self, value): + def topic_type(self, value: str) -> None: assert isinstance(value, str) self._topic_type = value @property - def topic_type_hash(self): + def topic_type_hash(self) -> TypeHash: """ Get field 'topic_type_hash'. :returns: topic_type_hash attribute - :rtype: TypeHash """ return self._topic_type_hash @topic_type_hash.setter - def topic_type_hash(self, value): + def topic_type_hash(self, value: Union[TypeHash, TypeHashDictionary]) -> None: if isinstance(value, TypeHash): self._topic_type_hash = value elif isinstance(value, dict): @@ -120,17 +118,16 @@ def topic_type_hash(self, value): assert False @property - def endpoint_type(self): + def endpoint_type(self) -> TopicEndpointTypeEnum: """ Get field 'endpoint_type'. :returns: endpoint_type attribute - :rtype: TopicEndpointTypeEnum """ return self._endpoint_type @endpoint_type.setter - def endpoint_type(self, value): + def endpoint_type(self, value: Union[TopicEndpointTypeEnum, int]) -> None: if isinstance(value, TopicEndpointTypeEnum): self._endpoint_type = value elif isinstance(value, int): @@ -139,32 +136,30 @@ def endpoint_type(self, value): assert False @property - def endpoint_gid(self): + def endpoint_gid(self) -> List[int]: """ Get field 'endpoint_gid'. :returns: endpoint_gid attribute - :rtype: list """ return self._endpoint_gid @endpoint_gid.setter - def endpoint_gid(self, value): + def endpoint_gid(self, value: List[int]) -> None: assert all(isinstance(x, int) for x in value) self._endpoint_gid = value @property - def qos_profile(self): + def qos_profile(self) -> QoSProfile: """ Get field 'qos_profile'. :returns: qos_profile attribute - :rtype: QoSProfile """ return self._qos_profile @qos_profile.setter - def qos_profile(self, value): + def qos_profile(self, value: Union[QoSProfile, QoSProfileDictionary]) -> None: if isinstance(value, QoSProfile): self._qos_profile = value elif isinstance(value, dict): @@ -172,14 +167,14 @@ def qos_profile(self, value): else: assert False - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, TopicEndpointInfo): return False return all( self.__getattribute__(slot) == other.__getattribute__(slot) for slot in self.__slots__) - def __str__(self): + def __str__(self) -> str: gid = '.'.join(format(x, '02x') for x in self.endpoint_gid) if self.qos_profile.history.value != QoSHistoryPolicy.KEEP_LAST: history_depth_str = self.qos_profile.history.name diff --git a/rclpy/rclpy/type_hash.py b/rclpy/rclpy/type_hash.py index ae7daf12f..33f958c43 100644 --- a/rclpy/rclpy/type_hash.py +++ b/rclpy/rclpy/type_hash.py @@ -12,6 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TypedDict + + +class TypeHashDictionary(TypedDict): + version: int + value: bytes + class TypeHash: """Type hash."""