Skip to content

Commit

Permalink
Add Batch._reset_state for DRY. (#3169)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukesneeringer authored Mar 20, 2017
1 parent 6cca1ec commit 1bd4f11
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions pubsub/google/cloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,20 +444,19 @@ class Batch(object):
def __init__(self, topic, client, max_interval=_INFINITY,
max_messages=_INFINITY, max_size=1024 * 1024 * 9):
self.topic = topic
self.client = client
self.messages = []
self.message_ids = []
self.client = client

# Set the autocommit rules. If the interval or number of messages
# is exceeded, then the .publish() method will imply a commit.
self._max_interval = max_interval
self._max_messages = max_messages
self._max_size = max_size

# Set the initial starting timestamp (used against the interval)
# and initial size.
self._start_timestamp = time.time()
self._current_size = 0
# Set up the initial state, initializing messages, the starting
# timestamp, etc.
self._reset_state()

def __enter__(self):
return self
Expand All @@ -469,6 +468,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def __iter__(self):
return iter(self.message_ids)

def _reset_state(self):
"""Reset the state of this batch."""

del self.messages[:]
self._start_timestamp = time.time()
self._current_size = 0

def publish(self, message, **attrs):
"""Emulate publishing a message, but save it.
Expand Down Expand Up @@ -526,6 +532,4 @@ def commit(self, client=None):
api = client.publisher_api
message_ids = api.topic_publish(self.topic.full_name, self.messages[:])
self.message_ids.extend(message_ids)
del self.messages[:]
self._start_timestamp = time.time()
self._current_size = 0
self._reset_state()

0 comments on commit 1bd4f11

Please sign in to comment.