From 4b89dafa11beda09071fcf1e0b468f7883d1ba11 Mon Sep 17 00:00:00 2001 From: Hubert Liberacki Date: Wed, 30 Mar 2022 12:48:59 +0200 Subject: [PATCH 1/3] Replace spin_until_future_complete with spin_until_complete, add spin_for method * Deprecate spin_until_future_complete * Add spin_until_complete * Add spin_for method * Udpdate unit tests Signed-off-by: Hubert Liberacki --- rclpy/rclpy/__init__.py | 57 ++++++++++++++--- rclpy/rclpy/executors.py | 70 ++++++++++++++++++--- rclpy/test/test_action_client.py | 26 ++++---- rclpy/test/test_action_server.py | 54 ++++++++-------- rclpy/test/test_client.py | 6 +- rclpy/test/test_executor.py | 104 +++++++++++++++++++++++++++---- rclpy/test/test_qos_event.py | 2 +- rclpy/test/test_waitable.py | 2 +- 8 files changed, 246 insertions(+), 75 deletions(-) diff --git a/rclpy/rclpy/__init__.py b/rclpy/rclpy/__init__.py index cea1f9163..1a30d4b3b 100644 --- a/rclpy/rclpy/__init__.py +++ b/rclpy/rclpy/__init__.py @@ -33,7 +33,7 @@ After a node is created, items of work can be done (e.g. subscription callbacks) by *spinning* on the node. The following functions can be used to process work that is waiting to be executed: :func:`spin`, -:func:`spin_once`, and :func:`spin_until_future_complete`. +:func:`spin_once`, and :func:`spin_until_complete`. When finished with a previously initialized :class:`.Context` (ie. done using all ROS nodes associated with the context), the :func:`shutdown` function should be called. @@ -224,6 +224,52 @@ def spin(node: 'Node', executor: 'Executor' = None) -> None: executor.remove_node(node) +def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None: + """ + Execute block until the context associated with the executor is shutdown or time duration pass. + + Callbacks will be executed by the provided executor. + + This function blocks. + + :param node: A node to add to the executor to check for work. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait. + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_once(duration_sec) + finally: + executor.remove_node(node) + + +def spin_until_complete( + node: 'Node', + condition, + executor: 'Executor' = None, + timeout_sec: float = None +) -> None: + """ + Execute work until the condition is complete. + + Callbacks and other work will be executed by the provided executor until ``condition()`` or + ``future.done()`` returns ``True`` or the context associated with the executor is shutdown. + + :param node: A node to add to the executor to check for work. + :param condition: The callable or future object to wait on. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait. Block until the condition is complete + if ``None`` or negative. Don't wait if 0. + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_until_complete(condition, timeout_sec) + finally: + executor.remove_node(node) + + def spin_until_future_complete( node: 'Node', future: Future, @@ -241,10 +287,7 @@ def spin_until_future_complete( :param executor: The executor to use, or the global executor if ``None``. :param timeout_sec: Seconds to wait. Block until the future is complete if ``None`` or negative. Don't wait if 0. + + Deprecated in favor of spin_until_complete """ - executor = get_global_executor() if executor is None else executor - try: - executor.add_node(node) - executor.spin_until_future_complete(future, timeout_sec) - finally: - executor.remove_node(node) + spin_until_complete(node, future, executor, timeout_sec) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 3c5bea476..0ac57f2d9 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -278,21 +278,37 @@ def spin(self) -> None: while self._context.ok() and not self._is_shutdown: self.spin_once() - def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: - """Execute callbacks until a given future is done or a timeout occurs.""" - # Make sure the future wakes this executor when it is done - future.add_done_callback(lambda x: self.wake()) + def spin_for(self, duration_sec: float = None) -> None: + """Execute callbacks until shutdown, or timeout.""" + self.spin_until_complete(lambda: False, duration_sec) + + def spin_until_complete(self, condition, timeout_sec: float = None) -> None: + """ + Execute callbacks until a given condition is done or a timeout occurs. + + Deprecated in favor of spin_until_complete. + """ + # Common conditon for safisfying both Callable and Future + finish_condition = None + if (isinstance(condition, Future)): + # Make sure the future wakes this executor when it is done + condition.add_done_callback(lambda x: self.wake()) + def finish_condition(): return condition.done() + elif (callable(condition)): + def finish_condition(): return condition() + else: + raise TypeError('Condition has to be of Future or Callable type') if timeout_sec is None or timeout_sec < 0: - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_sec) + while self._context.ok() and not finish_condition() and not self._is_shutdown: + self.spin_once_until_complete(condition, timeout_sec) else: start = time.monotonic() end = start + timeout_sec timeout_left = timeout_sec - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_left) + while self._context.ok() and not finish_condition() and not self._is_shutdown: + self.spin_once_until_complete(condition, timeout_left) now = time.monotonic() if now >= end: @@ -300,6 +316,14 @@ def spin_until_future_complete(self, future: Future, timeout_sec: float = None) timeout_left = end - now + def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + """ + Execute callbacks until a given future is done or a timeout occurs. + + Deprecated in favor of spin_until_complete. + """ + self.spin_until_complete(future, timeout_sec) + def spin_once(self, timeout_sec: float = None) -> None: """ Wait for and execute a single callback. @@ -311,6 +335,19 @@ def spin_once(self, timeout_sec: float = None) -> None: """ raise NotImplementedError() + def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None: + """ + Wait for and execute a single callback. + + This should behave in the same way as :meth:`spin_once`. + If needed by the implementation, it should awake other threads waiting. + + :param condition: The executor will wait until this condition is done. + :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. + Don't wait if 0. + """ + raise NotImplementedError() + def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: """ Wait for and execute a single callback. @@ -321,6 +358,8 @@ def spin_once_until_future_complete(self, future: Future, timeout_sec: float = N :param future: The executor will wait until this future is done. :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. Don't wait if 0. + + Deprecated in favor of spin_once_until_complete. """ raise NotImplementedError() @@ -722,9 +761,14 @@ def spin_once(self, timeout_sec: float = None) -> None: if handler.exception() is not None: raise handler.exception() - def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None: self.spin_once(timeout_sec) + """Deprecated in favor of spin_once_until_complete""" + + def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + self.spin_once_until_complete(timeout_sec) + class MultiThreadedExecutor(Executor): """ @@ -767,5 +811,11 @@ def _spin_once_impl( def spin_once(self, timeout_sec: float = None) -> None: self._spin_once_impl(timeout_sec) + def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None: + self._spin_once_impl(timeout_sec, condition if callable( + condition) else condition.done) + + """Deprecated in favor of spin_once_until_complete""" + def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: - self._spin_once_impl(timeout_sec, future.done) + self.spin_once_until_complete(timeout_sec, future.done) diff --git a/rclpy/test/test_action_client.py b/rclpy/test/test_action_client.py index 6dfe71b0d..59fcad91e 100644 --- a/rclpy/test/test_action_client.py +++ b/rclpy/test/test_action_client.py @@ -159,7 +159,7 @@ def test_send_goal_async(self): try: self.assertTrue(ac.wait_for_server(timeout_sec=2.0)) future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertTrue(future.done()) goal_handle = future.result() self.assertTrue(goal_handle.accepted) @@ -177,7 +177,7 @@ def test_send_goal_async_with_feedback_after_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback after goal has been accepted self.mock_action_server.publish_feedback(goal_uuid) @@ -202,7 +202,7 @@ def test_send_goal_async_with_feedback_before_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Check the feedback was not received self.assertEqual(self.feedback, None) @@ -246,14 +246,14 @@ def test_send_goal_async_with_feedback_for_another_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=first_goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Send another goal, but without a feedback callback second_goal_uuid = UUID(uuid=list(uuid.uuid4().bytes)) future = ac.send_goal_async( Fibonacci.Goal(), goal_uuid=second_goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback for the second goal self.mock_action_server.publish_feedback(second_goal_uuid) @@ -277,7 +277,7 @@ def test_send_goal_async_with_feedback_for_not_a_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback for a non-existent goal ID self.mock_action_server.publish_feedback(UUID(uuid=list(uuid.uuid4().bytes))) @@ -298,9 +298,9 @@ def test_send_goal_multiple(self): future_0 = ac.send_goal_async(Fibonacci.Goal()) future_1 = ac.send_goal_async(Fibonacci.Goal()) future_2 = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, future_0, executor) - rclpy.spin_until_future_complete(self.node, future_1, executor) - rclpy.spin_until_future_complete(self.node, future_2, executor) + rclpy.spin_until_complete(self.node, future_0, executor) + rclpy.spin_until_complete(self.node, future_1, executor) + rclpy.spin_until_complete(self.node, future_2, executor) self.assertTrue(future_0.done()) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) @@ -326,13 +326,13 @@ def test_send_cancel_async(self): # Send a goal goal_future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) goal_handle = goal_future.result() # Cancel the goal cancel_future = goal_handle.cancel_goal_async() - rclpy.spin_until_future_complete(self.node, cancel_future, self.executor) + rclpy.spin_until_complete(self.node, cancel_future, self.executor) self.assertTrue(cancel_future.done()) self.assertEqual( cancel_future.result().goals_canceling[0].goal_id, @@ -347,13 +347,13 @@ def test_get_result_async(self): # Send a goal goal_future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) goal_handle = goal_future.result() # Get the goal result result_future = goal_handle.get_result_async() - rclpy.spin_until_future_complete(self.node, result_future, self.executor) + rclpy.spin_until_complete(self.node, result_future, self.executor) self.assertTrue(result_future.done()) finally: ac.destroy() diff --git a/rclpy/test/test_action_server.py b/rclpy/test/test_action_server.py index fa2af1e74..bfb1b6874 100644 --- a/rclpy/test/test_action_server.py +++ b/rclpy/test/test_action_server.py @@ -169,7 +169,7 @@ def handle_accepted_callback(goal_handle): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertTrue(future.result().accepted) self.assertTrue(handle_accepted_callback_triggered) action_server.destroy() @@ -199,7 +199,7 @@ def handle_accepted_callback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_msg.goal.order = goal_order future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertFalse(future.result().accepted) action_server.destroy() @@ -220,7 +220,7 @@ def goal_callback(goal): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # An invalid return type in the goal callback should translate to a rejected goal self.assertFalse(future.result().accepted) action_server.destroy() @@ -244,9 +244,9 @@ def test_multi_goal_accept(self): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) future2 = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future0, executor) - rclpy.spin_until_future_complete(self.node, future1, executor) - rclpy.spin_until_future_complete(self.node, future2, executor) + rclpy.spin_until_complete(self.node, future0, executor) + rclpy.spin_until_complete(self.node, future1, executor) + rclpy.spin_until_complete(self.node, future2, executor) self.assertTrue(future0.result().accepted) self.assertTrue(future1.result().accepted) @@ -270,8 +270,8 @@ def test_duplicate_goal(self): future0 = self.mock_action_client.send_goal(goal_msg) future1 = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future0, executor) - rclpy.spin_until_future_complete(self.node, future1, executor) + rclpy.spin_until_complete(self.node, future0, executor) + rclpy.spin_until_complete(self.node, future1, executor) # Exactly one of the goals should be accepted self.assertNotEqual(future0.result().accepted, future1.result().accepted) @@ -306,7 +306,7 @@ def cancel_callback(request): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, executor) + rclpy.spin_until_complete(self.node, goal_future, executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) @@ -315,7 +315,7 @@ def cancel_callback(request): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, executor) + rclpy.spin_until_complete(self.node, cancel_future, executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 1) assert all(cancel_result.goals_canceling[0].goal_id.uuid == goal_uuid.uuid) @@ -352,7 +352,7 @@ def cancel_callback(request): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, executor) + rclpy.spin_until_complete(self.node, goal_future, executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) @@ -361,7 +361,7 @@ def cancel_callback(request): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, executor) + rclpy.spin_until_complete(self.node, cancel_future, executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 0) @@ -398,7 +398,7 @@ def execute_callback(gh): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) send_goal_response = goal_future.result() self.assertTrue(send_goal_response.accepted) self.assertIsNotNone(server_goal_handle) @@ -410,7 +410,7 @@ def execute_callback(gh): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, self.executor) + rclpy.spin_until_complete(self.node, cancel_future, self.executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 1) @@ -421,7 +421,7 @@ def execute_callback(gh): # Get the result and exepect it to have canceled status get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result = get_result_future.result() self.assertEqual(result.status, GoalStatus.STATUS_CANCELED) self.assertEqual(server_goal_handle.status, GoalStatus.STATUS_CANCELED) @@ -447,12 +447,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() self.assertEqual(result_response.status, GoalStatus.STATUS_SUCCEEDED) self.assertEqual(result_response.result.sequence.tolist(), [1, 1, 2, 3, 5]) @@ -478,12 +478,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) self.assertEqual(result_response.result.sequence.tolist(), [1, 1, 2, 3, 5]) @@ -508,12 +508,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() # Goal status should default to STATUS_ABORTED self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) @@ -537,12 +537,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() # Goal status should default to STATUS_ABORTED self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) @@ -563,7 +563,7 @@ def test_expire_goals_none(self): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertEqual(1, len(action_server._goal_handles)) @@ -586,7 +586,7 @@ def test_expire_goals_one(self): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertEqual(1, len(action_server._goal_handles)) @@ -642,7 +642,7 @@ def execute_with_feedback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertIsNotNone(self.mock_action_client.feedback_msg) self.assertEqual( @@ -674,7 +674,7 @@ def execute_with_feedback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete( + rclpy.spin_until_complete( self.node, goal_future, self.executor) feedback_msg = self.mock_action_client.feedback_msg diff --git a/rclpy/test/test_client.py b/rclpy/test/test_client.py index 6903fc78b..f4d7acd61 100644 --- a/rclpy/test/test_client.py +++ b/rclpy/test/test_client.py @@ -83,8 +83,8 @@ def test_concurrent_calls_to_service(self): future1 = cli.call_async(GetParameters.Request()) future2 = cli.call_async(GetParameters.Request()) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) - rclpy.spin_until_future_complete(self.node, future1, executor=executor) - rclpy.spin_until_future_complete(self.node, future2, executor=executor) + rclpy.spin_until_complete(self.node, future1, executor=executor) + rclpy.spin_until_complete(self.node, future2, executor=executor) self.assertTrue(future1.result() is not None) self.assertTrue(future2.result() is not None) finally: @@ -131,7 +131,7 @@ def test_different_type_raises(self): future = cli.call_async(GetParameters.Request()) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) with self.assertRaises(TypeError): - rclpy.spin_until_future_complete(self.node, future, executor=executor) + rclpy.spin_until_complete(self.node, future, executor=executor) finally: self.node.destroy_client(cli) self.node.destroy_service(srv) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index c0c3cc810..96e84ae22 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -348,36 +348,104 @@ def test_executor_add_node(self): assert not executor.add_node(self.node) assert id(executor) == id(self.node.executor) - def test_executor_spin_until_future_complete_timeout(self): + def test_executor_spin_for(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) def timer_callback(): pass - timer = self.node.create_timer(0.003, timer_callback) + timer = self.node.create_timer(0.1, timer_callback) + + start = time.monotonic() + executor.spin_for(duration_sec=1.0) + end = time.monotonic() + self.assertGreaterEqual(end - start, 1.0) + + timer.cancel() + + def test_executor_spin_until_complete_timeout(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.1, timer_callback) # Timeout future = Future() self.assertFalse(future.done()) start = time.monotonic() - executor.spin_until_future_complete(future=future, timeout_sec=0.1) + executor.spin_until_complete(future, timeout_sec=1.0) end = time.monotonic() # Nothing is ever setting the future, so this should have waited - # at least 0.1 seconds. - self.assertGreaterEqual(end - start, 0.1) + # at least 1.0 seconds. + self.assertGreaterEqual(end - start, 1.0) + self.assertFalse(future.done()) + + start = time.monotonic() + executor.spin_until_complete(lambda: False, timeout_sec=1.0) + end = time.monotonic() + # condition is always false so this should have waited + # at least 1.0 seconds. + self.assertGreaterEqual(end - start, 1.0) self.assertFalse(future.done()) timer.cancel() - def test_executor_spin_until_future_complete_future_done(self): + def test_executor_spin_until_complete_condition_done(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.1, timer_callback) + + condition_var = False + + def set_condition(): + nonlocal condition_var + condition_var = True + + def condition(): + nonlocal condition_var + return condition_var + + # Condition complete timeout_sec > 0 + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=1.0) + self.assertTrue(condition()) + + # timeout_sec = None + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=None) + self.assertTrue(condition()) + + # Condition complete timeout < 0 + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=-1) + self.assertTrue(condition()) + + timer.cancel() + + def test_executor_spin_until_complete_future_done(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) def timer_callback(): pass - timer = self.node.create_timer(0.003, timer_callback) + timer = self.node.create_timer(0.1, timer_callback) def set_future_result(future): future.set_result('finished') @@ -387,7 +455,7 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=0.2) + executor.spin_until_complete(future, timeout_sec=1.0) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') @@ -396,7 +464,7 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=None) + executor.spin_until_complete(future, timeout_sec=None) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') @@ -405,27 +473,37 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=-1) + executor.spin_until_complete(future, timeout_sec=-1) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') timer.cancel() - def test_executor_spin_until_future_complete_do_not_wait(self): + def test_executor_spin_until_complete_do_not_wait(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) def timer_callback(): pass - timer = self.node.create_timer(0.003, timer_callback) + timer = self.node.create_timer(0.1, timer_callback) # Do not wait timeout_sec = 0 future = Future() self.assertFalse(future.done()) - executor.spin_until_future_complete(future=future, timeout_sec=0) + executor.spin_until_complete(future, timeout_sec=0) self.assertFalse(future.done()) + condition_var = False + + def condition(): + nonlocal condition_var + return condition_var + + self.assertFalse(condition()) + executor.spin_until_complete(condition, timeout_sec=0) + self.assertFalse(condition()) + timer.cancel() def test_executor_add_node_wakes_executor(self): diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py index 4acfc1b90..39ecc12e8 100644 --- a/rclpy/test/test_qos_event.py +++ b/rclpy/test/test_qos_event.py @@ -175,7 +175,7 @@ def warn(self, message, once=False): EmptyMsg, self.topic_name, message_callback, qos_profile_subscription) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) - rclpy.spin_until_future_complete(self.node, log_msgs_future, executor, 10.0) + rclpy.spin_until_complete(self.node, log_msgs_future, executor, 10.0) self.assertEqual( pub_log_msg, diff --git a/rclpy/test/test_waitable.py b/rclpy/test/test_waitable.py index f79841c51..cccdc3da1 100644 --- a/rclpy/test/test_waitable.py +++ b/rclpy/test/test_waitable.py @@ -298,7 +298,7 @@ def tearDownClass(cls): def start_spin_thread(self, waitable): waitable.future = Future(executor=self.executor) self.thr = threading.Thread( - target=self.executor.spin_until_future_complete, args=(waitable.future,), daemon=True) + target=self.executor.spin_until_complete, args=(waitable.future,), daemon=True) self.thr.start() return self.thr From fabd752672b9c6af5bd09662088214dea2154ca5 Mon Sep 17 00:00:00 2001 From: Hubert Liberacki Date: Fri, 3 Jun 2022 15:05:13 +0200 Subject: [PATCH 2/3] Rename `spin_until_future_complete` with `spin_until_complete` in unit tests Signed-off-by: Hubert Liberacki --- rclpy/test/test_action_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclpy/test/test_action_client.py b/rclpy/test/test_action_client.py index 59fcad91e..3a7f82375 100644 --- a/rclpy/test/test_action_client.py +++ b/rclpy/test/test_action_client.py @@ -220,12 +220,12 @@ def test_send_goal_async_with_feedback_after_goal_result_requested(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) # Then request result goal_handle = goal_future.result() result_future = goal_handle.get_result_async() - rclpy.spin_until_future_complete(self.node, result_future, self.executor) + rclpy.spin_until_complete(self.node, result_future, self.executor) self.assertTrue(result_future.done()) # Publish feedback after goal result is requested From 97be886f588d41fe4f2db8031f787087dc81165e Mon Sep 17 00:00:00 2001 From: Hubert Liberacki Date: Mon, 4 Jul 2022 10:48:57 +0200 Subject: [PATCH 3/3] Correct code review findings * Deprecate old methods * fix comments Signed-off-by: Hubert Liberacki --- rclpy/rclpy/__init__.py | 2 +- rclpy/rclpy/executors.py | 23 +++++++---------------- rclpy/test/test_executor.py | 6 +++--- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/rclpy/rclpy/__init__.py b/rclpy/rclpy/__init__.py index 1a30d4b3b..d1527222b 100644 --- a/rclpy/rclpy/__init__.py +++ b/rclpy/rclpy/__init__.py @@ -239,7 +239,7 @@ def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = No executor = get_global_executor() if executor is None else executor try: executor.add_node(node) - executor.spin_once(duration_sec) + executor.spin_for(duration_sec) finally: executor.remove_node(node) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 0ac57f2d9..c17a63e1e 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -32,6 +32,7 @@ from typing import TypeVar from typing import Union +import warnings from rclpy.client import Client from rclpy.clock import Clock @@ -283,11 +284,7 @@ def spin_for(self, duration_sec: float = None) -> None: self.spin_until_complete(lambda: False, duration_sec) def spin_until_complete(self, condition, timeout_sec: float = None) -> None: - """ - Execute callbacks until a given condition is done or a timeout occurs. - - Deprecated in favor of spin_until_complete. - """ + """Execute callbacks until a given condition is done or a timeout occurs.""" # Common conditon for safisfying both Callable and Future finish_condition = None if (isinstance(condition, Future)): @@ -317,11 +314,8 @@ def finish_condition(): return condition() timeout_left = end - now def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: - """ - Execute callbacks until a given future is done or a timeout occurs. - - Deprecated in favor of spin_until_complete. - """ + """Execute callbacks until a given future is done or a timeout occurs.""" + warnings.warn('Deprecated in favor of spin_until_complete.') self.spin_until_complete(future, timeout_sec) def spin_once(self, timeout_sec: float = None) -> None: @@ -358,9 +352,8 @@ def spin_once_until_future_complete(self, future: Future, timeout_sec: float = N :param future: The executor will wait until this future is done. :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. Don't wait if 0. - - Deprecated in favor of spin_once_until_complete. """ + warnings.warn('Deprecated in favor of spin_once_until_complete.') raise NotImplementedError() def _take_timer(self, tmr): @@ -764,9 +757,8 @@ def spin_once(self, timeout_sec: float = None) -> None: def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None: self.spin_once(timeout_sec) - """Deprecated in favor of spin_once_until_complete""" - def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + warnings.warn('Deprecated in favor of spin_once_until_complete.') self.spin_once_until_complete(timeout_sec) @@ -815,7 +807,6 @@ def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None self._spin_once_impl(timeout_sec, condition if callable( condition) else condition.done) - """Deprecated in favor of spin_once_until_complete""" - def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + warnings.warn('Deprecated in favor of spin_once_until_complete.') self.spin_once_until_complete(timeout_sec, future.done) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 96e84ae22..feb7ebf64 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -355,12 +355,12 @@ def test_executor_spin_for(self): def timer_callback(): pass - timer = self.node.create_timer(0.1, timer_callback) + timer = self.node.create_timer(1.0, timer_callback) start = time.monotonic() - executor.spin_for(duration_sec=1.0) + executor.spin_for(duration_sec=10.0) end = time.monotonic() - self.assertGreaterEqual(end - start, 1.0) + self.assertGreaterEqual(end - start, 10.0) timer.cancel()