Skip to content

Commit

Permalink
Add types to topic_endpoint_info.oy (#1253)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carlstrom <[email protected]>
Co-authored-by: Shane Loretz <[email protected]>
  • Loading branch information
InvincibleRMC and sloretz authored Aug 3, 2024
1 parent db98d90 commit 35c0e7e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
14 changes: 13 additions & 1 deletion rclpy/rclpy/qos.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from enum import Enum
from enum import IntEnum
from typing import Union
from typing import TypedDict, Union

import warnings

Expand Down Expand Up @@ -54,6 +54,18 @@ def __init__(self, *args):
Exception.__init__(self, 'Invalid QoSProfile', *args)


class QoSProfileDictionary(TypedDict):
history: 'QoSHistoryPolicy'
depth: int
reliability: 'QoSReliabilityPolicy'
durability: 'QoSDurabilityPolicy'
lifespan: Duration
deadline: Duration
liveliness: 'QoSLivelinessPolicy'
liveliness_lease_duration: Duration
avoid_ros_namespace_conventions: bool


class QoSProfile:
"""Define Quality of Service policies."""

Expand Down
67 changes: 31 additions & 36 deletions rclpy/rclpy/topic_endpoint_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# limitations under the License.

from enum import IntEnum
from typing import List, Union

from rclpy.qos import QoSHistoryPolicy, QoSPresetProfiles, QoSProfile
from rclpy.type_hash import TypeHash
from rclpy.qos import QoSHistoryPolicy, QoSPresetProfiles, QoSProfile, QoSProfileDictionary
from rclpy.type_hash import TypeHash, TypeHashDictionary


class TopicEndpointTypeEnum(IntEnum):
Expand Down Expand Up @@ -43,75 +44,72 @@ class TopicEndpointInfo:
'_qos_profile'
]

def __init__(self, **kwargs):
assert all('_' + key in self.__slots__ for key in kwargs.keys()), \
'Invalid arguments passed to constructor: %r' % kwargs.keys()

self.node_name = kwargs.get('node_name', '')
self.node_namespace = kwargs.get('node_namespace', '')
self.topic_type = kwargs.get('topic_type', '')
self.topic_type_hash = kwargs.get('topic_type_hash', TypeHash())
self.endpoint_type = kwargs.get('endpoint_type', TopicEndpointTypeEnum.INVALID)
self.endpoint_gid = kwargs.get('endpoint_gid', [])
self.qos_profile = kwargs.get('qos_profile', QoSPresetProfiles.UNKNOWN.value)
def __init__(self, node_name: str = '', node_namespace: str = '',
topic_type: str = '', topic_type_hash: TypeHash = TypeHash(),
endpoint_type: TopicEndpointTypeEnum = TopicEndpointTypeEnum.INVALID,
endpoint_gid: List[int] = [],
qos_profile: QoSProfile = QoSPresetProfiles.UNKNOWN.value) -> None:
self.node_name = node_name
self.node_namespace = node_namespace
self.topic_type = topic_type
self.topic_type_hash = topic_type_hash
self.endpoint_type = endpoint_type
self.endpoint_gid = endpoint_gid
self.qos_profile = qos_profile

@property
def node_name(self):
def node_name(self) -> str:
"""
Get field 'node_name'.
:returns: node_name attribute
:rtype: str
"""
return self._node_name

@node_name.setter
def node_name(self, value):
def node_name(self, value: str) -> None:
assert isinstance(value, str)
self._node_name = value

@property
def node_namespace(self):
def node_namespace(self) -> str:
"""
Get field 'node_namespace'.
:returns: node_namespace attribute
:rtype: str
"""
return self._node_namespace

@node_namespace.setter
def node_namespace(self, value):
def node_namespace(self, value: str) -> None:
assert isinstance(value, str)
self._node_namespace = value

@property
def topic_type(self):
def topic_type(self) -> str:
"""
Get field 'topic_type'.
:returns: topic_type attribute
:rtype: str
"""
return self._topic_type

@topic_type.setter
def topic_type(self, value):
def topic_type(self, value: str) -> None:
assert isinstance(value, str)
self._topic_type = value

@property
def topic_type_hash(self):
def topic_type_hash(self) -> TypeHash:
"""
Get field 'topic_type_hash'.
:returns: topic_type_hash attribute
:rtype: TypeHash
"""
return self._topic_type_hash

@topic_type_hash.setter
def topic_type_hash(self, value):
def topic_type_hash(self, value: Union[TypeHash, TypeHashDictionary]) -> None:
if isinstance(value, TypeHash):
self._topic_type_hash = value
elif isinstance(value, dict):
Expand All @@ -120,17 +118,16 @@ def topic_type_hash(self, value):
assert False

@property
def endpoint_type(self):
def endpoint_type(self) -> TopicEndpointTypeEnum:
"""
Get field 'endpoint_type'.
:returns: endpoint_type attribute
:rtype: TopicEndpointTypeEnum
"""
return self._endpoint_type

@endpoint_type.setter
def endpoint_type(self, value):
def endpoint_type(self, value: Union[TopicEndpointTypeEnum, int]) -> None:
if isinstance(value, TopicEndpointTypeEnum):
self._endpoint_type = value
elif isinstance(value, int):
Expand All @@ -139,47 +136,45 @@ def endpoint_type(self, value):
assert False

@property
def endpoint_gid(self):
def endpoint_gid(self) -> List[int]:
"""
Get field 'endpoint_gid'.
:returns: endpoint_gid attribute
:rtype: list
"""
return self._endpoint_gid

@endpoint_gid.setter
def endpoint_gid(self, value):
def endpoint_gid(self, value: List[int]) -> None:
assert all(isinstance(x, int) for x in value)
self._endpoint_gid = value

@property
def qos_profile(self):
def qos_profile(self) -> QoSProfile:
"""
Get field 'qos_profile'.
:returns: qos_profile attribute
:rtype: QoSProfile
"""
return self._qos_profile

@qos_profile.setter
def qos_profile(self, value):
def qos_profile(self, value: Union[QoSProfile, QoSProfileDictionary]) -> None:
if isinstance(value, QoSProfile):
self._qos_profile = value
elif isinstance(value, dict):
self._qos_profile = QoSProfile(**value)
else:
assert False

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, TopicEndpointInfo):
return False
return all(
self.__getattribute__(slot) == other.__getattribute__(slot)
for slot in self.__slots__)

def __str__(self):
def __str__(self) -> str:
gid = '.'.join(format(x, '02x') for x in self.endpoint_gid)
if self.qos_profile.history.value != QoSHistoryPolicy.KEEP_LAST:
history_depth_str = self.qos_profile.history.name
Expand Down
7 changes: 7 additions & 0 deletions rclpy/rclpy/type_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TypedDict


class TypeHashDictionary(TypedDict):
version: int
value: bytes


class TypeHash:
"""Type hash."""
Expand Down

0 comments on commit 35c0e7e

Please sign in to comment.