Skip to content

Commit

Permalink
Add a max_size argument to Pub / Sub Batch. (#3157)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukesneeringer authored Mar 20, 2017
1 parent 45c6a0a commit 6cca1ec
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 30 deletions.
46 changes: 38 additions & 8 deletions pubsub/google/cloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

"""Define API Topics."""

import base64
import json
import time

from google.cloud._helpers import _datetime_to_rfc3339
from google.cloud._helpers import _NOW
from google.cloud._helpers import _to_bytes
from google.cloud.exceptions import NotFound
from google.cloud.pubsub._helpers import topic_name_from_path
from google.cloud.pubsub.iam import Policy
Expand Down Expand Up @@ -255,7 +258,7 @@ def publish(self, message, client=None, **attrs):
message_ids = api.topic_publish(self.full_name, [message_data])
return message_ids[0]

def batch(self, client=None):
def batch(self, client=None, **kwargs):
"""Return a batch to use as a context manager.
Example:
Expand All @@ -275,11 +278,15 @@ def batch(self, client=None):
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current topic.
:type kwargs: dict
:param kwargs: Keyword arguments passed to the
:class:`~google.cloud.pubsub.topic.Batch` constructor.
:rtype: :class:`Batch`
:returns: A batch to use as a context manager.
"""
client = self._require_client(client)
return Batch(self, client)
return Batch(self, client, **kwargs)

def list_subscriptions(self, page_size=None, page_token=None, client=None):
"""List subscriptions for the project associated with this client.
Expand Down Expand Up @@ -426,11 +433,16 @@ class Batch(object):
before automatically commiting. Defaults to infinity
(off).
:type max_messages: float
:param max_size: The maximum size that the serialized messages can be
before automatically commiting. Defaults to 9 MB
(slightly less than the API limit).
:type max_size: int
"""
_INFINITY = float('inf')

def __init__(self, topic, client, max_interval=_INFINITY,
max_messages=_INFINITY):
max_messages=_INFINITY, max_size=1024 * 1024 * 9):
self.topic = topic
self.messages = []
self.message_ids = []
Expand All @@ -440,9 +452,12 @@ def __init__(self, topic, client, max_interval=_INFINITY,
# is exceeded, then the .publish() method will imply a commit.
self._max_interval = max_interval
self._max_messages = max_messages
self._max_size = max_size

# Set the initial starting timestamp (used against the interval).
# Set the initial starting timestamp (used against the interval)
# and initial size.
self._start_timestamp = time.time()
self._current_size = 0

def __enter__(self):
return self
Expand All @@ -464,16 +479,24 @@ def publish(self, message, **attrs):
:param attrs: key-value pairs to send as message attributes
"""
self.topic._timestamp_message(attrs)
self.messages.append(
{'data': message,
'attributes': attrs})

# Append the message to the list of messages..
item = {'attributes': attrs, 'data': message}
self.messages.append(item)

# Determine the approximate size of the message, and increment
# the current batch size appropriately.
encoded = base64.b64encode(_to_bytes(message))
encoded += base64.b64encode(
json.dumps(attrs, ensure_ascii=False).encode('utf8'),
)
self._current_size += len(encoded)

# If too much time has elapsed since the first message
# was added, autocommit.
now = time.time()
if now - self._start_timestamp > self._max_interval:
self.commit()
self._start_timestamp = now
return

# If the number of messages on the list is greater than the
Expand All @@ -482,6 +505,11 @@ def publish(self, message, **attrs):
self.commit()
return

# If we have reached the max size, autocommit.
if self._current_size >= self._max_size:
self.commit()
return

def commit(self, client=None):
"""Send saved messages as a single API call.
Expand All @@ -499,3 +527,5 @@ def commit(self, client=None):
message_ids = api.topic_publish(self.topic.full_name, self.messages[:])
self.message_ids.extend(message_ids)
del self.messages[:]
self._start_timestamp = time.time()
self._current_size = 0
82 changes: 60 additions & 22 deletions pubsub/unit_tests/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,35 @@ def test_context_mgr_failure(self):
self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2])
self.assertEqual(getattr(api, '_topic_published', self), self)

def test_batch_messages(self):
# Establish that a batch actually batches messsages in the expected
# way.
client = _Client(project='PROJECT')
topic = _Topic(name='TOPIC')

# Track commits, but do not perform them.
Batch = self._get_target_class()
with mock.patch.object(Batch, 'commit') as commit:
with self._make_one(topic, client=client) as batch:
self.assertIsInstance(batch, Batch)

# Publish four messages and establish that the batch does
# not commit.
for i in range(0, 4):
batch.publish('Batch message %d.' % (i,))
commit.assert_not_called()

# Check the contents of the batch.
self.assertEqual(batch.messages, [
{'data': 'Batch message 0.', 'attributes': {}},
{'data': 'Batch message 1.', 'attributes': {}},
{'data': 'Batch message 2.', 'attributes': {}},
{'data': 'Batch message 3.', 'attributes': {}},
])

def test_message_count_autocommit(self):
"""Establish that if the batch is assigned to take a maximum
number of messages, that it commits when it reaches that maximum.
"""
# Establish that if the batch is assigned to take a maximum
# number of messages, that it commits when it reaches that maximum.
client = _Client(project='PROJECT')
topic = _Topic(name='TOPIC')

Expand All @@ -795,17 +820,11 @@ def test_message_count_autocommit(self):
# Publish four messages and establish that the batch does
# not commit.
for i in range(0, 4):
batch.publish({
'attributes': {},
'data': 'Batch message %d.' % (i,),
})
batch.publish('Batch message %d.' % (i,))
commit.assert_not_called()

# Publish a fifth message and observe the commit.
batch.publish({
'attributes': {},
'data': 'The final call to trigger a commit!',
})
batch.publish('The final call to trigger a commit!')
commit.assert_called_once_with()

# There should be a second commit after the context manager
Expand All @@ -814,9 +833,8 @@ def test_message_count_autocommit(self):

@mock.patch('time.time')
def test_message_time_autocommit(self, mock_time):
"""Establish that if the batch is sufficiently old, that it commits
the next time it receives a publish.
"""
# Establish that if the batch is sufficiently old, that it commits
# the next time it receives a publish.
client = _Client(project='PROJECT')
topic = _Topic(name='TOPIC')

Expand All @@ -830,20 +848,40 @@ def test_message_time_autocommit(self, mock_time):
# Publish some messages and establish that the batch does
# not commit.
for i in range(0, 10):
batch.publish({
'attributes': {},
'data': 'Batch message %d.' % (i,),
})
batch.publish('Batch message %d.' % (i,))
commit.assert_not_called()

# Move time ahead so that this batch is too old.
mock_time.return_value = 10.0

# Publish another message and observe the commit.
batch.publish({
'attributes': {},
'data': 'The final call to trigger a commit!',
})
batch.publish('The final call to trigger a commit!')
commit.assert_called_once_with()

# There should be a second commit after the context manager
# exits.
self.assertEqual(commit.call_count, 2)

def test_message_size_autocommit(self):
# Establish that if the batch is sufficiently large, that it
# auto-commits.
client = _Client(project='PROJECT')
topic = _Topic(name='TOPIC')

# Track commits, but do not perform them.
Batch = self._get_target_class()
with mock.patch.object(Batch, 'commit') as commit:
with self._make_one(topic, client=client, max_size=100) as batch:
self.assertIsInstance(batch, Batch)

# Publish a short (< 100 bytes) message and establish that
# the batch does not commit.
batch.publish(b'foo')
commit.assert_not_called()

# Publish another message and observe the commit.
batch.publish(u'The final call to trigger a commit, because '
u'this message is sufficiently long.')
commit.assert_called_once_with()

# There should be a second commit after the context manager
Expand Down

0 comments on commit 6cca1ec

Please sign in to comment.