diff --git a/launch/launch/event_handler.py b/launch/launch/event_handler.py index 078da25f5..82224e36c 100644 --- a/launch/launch/event_handler.py +++ b/launch/launch/event_handler.py @@ -15,7 +15,10 @@ """Module for EventHandler class.""" from typing import Callable +from typing import List from typing import Optional +from typing import Text +from typing import Tuple from .event import Event from .some_actions_type import SomeActionsType @@ -39,7 +42,8 @@ def __init__( self, *, matcher: Callable[[Event], bool], - entities: Optional[SomeActionsType] = None + entities: Optional[SomeActionsType] = None, + handle_once: bool = False ) -> None: """ Constructor. @@ -48,16 +52,52 @@ def __init__( the event should be handled by this event handler, False otherwise. :param: entities is an LaunchDescriptionEntity or list of them, and is returned by handle() unconditionally if matcher returns True. + :param: handle_once is a flag that, if True, unregisters this EventHandler + after being handled once. """ self.__matcher = matcher self.__entities = entities + self.__handle_once = handle_once @property def entities(self): """Getter for entities.""" return self.__entities - # TODO(wjwwood): setup standard interface for describing event handlers + @property + def handle_once(self): + """Getter for handle_once flag.""" + return self.__handle_once + + @property + def handler_description(self): + """ + Return the string description of the handler. + + This should be overridden. + """ + return None + + @property + def matcher_description(self): + """ + Return the string description of the matcher. + + This should be overridden. + """ + return None + + def describe(self) -> Tuple[Text, List[SomeActionsType]]: + """Return the description list with 0 as a string, and then LaunchDescriptionEntity's.""" + return ( + "{}(matcher='{}', handler='{}', handle_once={})".format( + type(self).__name__, + self.matcher_description, + self.handler_description, + self.handle_once + ), + self.entities if self.entities is not None else [] + ) def matches(self, event: Event) -> bool: """Return True if the given event should be handled by this event handler.""" @@ -66,4 +106,6 @@ def matches(self, event: Event) -> bool: def handle(self, event: Event, context: 'LaunchContext') -> Optional[SomeActionsType]: """Handle the given event.""" context.extend_locals({'event': event}) + if self.handle_once: + context.unregister_event_handler(self) return self.__entities diff --git a/launch/launch/event_handlers/on_include_launch_description.py b/launch/launch/event_handlers/on_include_launch_description.py index a06842f9c..38d0137d1 100644 --- a/launch/launch/event_handlers/on_include_launch_description.py +++ b/launch/launch/event_handlers/on_include_launch_description.py @@ -14,20 +14,17 @@ """Module for OnIncludeLaunchDescription class.""" -from typing import List from typing import Text -from typing import Tuple from ..event_handler import EventHandler from ..events import IncludeLaunchDescription -from ..launch_description_entity import LaunchDescriptionEntity from ..utilities import is_a_subclass class OnIncludeLaunchDescription(EventHandler): """Event handler used to handle asynchronous requests to include LaunchDescriptions.""" - def __init__(self): + def __init__(self, **kwargs): """Constructor.""" from ..actions import OpaqueFunction super().__init__( @@ -35,14 +32,15 @@ def __init__(self): entities=OpaqueFunction( function=lambda context: [context.locals.event.launch_description] ), + **kwargs, ) - def describe(self) -> Tuple[Text, List[LaunchDescriptionEntity]]: - """Return the description list with 0 as a string, and then LaunchDescriptionEntity's.""" - return ( - "OnIncludeLaunchDescription(matcher='{}', handler='{}')".format( - 'event issubclass of launch.events.IncludeLaunchDescription', - 'returns the launch_description in the event' - ), - [], - ) + @property + def handler_description(self) -> Text: + """Return the string description of the handler.""" + return 'returns the launch_description in the event' + + @property + def matcher_description(self) -> Text: + """Return the string description of the matcher.""" + return 'event issubclass of launch.events.IncludeLaunchDescription' diff --git a/launch/launch/event_handlers/on_process_exit.py b/launch/launch/event_handlers/on_process_exit.py index 994ff496e..100353b34 100644 --- a/launch/launch/event_handlers/on_process_exit.py +++ b/launch/launch/event_handlers/on_process_exit.py @@ -17,11 +17,9 @@ import collections from typing import Callable from typing import cast -from typing import List from typing import Optional from typing import overload from typing import Text -from typing import Tuple from ..event import Event from ..event_handler import EventHandler @@ -48,7 +46,8 @@ class OnProcessExit(EventHandler): def __init__( self, *, target_action: 'ExecuteProcess' = None, - on_exit: SomeActionsType + on_exit: SomeActionsType, + **kwargs ) -> None: """Overload which takes just actions.""" ... @@ -58,12 +57,13 @@ def __init__( self, *, target_action: 'ExecuteProcess' = None, - on_exit: Callable[[int], Optional[SomeActionsType]] + on_exit: Callable[[int], Optional[SomeActionsType]], + **kwargs ) -> None: """Overload which takes a callable to handle the exit.""" ... - def __init__(self, *, target_action=None, on_exit) -> None: # noqa: F811 + def __init__(self, *, target_action=None, on_exit, **kwargs) -> None: # noqa: F811 """Constructor.""" from ..actions import ExecuteProcess # noqa if not isinstance(target_action, (ExecuteProcess, type(None))): @@ -78,6 +78,7 @@ def __init__(self, *, target_action=None, on_exit) -> None: # noqa: F811 ) ), entities=None, + **kwargs, ) self.__target_action = target_action # TODO(wjwwood) check that it is not only callable, but also a callable that matches @@ -107,24 +108,18 @@ def handle(self, event: Event, context: LaunchContext) -> Optional[SomeActionsTy """Handle the given event.""" return self.__on_exit(cast(ProcessExited, event), context) - def describe(self) -> Tuple[Text, List[LaunchDescriptionEntity]]: - """Return the description list with 0 as a string, and then LaunchDescriptionEntity's.""" + @property + def handler_description(self) -> Text: + """Return the string description of the handler.""" + # TODO(jacobperron): revisit how to describe known actions that are passed in. + # It would be nice if the parent class could output their description + # via the 'entities' property. if self.__actions_on_exit: - # A list of resulting actions is already known. - return ( - "OnProcessExit(matcher='{}', handler=)".format(self.matcher_description), - self.__actions_on_exit, - ) - # A callable handler has been provided. - return ( - "OnProcessExit(matcher='{}', handler={})".format( - self.matcher_description, - self.__on_exit), - [], - ) + return '' + return '{}'.format(self.__on_exit) @property - def matcher_description(self): + def matcher_description(self) -> Text: """Return the string description of the matcher.""" if self.__target_action is None: return 'event == ProcessExited' diff --git a/launch/launch/event_handlers/on_process_io.py b/launch/launch/event_handlers/on_process_io.py index e005f27c6..f2e6b9fde 100644 --- a/launch/launch/event_handlers/on_process_io.py +++ b/launch/launch/event_handlers/on_process_io.py @@ -16,16 +16,13 @@ from typing import Callable from typing import cast -from typing import List from typing import Optional from typing import Text -from typing import Tuple from ..event import Event from ..event_handler import EventHandler from ..events.process import ProcessIO from ..launch_context import LaunchContext -from ..launch_description_entity import LaunchDescriptionEntity from ..some_actions_type import SomeActionsType if False: @@ -44,13 +41,14 @@ def __init__( target_action: Optional['ExecuteProcess'] = None, on_stdin: Callable[[ProcessIO], Optional[SomeActionsType]] = None, on_stdout: Callable[[ProcessIO], Optional[SomeActionsType]] = None, - on_stderr: Callable[[ProcessIO], Optional[SomeActionsType]] = None + on_stderr: Callable[[ProcessIO], Optional[SomeActionsType]] = None, + **kwargs ) -> None: """Constructor.""" from ..actions import ExecuteProcess # noqa if not isinstance(target_action, (ExecuteProcess, type(None))): raise RuntimeError("OnProcessIO requires an 'ExecuteProcess' action as the target") - super().__init__(matcher=self._matcher, entities=None) + super().__init__(matcher=self._matcher, entities=None, **kwargs) self.__target_action = target_action self.__on_stdin = on_stdin self.__on_stdout = on_stdout @@ -77,8 +75,9 @@ def handle(self, event: Event, context: LaunchContext) -> Optional[SomeActionsTy return self.__on_stdin(event) return None - def describe(self) -> Tuple[Text, List[LaunchDescriptionEntity]]: - """Return the description list with 0 as a string, and then LaunchDescriptionEntity's.""" + @property + def handler_description(self) -> Text: + """Return the string description of the handler.""" handlers = [] if self.__on_stdin is not None: handlers.append("on_stdin: '{}'".format(self.__on_stdin)) @@ -87,15 +86,10 @@ def describe(self) -> Tuple[Text, List[LaunchDescriptionEntity]]: if self.__on_stderr is not None: handlers.append("on_stderr: '{}'".format(self.__on_stderr)) handlers_str = '{' + ', '.join(handlers) + '}' - return ( - "OnProcessIO(matcher='{}', handlers={})".format( - self.matcher_description, handlers_str - ), - [], - ) + return handlers_str @property - def matcher_description(self): + def matcher_description(self) -> Text: """Return the string description of the matcher.""" if self.__target_action is None: return 'event issubclass of ProcessIO' diff --git a/launch/launch/event_handlers/on_shutdown.py b/launch/launch/event_handlers/on_shutdown.py index 361102ff7..127172865 100644 --- a/launch/launch/event_handlers/on_shutdown.py +++ b/launch/launch/event_handlers/on_shutdown.py @@ -16,16 +16,13 @@ from typing import Callable from typing import cast -from typing import List from typing import Optional from typing import overload from typing import Text -from typing import Tuple from ..event import Event from ..event_handler import EventHandler from ..events import Shutdown -from ..launch_description_entity import LaunchDescriptionEntity from ..some_actions_type import SomeActionsType from ..utilities import is_a_subclass @@ -38,7 +35,7 @@ class OnShutdown(EventHandler): """Convenience class for handling the launch shutdown event.""" @overload - def __init__(self, *, on_shutdown: SomeActionsType) -> None: + def __init__(self, *, on_shutdown: SomeActionsType, **kwargs) -> None: """Overload which takes just actions.""" ... @@ -46,16 +43,18 @@ def __init__(self, *, on_shutdown: SomeActionsType) -> None: def __init__( self, *, - on_shutdown: Callable[[Shutdown, 'LaunchContext'], Optional[SomeActionsType]] + on_shutdown: Callable[[Shutdown, 'LaunchContext'], Optional[SomeActionsType]], + **kwargs ) -> None: """Overload which takes a callable to handle the shutdown.""" ... - def __init__(self, *, on_shutdown): # noqa: F811 + def __init__(self, *, on_shutdown, **kwargs): # noqa: F811 """Constructor.""" super().__init__( matcher=lambda event: is_a_subclass(event, Shutdown), entities=None, # noop + **kwargs, ) # TODO(wjwwood) check that it is not only callable, but also a callable that matches # the correct signature for a handler in this case @@ -68,15 +67,11 @@ def handle(self, event: Event, context: 'LaunchContext') -> Optional[SomeActions context.extend_locals({'event': event}) return self.__on_shutdown(cast(Shutdown, event), context) - def describe(self) -> Tuple[Text, List[LaunchDescriptionEntity]]: - """Return the description list with 0 as a string, and then LaunchDescriptionEntity's.""" + @property + def handler_description(self) -> Text: + """Return the string description of the handler.""" # TODO(dhood): print known actions if they were passed in, like in OnProcessExit - return ( - "OnShutdown(matcher='{}', handler={})".format( - self.matcher_description, - self.__on_shutdown), - [], - ) + return '{}'.format(self.__on_shutdown) @property def matcher_description(self): diff --git a/launch/test/launch/test_event_handler.py b/launch/test/launch/test_event_handler.py index 5a6285a8f..3cce1c115 100644 --- a/launch/test/launch/test_event_handler.py +++ b/launch/test/launch/test_event_handler.py @@ -19,11 +19,15 @@ from launch import LaunchDescriptionEntity from launch import SomeActionsType_types_tuple +import pytest + def test_event_handler_constructors(): """Test the constructors for EventHandler class.""" EventHandler(matcher=lambda event: False) EventHandler(matcher=lambda event: False, entities=[LaunchDescriptionEntity]) + EventHandler(matcher=lambda event: True, handle_once=True) + EventHandler(matcher=lambda event: True, entities=None, handle_once=False) def test_event_handler_matches_and_handle(): @@ -39,3 +43,30 @@ class MockEvent: assert isinstance(entities, SomeActionsType_types_tuple) assert len(entities) == 1 assert context.locals.event == mock_event + + +def test_event_handler_handle_once(): + """Test the option for handling events once for the EventHandler class.""" + class MockEvent: + ... + + mock_event = MockEvent() + + # Test handling multiple events with handle_once=False (default) + eh_multiple = EventHandler(matcher=lambda event: True, handle_once=False) + context_multiple = LaunchContext() + context_multiple.register_event_handler(eh_multiple) + eh_multiple.handle(mock_event, context_multiple) + assert context_multiple.locals.event == mock_event + # Attempt to handle a second event + eh_multiple.handle(mock_event, context_multiple) + + # Test handling multiple events with handle_once=True + eh_once = EventHandler(matcher=lambda event: True, handle_once=True) + context_once = LaunchContext() + context_once.register_event_handler(eh_once) + eh_once.handle(mock_event, context_once) + assert context_once.locals.event == mock_event + # Attempt to handle a second event, this time expect ValueError because it is unregistered + with pytest.raises(ValueError): + eh_once.handle(mock_event, context_once) diff --git a/launch_ros/launch_ros/event_handlers/on_state_transition.py b/launch_ros/launch_ros/event_handlers/on_state_transition.py index 07cbd786b..c0f89e5f5 100644 --- a/launch_ros/launch_ros/event_handlers/on_state_transition.py +++ b/launch_ros/launch_ros/event_handlers/on_state_transition.py @@ -15,14 +15,11 @@ """Module for OnStateTransition class.""" from typing import Callable -from typing import List from typing import Optional from typing import Text -from typing import Tuple from launch.event import Event from launch.event_handler import EventHandler -from launch.launch_description_entity import LaunchDescriptionEntity from launch.some_actions_type import SomeActionsType from launch.some_substitutions_type import SomeSubstitutionsType @@ -41,7 +38,8 @@ def __init__( transition: Optional[SomeSubstitutionsType] = None, start_state: Optional[SomeSubstitutionsType] = None, goal_state: Optional[SomeSubstitutionsType] = None, - matcher: Optional[Callable[[Event], bool]] = None + matcher: Optional[Callable[[Event], bool]] = None, + **kwargs ) -> None: """ Constructor. @@ -79,15 +77,14 @@ def __init__( super().__init__( matcher=self.__custom_matcher, entities=entities, + **kwargs ) self.__target_lifecycle_node = target_lifecycle_node - def describe(self) -> Tuple[Text, List[LaunchDescriptionEntity]]: - """Return the description list with 0 as a string, and then LaunchDescriptionEntity's.""" - return ( - "OnStateTransition(matcher='{}', handler=)".format(self.matcher_description), - self.entities, - ) + @property + def handler_description(self) -> Text: + """Return the string description of the handler.""" + return '' @property def matcher_description(self):