diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index b8531db17..c01dd7f2e 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -1104,8 +1104,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: ) with self._pause_resume_lock: - assert self._scheduler is not None - assert self._leaser is not None + if self._scheduler is None or self._leaser is None: + _LOGGER.debug( + f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing." + ) + return for received_message in received_messages: if ( diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 278f3e88e..b2c60dc9d 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1375,6 +1375,62 @@ def test_close_blocking_scheduler_shutdown(): scheduler.shutdown.assert_called_once_with(await_msg_callbacks=True) +def test__on_response_none_scheduler(): + manager, _, _, _, _, _ = make_running_manager() + + manager._callback = mock.sentinel.callback + manager._scheduler = None + # Set up the messages. + response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="ack1", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ), + gapic_types.ReceivedMessage( + ack_id="ack2", + message=gapic_types.PubsubMessage(data=b"bar", message_id="2"), + delivery_attempt=6, + ), + ] + ) + + manager._messages_on_hold.put = mock.Mock() + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42) + manager._on_response(response) + + manager._messages_on_hold.put.assert_not_called + + +def test__on_response_none_leaser(): + manager, _, _, _, _, _ = make_running_manager() + + manager._callback = mock.sentinel.callback + manager._leaser = None + # Set up the messages. + response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="ack1", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ), + gapic_types.ReceivedMessage( + ack_id="ack2", + message=gapic_types.PubsubMessage(data=b"bar", message_id="2"), + delivery_attempt=6, + ), + ] + ) + + manager._messages_on_hold.put = mock.Mock() + + manager._on_response(response) + + manager._messages_on_hold.put.assert_not_called + + def test_close_nonblocking_scheduler_shutdown(): manager, _, _, _, _, _ = make_running_manager(await_callbacks_on_shutdown=False) scheduler = manager._scheduler