Skip to content

Commit

Permalink
Merge pull request #743 from tseaver/691-flesh_out_pubsub_subscription
Browse files Browse the repository at this point in the history
Flesh out pubsub subscriptions
  • Loading branch information
tseaver committed Mar 19, 2015
2 parents 01d697e + 7821d88 commit 85dc64b
Show file tree
Hide file tree
Showing 4 changed files with 620 additions and 0 deletions.
55 changes: 55 additions & 0 deletions gcloud/pubsub/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def list_topics(page_size=None, page_token=None,
project=None, connection=None):
"""List topics for a given project.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
Expand Down Expand Up @@ -52,3 +55,55 @@ def list_topics(page_size=None, page_token=None,

path = '/projects/%s/topics' % project
return connection.api_request(method='GET', path=path, query_params=params)


def list_subscriptions(page_size=None, page_token=None, topic_name=None,
project=None, connection=None):
"""List subscriptions for a given project.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list
and (where ``topic_name`` is passed):
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/subscriptions/list
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
:type page_token: string
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of topics.
:type topic_name: string
:param topic_name: limit results to subscriptions bound to the given topic.
:type project: string
:param project: project ID to query. If not passed, defaults to the
project ID inferred from the environment.
:type connection: :class:`gcloud.pubsub.connection.Connection`
:param connection: connection to use for the query. If not passed,
defaults to the connection inferred from the
environment.
:rtype: dict
:returns: keys include ``subscriptions`` (a list of subscription mappings)
and ``nextPageToken`` (a string: if non-empty, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
params = {}

if page_size is not None:
params['pageSize'] = page_size

if page_token is not None:
params['pageToken'] = page_token

if topic_name is None:
path = '/projects/%s/subscriptions' % project
else:
path = '/projects/%s/topics/%s/subscriptions' % (project, topic_name)

return connection.api_request(method='GET', path=path, query_params=params)
186 changes: 186 additions & 0 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Define API Subscriptions."""

from gcloud.exceptions import NotFound


class Subscription(object):
"""Subscriptions receive messages published to their topics.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions
:type name: string
:param name: the name of the subscription
:type topic: :class:`gcloud.pubsub.topic.Topic`
:param topic: the topic to which the subscription belongs..
:type ack_deadline: int
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.
:type push_endpoint: string
:param push_endpoint: URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.
"""
def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
self.name = name
self.topic = topic
self.ack_deadline = ack_deadline
self.push_endpoint = push_endpoint

@property
def path(self):
"""URL path for the subscription's APIs"""
project = self.topic.project
return '/projects/%s/subscriptions/%s' % (project, self.name)

def create(self):
"""API call: create the subscription via a PUT request
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create
"""
data = {'topic': self.topic.path}

if self.ack_deadline is not None:
data['ackDeadline'] = self.ack_deadline

if self.push_endpoint is not None:
data['pushConfig'] = {'pushEndpoint': self.push_endpoint}

conn = self.topic.connection
conn.api_request(method='PUT', path=self.path, data=data)

def exists(self):
"""API call: test existence of the subscription via a GET request
See
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
"""
conn = self.topic.connection
try:
conn.api_request(method='GET',
path=self.path,
query_params={'fields': 'name'})
except NotFound:
return False
else:
return True

def reload(self):
"""API call: sync local subscription configuration via a GET request
See
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
"""
conn = self.topic.connection
data = conn.api_request(method='GET', path=self.path)
self.ack_deadline = data.get('ackDeadline')
push_config = data.get('pushConfig', {})
self.push_endpoint = push_config.get('pushEndpoint')

def modify_push_configuration(self, push_endpoint):
"""API call: update the push endpoint for the subscription.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/modifyPushConfig
:type push_endpoint: string
:param push_endpoint: URL to which messages will be pushed by the
back-end. If None, the application must pull
messages.
"""
data = {}
config = data['pushConfig'] = {}
if push_endpoint is not None:
config['pushEndpoint'] = push_endpoint
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:modifyPushConfig' % self.path,
data=data)
self.push_endpoint = push_endpoint

def pull(self, return_immediately=False, max_messages=1):
"""API call: retrieve messages for the subscription.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/pull
:type return_immediately: boolean
:param return_immediately: if True, the back-end returns even if no
messages are available; if False, the API
call blocks until one or more messages are
available.
:type max_messages: int
:param max_messages: the maximum number of messages to return.
:rtype: list of dict
:returns: sequence of mappings, each containing keys ``ackId`` (the
ID to be used in a subsequent call to :meth:`acknowledge`)
and ``message``.
"""
data = {'returnImmediately': return_immediately,
'maxMessages': max_messages}
conn = self.topic.connection
response = conn.api_request(method='POST',
path='%s:pull' % self.path,
data=data)
return response['receivedMessages']

def acknowledge(self, ack_ids):
"""API call: acknowledge retrieved messages for the subscription.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
"""
data = {'ackIds': ack_ids}
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:acknowledge' % self.path,
data=data)

def modify_ack_deadline(self, ack_id, ack_deadline):
"""API call: update acknowledgement deadline for a retrieved message.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
:type ack_id: string
:param ack_id: ack ID of message being updated
:type ack_deadline: int
:param ack_deadline: new deadline for the message, in seconds
"""
data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline}
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:modifyAckDeadline' % self.path,
data=data)

def delete(self):
"""API call: delete the subscription via a DELETE request.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete
"""
conn = self.topic.connection
conn.api_request(method='DELETE', path=self.path)
80 changes: 80 additions & 0 deletions gcloud/pubsub/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,86 @@ def test_w_explicit_connection_w_paging(self):
{'pageSize': SIZE, 'pageToken': TOKEN1})


class Test_list_subscriptions(unittest2.TestCase):

def _callFUT(self, *args, **kw):
from gcloud.pubsub.api import list_subscriptions
return list_subscriptions(*args, **kw)

def test_w_explicit_connection_no_paging(self):
PROJECT = 'PROJECT'
SUB_NAME = 'topic_name'
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
TOPIC_NAME = 'topic_name'
TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
TOKEN = 'TOKEN'
returned = {'subscriptions': [{'name': SUB_PATH, 'topic': TOPIC_PATH}],
'nextPageToken': TOKEN}
conn = _Connection(returned)
response = self._callFUT(project=PROJECT, connection=conn)
subscriptions = response['subscriptions']
self.assertEqual(len(subscriptions), 1)
self.assertEqual(subscriptions[0],
{'name': SUB_PATH, 'topic': TOPIC_PATH})
self.assertEqual(response['nextPageToken'], TOKEN)
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'GET')
self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT)
self.assertEqual(req['query_params'], {})

def test_w_explicit_connection_w_paging(self):
PROJECT = 'PROJECT'
SUB_NAME = 'topic_name'
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
TOPIC_NAME = 'topic_name'
TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
TOKEN1 = 'TOKEN1'
TOKEN2 = 'TOKEN2'
SIZE = 1
returned = {'subscriptions': [{'name': SUB_PATH, 'topic': TOPIC_PATH}],
'nextPageToken': TOKEN2}
conn = _Connection(returned)
response = self._callFUT(SIZE, TOKEN1,
project=PROJECT, connection=conn)
subscriptions = response['subscriptions']
self.assertEqual(len(subscriptions), 1)
self.assertEqual(subscriptions[0],
{'name': SUB_PATH, 'topic': TOPIC_PATH})
self.assertEqual(response['nextPageToken'], TOKEN2)
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'GET')
self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT)
self.assertEqual(req['query_params'],
{'pageSize': SIZE, 'pageToken': TOKEN1})

def test_w_topic_name(self):
PROJECT = 'PROJECT'
SUB_NAME = 'topic_name'
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
TOPIC_NAME = 'topic_name'
TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
TOKEN = 'TOKEN'
returned = {'subscriptions': [{'name': SUB_PATH, 'topic': TOPIC_PATH}],
'nextPageToken': TOKEN}
conn = _Connection(returned)
response = self._callFUT(topic_name=TOPIC_NAME,
project=PROJECT, connection=conn)
subscriptions = response['subscriptions']
self.assertEqual(len(subscriptions), 1)
self.assertEqual(subscriptions[0],
{'name': SUB_PATH, 'topic': TOPIC_PATH})
self.assertEqual(response['nextPageToken'], TOKEN)
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'GET')
self.assertEqual(req['path'],
'/projects/%s/topics/%s/subscriptions'
% (PROJECT, TOPIC_NAME))
self.assertEqual(req['query_params'], {})


class _Connection(object):

def __init__(self, *responses):
Expand Down
Loading

0 comments on commit 85dc64b

Please sign in to comment.