diff --git a/pubsub/google/cloud/pubsub/topic.py b/pubsub/google/cloud/pubsub/topic.py index b0898328cb60..5490617a3ea5 100644 --- a/pubsub/google/cloud/pubsub/topic.py +++ b/pubsub/google/cloud/pubsub/topic.py @@ -444,9 +444,9 @@ class Batch(object): def __init__(self, topic, client, max_interval=_INFINITY, max_messages=_INFINITY, max_size=1024 * 1024 * 9): self.topic = topic + self.client = client self.messages = [] self.message_ids = [] - self.client = client # Set the autocommit rules. If the interval or number of messages # is exceeded, then the .publish() method will imply a commit. @@ -454,10 +454,9 @@ def __init__(self, topic, client, max_interval=_INFINITY, self._max_messages = max_messages self._max_size = max_size - # Set the initial starting timestamp (used against the interval) - # and initial size. - self._start_timestamp = time.time() - self._current_size = 0 + # Set up the initial state, initializing messages, the starting + # timestamp, etc. + self._reset_state() def __enter__(self): return self @@ -469,6 +468,13 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __iter__(self): return iter(self.message_ids) + def _reset_state(self): + """Reset the state of this batch.""" + + del self.messages[:] + self._start_timestamp = time.time() + self._current_size = 0 + def publish(self, message, **attrs): """Emulate publishing a message, but save it. @@ -526,6 +532,4 @@ def commit(self, client=None): api = client.publisher_api message_ids = api.topic_publish(self.topic.full_name, self.messages[:]) self.message_ids.extend(message_ids) - del self.messages[:] - self._start_timestamp = time.time() - self._current_size = 0 + self._reset_state()