Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add types to topic_endpoint_info.py #1253

Merged
merged 3 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion rclpy/rclpy/qos.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from enum import Enum
from enum import IntEnum
from typing import Union
from typing import TypedDict, Union

import warnings

Expand Down Expand Up @@ -54,6 +54,18 @@ def __init__(self, *args):
Exception.__init__(self, 'Invalid QoSProfile', *args)


class QoSProfileDictionary(TypedDict):
history: 'QoSHistoryPolicy'
depth: int
reliability: 'QoSReliabilityPolicy'
durability: 'QoSDurabilityPolicy'
lifespan: Duration
deadline: Duration
liveliness: 'QoSLivelinessPolicy'
liveliness_lease_duration: Duration
avoid_ros_namespace_conventions: bool


class QoSProfile:
"""Define Quality of Service policies."""

Expand Down
67 changes: 31 additions & 36 deletions rclpy/rclpy/topic_endpoint_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# limitations under the License.

from enum import IntEnum
from typing import List, Union

from rclpy.qos import QoSHistoryPolicy, QoSPresetProfiles, QoSProfile
from rclpy.type_hash import TypeHash
from rclpy.qos import QoSHistoryPolicy, QoSPresetProfiles, QoSProfile, QoSProfileDictionary
from rclpy.type_hash import TypeHash, TypeHashDictionary


class TopicEndpointTypeEnum(IntEnum):
Expand Down Expand Up @@ -43,75 +44,72 @@ class TopicEndpointInfo:
'_qos_profile'
]

def __init__(self, **kwargs):
assert all('_' + key in self.__slots__ for key in kwargs.keys()), \
'Invalid arguments passed to constructor: %r' % kwargs.keys()

self.node_name = kwargs.get('node_name', '')
self.node_namespace = kwargs.get('node_namespace', '')
self.topic_type = kwargs.get('topic_type', '')
self.topic_type_hash = kwargs.get('topic_type_hash', TypeHash())
self.endpoint_type = kwargs.get('endpoint_type', TopicEndpointTypeEnum.INVALID)
self.endpoint_gid = kwargs.get('endpoint_gid', [])
self.qos_profile = kwargs.get('qos_profile', QoSPresetProfiles.UNKNOWN.value)
def __init__(self, node_name: str = '', node_namespace: str = '',
topic_type: str = '', topic_type_hash: TypeHash = TypeHash(),
endpoint_type: TopicEndpointTypeEnum = TopicEndpointTypeEnum.INVALID,
endpoint_gid: List[int] = [],
qos_profile: QoSProfile = QoSPresetProfiles.UNKNOWN.value) -> None:
self.node_name = node_name
self.node_namespace = node_namespace
self.topic_type = topic_type
self.topic_type_hash = topic_type_hash
self.endpoint_type = endpoint_type
self.endpoint_gid = endpoint_gid
self.qos_profile = qos_profile

@property
def node_name(self):
def node_name(self) -> str:
"""
Get field 'node_name'.

:returns: node_name attribute
:rtype: str
"""
return self._node_name

@node_name.setter
def node_name(self, value):
def node_name(self, value: str) -> None:
assert isinstance(value, str)
self._node_name = value

@property
def node_namespace(self):
def node_namespace(self) -> str:
"""
Get field 'node_namespace'.

:returns: node_namespace attribute
:rtype: str
"""
return self._node_namespace

@node_namespace.setter
def node_namespace(self, value):
def node_namespace(self, value: str) -> None:
assert isinstance(value, str)
self._node_namespace = value

@property
def topic_type(self):
def topic_type(self) -> str:
"""
Get field 'topic_type'.

:returns: topic_type attribute
:rtype: str
"""
return self._topic_type

@topic_type.setter
def topic_type(self, value):
def topic_type(self, value: str) -> None:
assert isinstance(value, str)
self._topic_type = value

@property
def topic_type_hash(self):
def topic_type_hash(self) -> TypeHash:
"""
Get field 'topic_type_hash'.

:returns: topic_type_hash attribute
:rtype: TypeHash
"""
return self._topic_type_hash

@topic_type_hash.setter
def topic_type_hash(self, value):
def topic_type_hash(self, value: Union[TypeHash, TypeHashDictionary]) -> None:
if isinstance(value, TypeHash):
self._topic_type_hash = value
elif isinstance(value, dict):
Expand All @@ -120,17 +118,16 @@ def topic_type_hash(self, value):
assert False

@property
def endpoint_type(self):
def endpoint_type(self) -> TopicEndpointTypeEnum:
"""
Get field 'endpoint_type'.

:returns: endpoint_type attribute
:rtype: TopicEndpointTypeEnum
"""
return self._endpoint_type

@endpoint_type.setter
def endpoint_type(self, value):
def endpoint_type(self, value: Union[TopicEndpointTypeEnum, int]) -> None:
if isinstance(value, TopicEndpointTypeEnum):
self._endpoint_type = value
elif isinstance(value, int):
Expand All @@ -139,47 +136,45 @@ def endpoint_type(self, value):
assert False

@property
def endpoint_gid(self):
def endpoint_gid(self) -> List[int]:
"""
Get field 'endpoint_gid'.

:returns: endpoint_gid attribute
:rtype: list
"""
return self._endpoint_gid

@endpoint_gid.setter
def endpoint_gid(self, value):
def endpoint_gid(self, value: List[int]) -> None:
assert all(isinstance(x, int) for x in value)
self._endpoint_gid = value

@property
def qos_profile(self):
def qos_profile(self) -> QoSProfile:
"""
Get field 'qos_profile'.

:returns: qos_profile attribute
:rtype: QoSProfile
"""
return self._qos_profile

@qos_profile.setter
def qos_profile(self, value):
def qos_profile(self, value: Union[QoSProfile, QoSProfileDictionary]) -> None:
if isinstance(value, QoSProfile):
self._qos_profile = value
elif isinstance(value, dict):
self._qos_profile = QoSProfile(**value)
else:
assert False

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, TopicEndpointInfo):
return False
return all(
self.__getattribute__(slot) == other.__getattribute__(slot)
for slot in self.__slots__)

def __str__(self):
def __str__(self) -> str:
gid = '.'.join(format(x, '02x') for x in self.endpoint_gid)
if self.qos_profile.history.value != QoSHistoryPolicy.KEEP_LAST:
history_depth_str = self.qos_profile.history.name
Expand Down
7 changes: 7 additions & 0 deletions rclpy/rclpy/type_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TypedDict


class TypeHashDictionary(TypedDict):
version: int
value: bytes


class TypeHash:
"""Type hash."""
Expand Down