From 7deb85b575bf27ea518d7bcad953b698334741bc Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 9 Mar 2015 16:30:48 -0400 Subject: [PATCH 01/10] Sketch 'gcloud.pubsub' API. --- docs/conf.py | 1 + docs/index.rst | 1 + docs/pubsub-api.rst | 251 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 253 insertions(+) create mode 100644 docs/pubsub-api.rst diff --git a/docs/conf.py b/docs/conf.py index 76a9cfae79af..f4bac42a91f8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -30,6 +30,7 @@ extensions = [ 'sphinx.ext.autodoc', 'sphinx.ext.autosummary', + 'sphinx.ext.doctest', 'sphinx.ext.todo', 'sphinx.ext.viewcode', ] diff --git a/docs/index.rst b/docs/index.rst index 9992a90c5e96..7994c5aa55a6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -14,6 +14,7 @@ storage-blobs storage-buckets storage-acl + pubsub-api Getting started diff --git a/docs/pubsub-api.rst b/docs/pubsub-api.rst new file mode 100644 index 000000000000..2893879ad6ea --- /dev/null +++ b/docs/pubsub-api.rst @@ -0,0 +1,251 @@ +``gcloud.pubsub`` API +===================== + +Connection / Authorization +-------------------------- + +- Inferred defaults used to create connection if none configured explicitly: + + - credentials (derived from GAE / GCE environ if present). + + - ``project_id`` (derived from GAE / GCE environ if present). + + - ``scopes`` + + +Manage topics for a project +--------------------------- + +Create a new topic for the default project: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> topic.create() # API request + +Create a new topic for an explicit project: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name', project_id='my.project') + >>> topic.create() # API request + +Check for the existance of a topic: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> topic.exists() # API request + True + +List topics for the default project: + +.. doctest:: + + >>> from gcloud import pubsub + >>> [topic.name for topic in pubsub.list_topics()] # API request + ['topic_name'] + +List topics for an explicit project: + +.. doctest:: + + >>> from gcloud import pubsub + >>> topics = pubsub.list_topics(project_id='my.project') # API request + >>> [topic.name for topic in topics] + ['topic_name'] + +Delete a topic: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> topic.delete() # API request + + +Publish messages to a topic +--------------------------- + +Publish a single message to a topic, without attributes: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> topic.publish('this is the message_payload') # API request + + +Publish a single message to a topic, with attributes: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> topic.publish('this is another message_payload', + ... attr1='value1', attr2='value2') # API request + + +Publish a set of messages to a topic (as a single request): + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> with topic as batch: + ... topic.publish('this is the first message_payload') + ... topic.publish('this is the second message_payload', + ... attr1='value1', attr2='value2') + >>> batch + [, ] + +.. note:: + + The only API request happens during the ``__exit__()`` of the topic + used as a context manager. + + +Manage subscriptions to topics +------------------------------ + +Create a new pull subscription for a topic: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic) + >>> subscription.create() # API request + +Create a new pull subscription for a topic with a non-default ACK deadline: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', ack_deadline=90) + >>> subscription.create() # API request + +Create a new push subscription for a topic: + +.. doctest:: + + >>> ENDPOINT = 'https://example.com/hook' + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', push_endpoint=ENDPOINT) + >>> subscription.create() # API request + +Check for the existence of a subscription: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic) + >>> subscription.exists() # API request + True + +Convert a pull subscription to push: + +.. doctest:: + + >>> ENDPOINT = 'https://example.com/hook' + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic) + >>> subscription.modify_push_configuration(push_endpoint=ENDPOINT) # API request + +Convert a push subscription to pull: + +.. doctest:: + + >>> ENDPOINT = 'https://example.com/hook' + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic, + ... push_endpoint=ENDPOINT) + >>> subscription.modify_push_configuration(push_endpoint=None) # API request + +List subscriptions for a topic: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> topic = Topic('topic_name') + >>> subscriptions = topic.list_subscriptions() # API request + >>> [subscription.name for subscription in subscriptions] + ['subscription_name'] + +Delete a subscription: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic) + >>> subscription.delete() # API request + + +Pull messages from a subscription +--------------------------------- + +Fetch pending messages for a pull subscription + +.. note:: + + The messages will have been ACKed already. + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic) + >>> with topic: + ... topic.publish('this is the first message_payload') + ... topic.publish('this is the second message_payload', + ... attr1='value1', attr2='value2') + >>> messages = subscription.pull() # API request + >>> [message.id for message in messages] + [, ] + >>> [message.data for message in messages] + ['this is the first message_payload', 'this is the second message_payload'] + >>> [message.attrs for message in messages] + [{}, {'attr1': 'value1', 'attr2': 'value2'}] + +Fetch a limited number of pending messages for a pull subscription: + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic) + >>> with topic: + ... topic.publish('this is the first message_payload') + ... topic.publish('this is the second message_payload', + ... attr1='value1', attr2='value2') + >>> [message.id for message in subscription.pull(max_messages=1)] + [] + +Fetch messages for a pull subscription without blocking (none pending): + +.. doctest:: + + >>> from gcloud.pubsub.topic import Topic + >>> from gcloud.pubsub.subscription import Subscription + >>> topic = Topic('topic_name') + >>> subscription = Subscription('subscription_name', topic) + >>> [message.id for message in subscription.pull(return_immediately=True)] + [] + From 832b98ff65d002d6349afa88e7cafaafca48c07c Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 17 Mar 2015 16:34:45 -0400 Subject: [PATCH 02/10] Add 'pubsub.connection.Connection' Copied from 'storage.connection.Connection', with storage-specific bits (e.g., upload) removed. --- gcloud/pubsub/__init__.py | 1 + gcloud/pubsub/connection.py | 221 ++++++++++++++++++++++++++ gcloud/pubsub/test_connection.py | 263 +++++++++++++++++++++++++++++++ 3 files changed, 485 insertions(+) create mode 100644 gcloud/pubsub/__init__.py create mode 100644 gcloud/pubsub/connection.py create mode 100644 gcloud/pubsub/test_connection.py diff --git a/gcloud/pubsub/__init__.py b/gcloud/pubsub/__init__.py new file mode 100644 index 000000000000..5bb534f795ae --- /dev/null +++ b/gcloud/pubsub/__init__.py @@ -0,0 +1 @@ +# package diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py new file mode 100644 index 000000000000..d5be7846ca82 --- /dev/null +++ b/gcloud/pubsub/connection.py @@ -0,0 +1,221 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Create / interact with gcloud pubsub connections.""" + +import json + +from six.moves.urllib.parse import urlencode # pylint: disable=F0401 + +from gcloud import connection as base_connection +from gcloud.exceptions import make_exception + + +class Connection(base_connection.Connection): + """A connection to Google Cloud Pubsub via the JSON REST API. + + This defines :meth:`Connection.api_request` for making a generic JSON + API request and API requests are created elsewhere (e.g. in + :mod:`gcloud.pubsub.api`). + """ + + API_BASE_URL = base_connection.API_BASE_URL + """The base of the API call URL.""" + + API_VERSION = 'v1beta2' + """The version of the API, used in building the API call's URL.""" + + API_URL_TEMPLATE = '{api_base_url}/pubsub/{api_version}{path}' + """A template for the URL of a particular API call.""" + + @classmethod + def build_api_url(cls, path, query_params=None, api_base_url=None, + api_version=None): + """Construct an API url given a few components, some optional. + + Typically, you shouldn't need to use this method. + + :type path: string + :param path: The path to the resource (ie, ``'/b/bucket-name'``). + + :type query_params: dict + :param query_params: A dictionary of keys and values to insert into + the query string of the URL. + + :type api_base_url: string + :param api_base_url: The base URL for the API endpoint. + Typically you won't have to provide this. + + :type api_version: string + :param api_version: The version of the API to call. + Typically you shouldn't provide this and instead + use the default for the library. + + :rtype: string + :returns: The URL assembled from the pieces provided. + """ + api_base_url = api_base_url or cls.API_BASE_URL + + url = cls.API_URL_TEMPLATE.format( + api_base_url=(api_base_url or cls.API_BASE_URL), + api_version=(api_version or cls.API_VERSION), + path=path) + + query_params = query_params or {} + if query_params: + url += '?' + urlencode(query_params) + + return url + + def _make_request(self, method, url, data=None, content_type=None, + headers=None): + """A low level method to send a request to the API. + + Typically, you shouldn't need to use this method. + + :type method: string + :param method: The HTTP method to use in the request. + + :type url: string + :param url: The URL to send the request to. + + :type data: string + :param data: The data to send as the body of the request. + + :type content_type: string + :param content_type: The proper MIME type of the data provided. + + :type headers: dict + :param headers: A dictionary of HTTP headers to send with the request. + + :rtype: tuple of ``response`` (a dictionary of sorts) + and ``content`` (a string). + :returns: The HTTP response object and the content of the response, + returned by :meth:`_do_request`. + """ + headers = headers or {} + headers['Accept-Encoding'] = 'gzip' + + if data: + content_length = len(str(data)) + else: + content_length = 0 + + headers['Content-Length'] = content_length + + if content_type: + headers['Content-Type'] = content_type + + headers['User-Agent'] = self.USER_AGENT + + return self._do_request(method, url, headers, data) + + def _do_request(self, method, url, headers, data): + """Low-level helper: perform the actual API request over HTTP. + + Allows :class:`gcloud.pubsub.batch.Batch` to override, deferring + the request. + + :type method: string + :param method: The HTTP method to use in the request. + + :type url: string + :param url: The URL to send the request to. + + :type headers: dict + :param headers: A dictionary of HTTP headers to send with the request. + + :type data: string + :param data: The data to send as the body of the request. + + :rtype: tuple of ``response`` (a dictionary of sorts) + and ``content`` (a string). + :returns: The HTTP response object and the content of the response. + """ + return self.http.request(uri=url, method=method, headers=headers, + body=data) + + def api_request(self, method, path, query_params=None, + data=None, content_type=None, + api_base_url=None, api_version=None, + expect_json=True): + """Make a request over the HTTP transport to the Cloud Storage API. + + You shouldn't need to use this method, but if you plan to + interact with the API using these primitives, this is the + correct one to use... + + :type method: string + :param method: The HTTP method name (ie, ``GET``, ``POST``, etc). + Required. + + :type path: string + :param path: The path to the resource (ie, ``'/b/bucket-name'``). + Required. + + :type query_params: dict + :param query_params: A dictionary of keys and values to insert into + the query string of the URL. Default is + empty dict. + + :type data: string + :param data: The data to send as the body of the request. Default is + the empty string. + + :type content_type: string + :param content_type: The proper MIME type of the data provided. Default + is None. + + :type api_base_url: string + :param api_base_url: The base URL for the API endpoint. + Typically you won't have to provide this. + Default is the standard API base URL. + + :type api_version: string + :param api_version: The version of the API to call. Typically + you shouldn't provide this and instead use + the default for the library. Default is the + latest API version supported by + gcloud-python. + + :type expect_json: boolean + :param expect_json: If True, this method will try to parse the + response as JSON and raise an exception if + that cannot be done. Default is True. + + :raises: Exception if the response code is not 200 OK. + """ + url = self.build_api_url(path=path, query_params=query_params, + api_base_url=api_base_url, + api_version=api_version) + + # Making the executive decision that any dictionary + # data will be sent properly as JSON. + if data and isinstance(data, dict): + data = json.dumps(data) + content_type = 'application/json' + + response, content = self._make_request( + method=method, url=url, data=data, content_type=content_type) + + if not 200 <= response.status < 300: + raise make_exception(response, content) + + if content and expect_json: + content_type = response.get('content-type', '') + if not content_type.startswith('application/json'): + raise TypeError('Expected JSON, got %s' % content_type) + return json.loads(content) + + return content diff --git a/gcloud/pubsub/test_connection.py b/gcloud/pubsub/test_connection.py new file mode 100644 index 000000000000..a88817c9dc86 --- /dev/null +++ b/gcloud/pubsub/test_connection.py @@ -0,0 +1,263 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestConnection(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.connection import Connection + return Connection + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_defaults(self): + conn = self._makeOne() + self.assertEqual(conn.credentials, None) + + def test_ctor_explicit(self): + creds = object() + conn = self._makeOne(creds) + self.assertTrue(conn.credentials is creds) + + def test_http_w_existing(self): + conn = self._makeOne() + conn._http = http = object() + self.assertTrue(conn.http is http) + + def test_http_wo_creds(self): + import httplib2 + conn = self._makeOne() + self.assertTrue(isinstance(conn.http, httplib2.Http)) + + def test_http_w_creds(self): + import httplib2 + authorized = object() + + class Creds(object): + def authorize(self, http): + self._called_with = http + return authorized + creds = Creds() + conn = self._makeOne(creds) + self.assertTrue(conn.http is authorized) + self.assertTrue(isinstance(creds._called_with, httplib2.Http)) + + def test_build_api_url_no_extra_query_params(self): + conn = self._makeOne() + URI = '/'.join([ + conn.API_BASE_URL, + 'pubsub', + conn.API_VERSION, + 'foo', + ]) + self.assertEqual(conn.build_api_url('/foo'), URI) + + def test_build_api_url_w_extra_query_params(self): + from six.moves.urllib.parse import parse_qsl + from six.moves.urllib.parse import urlsplit + conn = self._makeOne() + uri = conn.build_api_url('/foo', {'bar': 'baz'}) + scheme, netloc, path, qs, _ = urlsplit(uri) + self.assertEqual('%s://%s' % (scheme, netloc), conn.API_BASE_URL) + self.assertEqual(path, + '/'.join(['', 'pubsub', conn.API_VERSION, 'foo'])) + parms = dict(parse_qsl(qs)) + self.assertEqual(parms['bar'], 'baz') + + def test__make_request_no_data_no_content_type_no_headers(self): + conn = self._makeOne() + URI = 'http://example.com/test' + http = conn._http = Http( + {'status': '200', 'content-type': 'text/plain'}, + '', + ) + headers, content = conn._make_request('GET', URI) + self.assertEqual(headers['status'], '200') + self.assertEqual(headers['content-type'], 'text/plain') + self.assertEqual(content, '') + self.assertEqual(http._called_with['method'], 'GET') + self.assertEqual(http._called_with['uri'], URI) + self.assertEqual(http._called_with['body'], None) + expected_headers = { + 'Accept-Encoding': 'gzip', + 'Content-Length': 0, + 'User-Agent': conn.USER_AGENT, + } + self.assertEqual(http._called_with['headers'], expected_headers) + + def test__make_request_w_data_no_extra_headers(self): + conn = self._makeOne() + URI = 'http://example.com/test' + http = conn._http = Http( + {'status': '200', 'content-type': 'text/plain'}, + '', + ) + conn._make_request('GET', URI, {}, 'application/json') + self.assertEqual(http._called_with['method'], 'GET') + self.assertEqual(http._called_with['uri'], URI) + self.assertEqual(http._called_with['body'], {}) + expected_headers = { + 'Accept-Encoding': 'gzip', + 'Content-Length': 0, + 'Content-Type': 'application/json', + 'User-Agent': conn.USER_AGENT, + } + self.assertEqual(http._called_with['headers'], expected_headers) + + def test__make_request_w_extra_headers(self): + conn = self._makeOne() + URI = 'http://example.com/test' + http = conn._http = Http( + {'status': '200', 'content-type': 'text/plain'}, + '', + ) + conn._make_request('GET', URI, headers={'X-Foo': 'foo'}) + self.assertEqual(http._called_with['method'], 'GET') + self.assertEqual(http._called_with['uri'], URI) + self.assertEqual(http._called_with['body'], None) + expected_headers = { + 'Accept-Encoding': 'gzip', + 'Content-Length': 0, + 'X-Foo': 'foo', + 'User-Agent': conn.USER_AGENT, + } + self.assertEqual(http._called_with['headers'], expected_headers) + + def test_api_request_defaults(self): + PATH = '/path/required' + conn = self._makeOne() + URI = '/'.join([ + conn.API_BASE_URL, + 'pubsub', + '%s%s' % (conn.API_VERSION, PATH), + ]) + http = conn._http = Http( + {'status': '200', 'content-type': 'application/json'}, + '{}', + ) + self.assertEqual(conn.api_request('GET', PATH), {}) + self.assertEqual(http._called_with['method'], 'GET') + self.assertEqual(http._called_with['uri'], URI) + self.assertEqual(http._called_with['body'], None) + expected_headers = { + 'Accept-Encoding': 'gzip', + 'Content-Length': 0, + 'User-Agent': conn.USER_AGENT, + } + self.assertEqual(http._called_with['headers'], expected_headers) + + def test_api_request_w_non_json_response(self): + conn = self._makeOne() + conn._http = Http( + {'status': '200', 'content-type': 'text/plain'}, + 'CONTENT', + ) + + self.assertRaises(TypeError, conn.api_request, 'GET', '/') + + def test_api_request_wo_json_expected(self): + conn = self._makeOne() + conn._http = Http( + {'status': '200', 'content-type': 'text/plain'}, + 'CONTENT', + ) + self.assertEqual(conn.api_request('GET', '/', expect_json=False), + 'CONTENT') + + def test_api_request_w_query_params(self): + from six.moves.urllib.parse import parse_qsl + from six.moves.urllib.parse import urlsplit + conn = self._makeOne() + http = conn._http = Http( + {'status': '200', 'content-type': 'application/json'}, + '{}', + ) + self.assertEqual(conn.api_request('GET', '/', {'foo': 'bar'}), {}) + self.assertEqual(http._called_with['method'], 'GET') + uri = http._called_with['uri'] + scheme, netloc, path, qs, _ = urlsplit(uri) + self.assertEqual('%s://%s' % (scheme, netloc), conn.API_BASE_URL) + self.assertEqual(path, + '/'.join(['', 'pubsub', conn.API_VERSION, ''])) + parms = dict(parse_qsl(qs)) + self.assertEqual(parms['foo'], 'bar') + self.assertEqual(http._called_with['body'], None) + expected_headers = { + 'Accept-Encoding': 'gzip', + 'Content-Length': 0, + 'User-Agent': conn.USER_AGENT, + } + self.assertEqual(http._called_with['headers'], expected_headers) + + def test_api_request_w_data(self): + import json + DATA = {'foo': 'bar'} + DATAJ = json.dumps(DATA) + conn = self._makeOne() + URI = '/'.join([ + conn.API_BASE_URL, + 'pubsub', + conn.API_VERSION, + '', + ]) + http = conn._http = Http( + {'status': '200', 'content-type': 'application/json'}, + '{}', + ) + self.assertEqual(conn.api_request('POST', '/', data=DATA), {}) + self.assertEqual(http._called_with['method'], 'POST') + self.assertEqual(http._called_with['uri'], URI) + self.assertEqual(http._called_with['body'], DATAJ) + expected_headers = { + 'Accept-Encoding': 'gzip', + 'Content-Length': len(DATAJ), + 'Content-Type': 'application/json', + 'User-Agent': conn.USER_AGENT, + } + self.assertEqual(http._called_with['headers'], expected_headers) + + def test_api_request_w_404(self): + from gcloud.exceptions import NotFound + conn = self._makeOne() + conn._http = Http( + {'status': '404', 'content-type': 'text/plain'}, + '{}' + ) + self.assertRaises(NotFound, conn.api_request, 'GET', '/') + + def test_api_request_w_500(self): + from gcloud.exceptions import InternalServerError + conn = self._makeOne() + conn._http = Http( + {'status': '500', 'content-type': 'text/plain'}, + '{}', + ) + self.assertRaises(InternalServerError, conn.api_request, 'GET', '/') + + +class Http(object): + + _called_with = None + + def __init__(self, headers, content): + from httplib2 import Response + self._response = Response(headers) + self._content = content + + def request(self, **kw): + self._called_with = kw + return self._response, self._content From fd69e33ba7715053032d1a87cb0acdf82a93a698 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 17 Mar 2015 17:52:57 -0400 Subject: [PATCH 03/10] Add 'pubsub.topic.Topic' class. Include 'create', 'delete', 'exists', and 'publish' methods. Inclue 'pubsub.topic.Topic.batch', which allows publishing multiple messages to a topic as a single API request. --- docs/pubsub-api.rst | 8 +- gcloud/pubsub/test_topic.py | 196 ++++++++++++++++++++++++++++++++++++ gcloud/pubsub/topic.py | 149 +++++++++++++++++++++++++++ 3 files changed, 349 insertions(+), 4 deletions(-) create mode 100644 gcloud/pubsub/test_topic.py create mode 100644 gcloud/pubsub/topic.py diff --git a/docs/pubsub-api.rst b/docs/pubsub-api.rst index 2893879ad6ea..73708ed90fcb 100644 --- a/docs/pubsub-api.rst +++ b/docs/pubsub-api.rst @@ -95,11 +95,11 @@ Publish a set of messages to a topic (as a single request): >>> from gcloud.pubsub.topic import Topic >>> topic = Topic('topic_name') - >>> with topic as batch: - ... topic.publish('this is the first message_payload') - ... topic.publish('this is the second message_payload', + >>> with topic.batch() as batch: + ... batch.publish('this is the first message_payload') + ... batch.publish('this is the second message_payload', ... attr1='value1', attr2='value2') - >>> batch + >>> list(batch) [, ] .. note:: diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py new file mode 100644 index 000000000000..aafabe6adb93 --- /dev/null +++ b/gcloud/pubsub/test_topic.py @@ -0,0 +1,196 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestTopic(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.topic import Topic + return Topic + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_wo_inferred_project_or_connection(self): + TOPIC_NAME = 'topic_name' + topic = self._makeOne(TOPIC_NAME) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, None) + self.assertEqual(topic.connection, None) + + def test_ctor_w_explicit_project_and_connection(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + conn = _Connection() + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertTrue(topic.connection is conn) + + def test_create(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'name': PATH}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + topic.create() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'PUT') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_exists_miss(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection() + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + self.assertFalse(topic.exists()) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_exists_hit(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'name': PATH}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + self.assertTrue(topic.exists()) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_delete(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + topic.delete() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_publish_single_wo_attrs(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TEXT = 'This is the message text' + B64 = base64.b64encode(TEXT) + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, + 'attributes': {}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID]}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + msgid = topic.publish(TEXT) + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + + def test_publish_single_w_attrs(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TEXT = 'This is the message text' + B64 = base64.b64encode(TEXT) + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID]}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + msgid = topic.publish(TEXT, attr1='value1', attr2='value2') + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + + def test_publish_multiple(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TEXT1 = 'This is the first message text' + TEXT2 = 'This is the second message text' + B64_1 = base64.b64encode(TEXT1) + B64_2 = base64.b64encode(TEXT2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1, + 'attributes': {}} + MESSAGE2 = {'data': B64_2, + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + with topic.batch() as batch: + batch.publish(TEXT1) + batch.publish(TEXT2, attr1='value1', attr2='value2') + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_publish_multiple_error(self): + class Bugout(Exception): + pass + + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TEXT1 = 'This is the first message text' + TEXT2 = 'This is the second message text' + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + try: + with topic.batch() as batch: + batch.publish(TEXT1) + batch.publish(TEXT2, attr1='value1', attr2='value2') + raise Bugout() + except Bugout: + pass + self.assertEqual(list(batch), []) + self.assertEqual(len(conn._requested), 0) + + +class _Connection(object): + + def __init__(self, *responses): + self._responses = responses + self._requested = [] + + def api_request(self, **kw): + from gcloud.exceptions import NotFound + self._requested.append(kw) + + try: + response, self._responses = self._responses[0], self._responses[1:] + except: + raise NotFound('miss') + else: + return response diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py new file mode 100644 index 000000000000..06ea946b857a --- /dev/null +++ b/gcloud/pubsub/topic.py @@ -0,0 +1,149 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Define API Topics.""" + +import base64 + +from gcloud.exceptions import NotFound + + +class Topic(object): + """Topics are targets to which messages can be published. + + Subscribers then receive those messages. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics + + :type name: string + :param name: the name of the topic + + :type project: string + :param project: the project to which the topic belongs. If not passed, + falls back to the default inferred from the environment. + + :type connection: :class:gcloud.pubsub.connection.Connection + :param connection: the connection to use. If not passed, + falls back to the default inferred from the + environment. + """ + def __init__(self, name, project=None, connection=None): + self.name = name + self.project = project + self.connection = connection + + @property + def path(self): + """URL path for the topic's APIs""" + return '/projects/%s/topics/%s' % (self.project, self.name) + + def create(self): + """API call: create the topic via a PUT request + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/create + """ + self.connection.api_request(method='PUT', path=self.path) + + def exists(self): + """API call: test for the existence of the topic via a GET request + + See + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/get + """ + try: + self.connection.api_request(method='GET', path=self.path) + except NotFound: + return False + else: + return True + + def publish(self, message, **attrs): + """API call: publish a message to a topic via a POST request + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/publish + + :type message: string + :param message: the message payload + + :type attrs: dict (string -> string) + :message attrs: key-value pairs to send as message attributes + + :rtype: str + :returns: message ID assigned by the server to the published message + """ + message_data = {'data': base64.b64encode(message), 'attributes': attrs} + data = {'messages': [message_data]} + response = self.connection.api_request(method='POST', + path='%s:publish' % self.path, + data=data) + return response['messageIds'][0] + + def batch(self): + """Return a batch to use as a context manager. + + :rtype: :class:_Batch + """ + return _Batch(self) + + def delete(self): + """API call: delete the topic via a DELETE request + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/delete + """ + self.connection.api_request(method='DELETE', path=self.path) + + +class _Batch(object): + """Context manager: collect messages to publish via a single API call. + + Helper returned by :meth:Topic.batch + """ + def __init__(self, topic): + self.topic = topic + self.messages = [] + self.message_ids = [] + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is None: + self.commit() + + def __iter__(self): + return iter(self.message_ids) + + def publish(self, message, **attrs): + """Emulate publishing a message, but save it. + + :type message: string + :param message: the message payload + + :type attrs: dict (string -> string) + :message attrs: key-value pairs to send as message attributes + """ + self.messages.append( + {'data': base64.b64encode(message), 'attributes': attrs}) + + def commit(self): + """Send saved messages as a single API call.""" + conn = self.topic.connection + response = conn.api_request(method='POST', + path='%s:publish' % self.topic.path, + data={'messages': self.messages}) + self.message_ids.extend(response['messageIds']) From 3b5fa8629637e6bd63d991693f8fbeb0d573c5ba Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 18 Mar 2015 11:23:04 -0400 Subject: [PATCH 04/10] Add 'pubsub.api.list_topics'. --- gcloud/pubsub/api.py | 57 ++++++++++++++++++++++++++++++ gcloud/pubsub/test_api.py | 73 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 gcloud/pubsub/api.py create mode 100644 gcloud/pubsub/test_api.py diff --git a/gcloud/pubsub/api.py b/gcloud/pubsub/api.py new file mode 100644 index 000000000000..4ec1d398ac5e --- /dev/null +++ b/gcloud/pubsub/api.py @@ -0,0 +1,57 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Define API functions (not bound to classes).""" + + +def list_topics(project=None, connection=None, + page_size=None, page_token=None): + """List topics for a given project. + + :type project: string + :param project: project ID to query. If not passed, defaults to the + project ID inferred from the environment. + + :type connection: :class:`gcloud.pubsub.connection.Connection` + :param connection: connection to use for the query. If not passed, + defaults to the connection inferred from the + environment. + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of topics. + + :rtype: dict + :returns: keys include ``topics`` (a list of topic mappings) and + ``nextPageToken`` (a string: if non-empty, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + path = '/projects/%s/topics' % project + if params: + return connection.api_request(method='GET', path=path, + query_params=params) + return connection.api_request(method='GET', path=path) diff --git a/gcloud/pubsub/test_api.py b/gcloud/pubsub/test_api.py new file mode 100644 index 000000000000..c7e1a8d24436 --- /dev/null +++ b/gcloud/pubsub/test_api.py @@ -0,0 +1,73 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class Test_list_topics(unittest2.TestCase): + + def _callFUT(self, *args, **kw): + from gcloud.pubsub.api import list_topics + return list_topics(*args, **kw) + + def test_w_explicit_connection_no_paging(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOKEN = 'TOKEN' + returned = {'topics': [{'name': TOPIC_NAME}], + 'nextPageToken': TOKEN} + conn = _Connection(returned) + response = self._callFUT(PROJECT, conn) + topics = response['topics'] + self.assertEqual(len(topics), 1) + self.assertEqual(topics[0], {'name': TOPIC_NAME}) + self.assertEqual(response['nextPageToken'], TOKEN) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) + self.assertEqual(req.get('query_params'), None) + + def test_w_explicit_connection_w_paging(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + returned = {'topics': [{'name': TOPIC_NAME}], + 'nextPageToken': TOKEN2} + conn = _Connection(returned) + response = self._callFUT(PROJECT, conn, SIZE, TOKEN1) + topics = response['topics'] + self.assertEqual(len(topics), 1) + self.assertEqual(topics[0], {'name': TOPIC_NAME}) + self.assertEqual(response['nextPageToken'], TOKEN2) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) + self.assertEqual(req['query_params'], + {'pageSize': SIZE, 'pageToken': TOKEN1}) + + +class _Connection(object): + + def __init__(self, *responses): + self._responses = responses + self._requested = [] + + def api_request(self, **kw): + self._requested.append(kw) + response, self._responses = self._responses[0], self._responses[1:] + return response From bc2b7bd84be93bc94a6b80aabe63412aab3a3124 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 18 Mar 2015 14:55:16 -0400 Subject: [PATCH 05/10] Py3k compatibility for base64. --- gcloud/pubsub/test_topic.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index aafabe6adb93..75c59e0d7478 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -92,15 +92,15 @@ def test_publish_single_wo_attrs(self): import base64 TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' - TEXT = 'This is the message text' - B64 = base64.b64encode(TEXT) + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD) MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - msgid = topic.publish(TEXT) + msgid = topic.publish(PAYLOAD) self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -112,15 +112,15 @@ def test_publish_single_w_attrs(self): import base64 TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' - TEXT = 'This is the message text' - B64 = base64.b64encode(TEXT) + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD) MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - msgid = topic.publish(TEXT, attr1='value1', attr2='value2') + msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -132,10 +132,10 @@ def test_publish_multiple(self): import base64 TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' - TEXT1 = 'This is the first message text' - TEXT2 = 'This is the second message text' - B64_1 = base64.b64encode(TEXT1) - B64_2 = base64.b64encode(TEXT2) + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) MSGID1 = 'DEADBEEF' MSGID2 = 'BEADCAFE' MESSAGE1 = {'data': B64_1, @@ -146,8 +146,8 @@ def test_publish_multiple(self): conn = _Connection({'messageIds': [MSGID1, MSGID2]}) topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) with topic.batch() as batch: - batch.publish(TEXT1) - batch.publish(TEXT2, attr1='value1', attr2='value2') + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -161,16 +161,16 @@ class Bugout(Exception): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' - TEXT1 = 'This is the first message text' - TEXT2 = 'This is the second message text' + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' MSGID1 = 'DEADBEEF' MSGID2 = 'BEADCAFE' conn = _Connection({'messageIds': [MSGID1, MSGID2]}) topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) try: with topic.batch() as batch: - batch.publish(TEXT1) - batch.publish(TEXT2, attr1='value1', attr2='value2') + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') raise Bugout() except Bugout: pass From 4470178025b2c70e83eb75bb8b7cda4d9340f71a Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 18 Mar 2015 14:56:30 -0400 Subject: [PATCH 06/10] Add copyright, initial docstring. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/742#discussion_r26693931 --- gcloud/pubsub/__init__.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/gcloud/pubsub/__init__.py b/gcloud/pubsub/__init__.py index 5bb534f795ae..70df328f0987 100644 --- a/gcloud/pubsub/__init__.py +++ b/gcloud/pubsub/__init__.py @@ -1 +1,15 @@ -# package +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""GCloud Pubsub API wrapper.""" From cf8b9fa2af3ceb01f08aadbb0da18fd0594bb4fb Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 18 Mar 2015 14:59:18 -0400 Subject: [PATCH 07/10] De-emphasize 'project' and 'connection' as args to 'list_topics'. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/742#discussion_r26694050. --- gcloud/pubsub/api.py | 20 ++++++++++---------- gcloud/pubsub/test_api.py | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/gcloud/pubsub/api.py b/gcloud/pubsub/api.py index 4ec1d398ac5e..cfd45030ce8e 100644 --- a/gcloud/pubsub/api.py +++ b/gcloud/pubsub/api.py @@ -15,10 +15,18 @@ """ Define API functions (not bound to classes).""" -def list_topics(project=None, connection=None, - page_size=None, page_token=None): +def list_topics(page_size=None, page_token=None, + project=None, connection=None): """List topics for a given project. + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of topics. + :type project: string :param project: project ID to query. If not passed, defaults to the project ID inferred from the environment. @@ -28,14 +36,6 @@ def list_topics(project=None, connection=None, defaults to the connection inferred from the environment. - :type page_size: int - :param page_size: maximum number of topics to return, If not passed, - defaults to a value set by the API. - - :type page_token: string - :param page_token: opaque marker for the next "page" of topics. If not - passed, the API will return the first page of topics. - :rtype: dict :returns: keys include ``topics`` (a list of topic mappings) and ``nextPageToken`` (a string: if non-empty, indicates that diff --git a/gcloud/pubsub/test_api.py b/gcloud/pubsub/test_api.py index c7e1a8d24436..584e5f747b2c 100644 --- a/gcloud/pubsub/test_api.py +++ b/gcloud/pubsub/test_api.py @@ -28,7 +28,7 @@ def test_w_explicit_connection_no_paging(self): returned = {'topics': [{'name': TOPIC_NAME}], 'nextPageToken': TOKEN} conn = _Connection(returned) - response = self._callFUT(PROJECT, conn) + response = self._callFUT(project=PROJECT, connection=conn) topics = response['topics'] self.assertEqual(len(topics), 1) self.assertEqual(topics[0], {'name': TOPIC_NAME}) @@ -48,7 +48,7 @@ def test_w_explicit_connection_w_paging(self): returned = {'topics': [{'name': TOPIC_NAME}], 'nextPageToken': TOKEN2} conn = _Connection(returned) - response = self._callFUT(PROJECT, conn, SIZE, TOKEN1) + response = self._callFUT(SIZE, TOKEN1, PROJECT, conn) topics = response['topics'] self.assertEqual(len(topics), 1) self.assertEqual(topics[0], {'name': TOPIC_NAME}) From 301ea739a2bba4d0ae8f4b63d8e997968cc8c364 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 18 Mar 2015 15:50:04 -0400 Subject: [PATCH 08/10] Note that the payload for a message is intened to be bytes. [ci skip] --- gcloud/pubsub/topic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 06ea946b857a..4f897d055a6a 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -76,7 +76,7 @@ def publish(self, message, **attrs): See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/publish - :type message: string + :type message: bytes :param message: the message payload :type attrs: dict (string -> string) @@ -131,7 +131,7 @@ def __iter__(self): def publish(self, message, **attrs): """Emulate publishing a message, but save it. - :type message: string + :type message: bytes :param message: the message payload :type attrs: dict (string -> string) From 9cd9cf6c2c78f1cab89f53de2cc53d2d88921fe8 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 18 Mar 2015 16:40:39 -0400 Subject: [PATCH 09/10] Delete messages from batch after successful commit API return. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/742/files#r26701080 --- gcloud/pubsub/test_topic.py | 1 + gcloud/pubsub/topic.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index 75c59e0d7478..30a745ecda2c 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -149,6 +149,7 @@ def test_publish_multiple(self): batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 4f897d055a6a..1d3b5eda246a 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -145,5 +145,6 @@ def commit(self): conn = self.topic.connection response = conn.api_request(method='POST', path='%s:publish' % self.topic.path, - data={'messages': self.messages}) + data={'messages': self.messages[:]}) self.message_ids.extend(response['messageIds']) + del self.messages[:] From 703d56b2a7dad06825072c4ef5437a95d0203b01 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 18 Mar 2015 19:02:24 -0400 Subject: [PATCH 10/10] Don't bend over backward to avoid passing empty . Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/742#discussion_r26701221 --- gcloud/pubsub/api.py | 5 +---- gcloud/pubsub/test_api.py | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/gcloud/pubsub/api.py b/gcloud/pubsub/api.py index cfd45030ce8e..f331652ddbcd 100644 --- a/gcloud/pubsub/api.py +++ b/gcloud/pubsub/api.py @@ -51,7 +51,4 @@ def list_topics(page_size=None, page_token=None, params['pageToken'] = page_token path = '/projects/%s/topics' % project - if params: - return connection.api_request(method='GET', path=path, - query_params=params) - return connection.api_request(method='GET', path=path) + return connection.api_request(method='GET', path=path, query_params=params) diff --git a/gcloud/pubsub/test_api.py b/gcloud/pubsub/test_api.py index 584e5f747b2c..d21956073db1 100644 --- a/gcloud/pubsub/test_api.py +++ b/gcloud/pubsub/test_api.py @@ -37,7 +37,7 @@ def test_w_explicit_connection_no_paging(self): req = conn._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) - self.assertEqual(req.get('query_params'), None) + self.assertEqual(req['query_params'], {}) def test_w_explicit_connection_w_paging(self): TOPIC_NAME = 'topic_name'