fix: Replace asserts with None checks for graceful shutdown #1244

Merged
merged 2 commits into from
Sep 9, 2024

Contributor

@mukund-ananthu mukund-ananthu commented Sep 9, 2024

Two code paths race to set and read the self._scheduler attribute of streaming_pull_manager.

Code path 1

future.cancel():

def cancel(self) -> bool:
    """Stops pulling messages and shutdowns the background thread consuming
    messages.

    The method always returns ``True``, as the shutdown is always initiated.
    However, if the background stream is already being shut down or the shutdown
    has completed, this method is a no-op.

    .. versionchanged:: 2.4.1
       The method does not block anymore, it just triggers the shutdown and returns
       immediately. To block until the background stream is terminated, call
       :meth:`result()` after cancelling the future.

    .. versionchanged:: 2.10.0
       The method always returns ``True`` instead of ``None``.
    """
    # NOTE: We circumvent the base future's self._state to track the cancellation
    # state, as this state has different meaning with streaming pull futures.
    self.__cancelled = True
    self.__manager.close()
    return True

calls

streaming_pull_manager.close():

which schedules the _shutdown method:

self._regular_shutdown_thread = threading.Thread(
    name=_REGULAR_SHUTDOWN_THREAD_NAME,
    daemon=True,
    target=self._shutdown,
    kwargs={"reason": reason},
)
self._regular_shutdown_thread.start()

which sets self._scheduler to None while holding the self._closing lock:

with self._closing:
    if self._closed:
        return

    # Stop consuming messages.
    if self.is_active:
        _LOGGER.debug("Stopping consumer.")
        assert self._consumer is not None
        self._consumer.stop()
    self._consumer = None

    # Shutdown all helper threads
    _LOGGER.debug("Stopping scheduler.")
    assert self._scheduler is not None
    dropped_messages = self._scheduler.shutdown(
        await_msg_callbacks=self._await_callbacks_on_shutdown
    )
    self._scheduler = None

Code path 2

The streaming_pull_manager's _on_response method:

with self._pause_resume_lock:
    assert self._scheduler is not None
    assert self._leaser is not None

    for received_message in received_messages:
        if (
            not self._exactly_once_delivery_enabled()
            or received_message.ack_id not in expired_ack_ids
        ):
            message = google.cloud.pubsub_v1.subscriber.message.Message(
                received_message.message,
                received_message.ack_id,
                received_message.delivery_attempt,
                self._scheduler.queue,
                self._exactly_once_delivery_enabled,
            )
            self._messages_on_hold.put(message)
            self._on_hold_bytes += message.size
            req = requests.LeaseRequest(
                ack_id=message.ack_id,
                byte_size=message.size,
                ordering_key=message.ordering_key,
            )
            self._leaser.add([req])

This method asserts that self._scheduler is not None before using it to process the received messages.

Since the two code paths use different locks and run on different threads, code path 1 can set self._scheduler to None while the _on_response method is still executing. The assert in _on_response then fails and raises an AssertionError. This happens when the library processes streaming pull responses while it is also being shut down.
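
For illustration, the interleaving described above can be reproduced with a toy class. This is only a sketch, not library code: ToyManager, its two Event attributes, and run_race are hypothetical names, and the events exist solely to force the unlucky ordering that happens by chance in the real library.

```python
import threading


class ToyManager:
    """Hypothetical stand-in for the streaming pull manager (not library code)."""

    def __init__(self):
        self._closing = threading.Lock()
        self._pause_resume_lock = threading.RLock()
        self._scheduler = object()
        # Test-only events used to force the interleaving deterministically.
        self.in_response = threading.Event()
        self.shutdown_done = threading.Event()

    def shutdown(self):
        # Code path 1: guarded by _closing only.
        with self._closing:
            self._scheduler = None
        self.shutdown_done.set()

    def on_response(self):
        # Code path 2: guarded by _pause_resume_lock only, so it does not
        # exclude shutdown() from running concurrently.
        with self._pause_resume_lock:
            self.in_response.set()
            self.shutdown_done.wait(timeout=5)
            assert self._scheduler is not None


def run_race():
    """Run both paths and return any AssertionError raised by on_response."""
    manager = ToyManager()
    errors = []

    def consumer():
        try:
            manager.on_response()
        except AssertionError as exc:
            errors.append(exc)

    thread = threading.Thread(target=consumer)
    thread.start()
    manager.in_response.wait(timeout=5)  # on_response now holds its lock
    manager.shutdown()                   # still succeeds: different lock
    thread.join(timeout=5)
    return errors
```

Because shutdown() never needs _pause_resume_lock, holding that lock in on_response() does nothing to keep the scheduler alive, which is why the assert can fire mid-shutdown.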

This change replaces the asserts with a None check that returns early and stops processing (the same behavior as the assert, minus the raised error). The library then shuts down gracefully without logging red-herring errors to the user.
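
A minimal sketch of the early-return pattern, assuming a simplified manager. The merged diff is not reproduced here, so GracefulManager, its processed list, and the boolean return value are hypothetical and only illustrate the shape of the check.

```python
import threading


class GracefulManager:
    """Hypothetical simplified manager showing a None check instead of an assert."""

    def __init__(self):
        self._pause_resume_lock = threading.RLock()
        self._scheduler = object()
        self._leaser = object()
        self.processed = []  # illustration only: records handled messages

    def shutdown(self):
        # Mirrors the shutdown path clearing its helpers.
        self._scheduler = None
        self._leaser = None

    def on_response(self, received_messages):
        with self._pause_resume_lock:
            if self._scheduler is None or self._leaser is None:
                # Shutdown has started: stop processing quietly instead of
                # raising AssertionError.
                return False
            for message in received_messages:
                self.processed.append(message)
            return True
```

A response that arrives after shutdown() is simply dropped: on_response() returns without touching the cleared helpers, so nothing is raised or logged as an error.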

Fixes #997 🦕

@mukund-ananthu mukund-ananthu requested review from a team as code owners September 9, 2024 00:41
@mukund-ananthu mukund-ananthu self-assigned this Sep 9, 2024
@product-auto-label product-auto-label bot added size: m Pull request size is medium. api: pubsub Issues related to the googleapis/python-pubsub API. labels Sep 9, 2024

conventional-commit-lint-gcf bot commented Sep 9, 2024

🤖 I detect that the PR title and the commit message differ and there's only one commit. To use the PR title for the commit history, you can use Github's automerge feature with squashing, or use automerge label. Good luck human!

-- conventional-commit-lint bot
https://conventionalcommits.org/

@mukund-ananthu mukund-ananthu added the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Sep 9, 2024
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Sep 9, 2024
@mukund-ananthu
Contributor Author

@parthea PTAL.

@mukund-ananthu
Contributor Author

@hongalex

@mukund-ananthu mukund-ananthu enabled auto-merge (squash) September 9, 2024 18:51
@mukund-ananthu mukund-ananthu merged commit ced4f52 into main Sep 9, 2024
26 of 28 checks passed
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. size: m Pull request size is medium.
Development

Successfully merging this pull request may close these issues.

Possible race condition between _on_response and close
4 participants