Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming: add pep8 / pylint goodness #1210

Merged
merged 17 commits into from
Nov 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions gcloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,12 @@ def download_to_file(self, file_obj, client=None):
download_url = self.media_link

# Use apitools 'Download' facility.
download = Download.from_stream(file_obj, auto_transfer=False)
headers = {}
download = Download.from_stream(file_obj)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

if self.chunk_size is not None:
download.chunksize = self.chunk_size
headers['Range'] = 'bytes=0-%d' % (self.chunk_size - 1,)
request = Request(download_url, 'GET', headers)

request = Request(download_url, 'GET')

# Use the private ``_connection`` rather than the public
# ``.connection``, since the public connection may be a batch. A
Expand All @@ -275,8 +275,6 @@ def download_to_file(self, file_obj, client=None):
# it has all three (http, API_BASE_URL and build_api_url).
download.initialize_download(request, client._connection.http)

download.stream_file(use_chunks=True)

def download_to_filename(self, filename, client=None):
"""Download the contents of this blob into a named file.

Expand Down Expand Up @@ -386,7 +384,10 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
}

upload = Upload(file_obj, content_type, total_bytes,
auto_transfer=False, chunksize=self.chunk_size)
auto_transfer=False)

if self.chunk_size is not None:
upload.chunksize = self.chunk_size

url_builder = _UrlBuilder(bucket_name=self.bucket.name,
object_name=self.name)
Expand Down
10 changes: 8 additions & 2 deletions gcloud/storage/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ def test_upload_from_file_size_failure(self):

def _upload_from_file_simple_test_helper(self, properties=None,
content_type_arg=None,
expected_content_type=None):
expected_content_type=None,
chunk_size=5):
from six.moves.http_client import OK
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
Expand All @@ -361,7 +362,7 @@ def _upload_from_file_simple_test_helper(self, properties=None,
bucket = _Bucket(client)
blob = self._makeOne(BLOB_NAME, bucket=bucket, properties=properties)
blob._CHUNK_SIZE_MULTIPLE = 1
blob.chunk_size = 5
blob.chunk_size = chunk_size

with _NamedTemporaryFile() as temp:
with open(temp.name, 'wb') as file_obj:
Expand Down Expand Up @@ -390,6 +391,11 @@ def test_upload_from_file_simple(self):
self._upload_from_file_simple_test_helper(
expected_content_type='application/octet-stream')

def test_upload_from_file_simple_w_chunk_size_None(self):
self._upload_from_file_simple_test_helper(
expected_content_type='application/octet-stream',
chunk_size=None)

def test_upload_from_file_simple_with_content_type(self):
EXPECTED_CONTENT_TYPE = 'foo/bar'
self._upload_from_file_simple_test_helper(
Expand Down
48 changes: 35 additions & 13 deletions gcloud/streaming/buffered_stream.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
# pylint: skip-file
"""Small helper class to provide a small slice of a stream.

This class reads ahead to detect if we are at the end of the stream.
"""

from gcloud.streaming import exceptions
from gcloud.streaming.exceptions import NotYetImplementedError


# TODO(user): Consider replacing this with a StringIO.
class BufferedStream(object):
"""Buffers a stream, reading ahead to determine if we're at the end.

"""Buffers a stream, reading ahead to determine if we're at the end."""
:type stream: readable file-like object
:param stream: the stream to be buffered

:type start: integer
:param start: the starting point in the stream

:type size: integer
:param size: the size of the buffer
"""
def __init__(self, stream, start, size):
self._stream = stream
self._start_pos = start
Expand All @@ -30,30 +36,46 @@ def __len__(self):

@property
def stream_exhausted(self):
"""Does the stream have bytes remaining beyond the buffer

:rtype: boolean
"""
return self._stream_at_end

@property
def stream_end_position(self):
"""Point to which stream was read into the buffer

:rtype: integer
"""
return self._end_pos

@property
def _bytes_remaining(self):
"""Bytes remaining to be read from the buffer

:rtype: integer
"""
return len(self._buffered_data) - self._buffer_pos

def read(self, size=None): # pylint: disable=invalid-name
"""Reads from the buffer."""
def read(self, size=None):
"""Read bytes from the buffer.

:type size: integer or None
:param size: How many bytes to read (defaults to all remaining bytes).
"""
if size is None or size < 0:
raise exceptions.NotYetImplementedError(
raise NotYetImplementedError(
'Illegal read of size %s requested on BufferedStream. '
'Wrapped stream %s is at position %s-%s, '
'%s bytes remaining.' %
(size, self._stream, self._start_pos, self._end_pos,
self._bytes_remaining))

data = b''
if self._bytes_remaining:
size = min(size, self._bytes_remaining)
data = self._buffered_data[
self._buffer_pos:self._buffer_pos + size]
self._buffer_pos += size
if not self._bytes_remaining:
return b''

size = min(size, self._bytes_remaining)
data = self._buffered_data[self._buffer_pos:self._buffer_pos + size]
self._buffer_pos += size
return data
68 changes: 42 additions & 26 deletions gcloud/streaming/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,38 @@
# pylint: skip-file
"""Exceptions for generated client libraries."""


class Error(Exception):

"""Base class for all exceptions."""


class TypecheckError(Error, TypeError):

"""An object of an incorrect type is provided."""


class NotFoundError(Error):

"""A specified resource could not be found."""


class UserError(Error):

"""Base class for errors related to user input."""


class InvalidDataError(Error):

"""Base class for any invalid data error."""


class CommunicationError(Error):

"""Any communication error talking to an API server."""


class HttpError(CommunicationError):
"""Error making a request. Soon to be HttpError.

:type response: dict
:param response: headers from the response which returned the error

"""Error making a request. Soon to be HttpError."""
:type content: bytes
:param content: payload of the response which returned the error

:type url: string
:param url: URL of the response which returned the error
"""
def __init__(self, response, content, url):
super(HttpError, self).__init__()
self.response = response
Expand All @@ -49,70 +46,89 @@ def __str__(self):

@property
def status_code(self):
# TODO(craigcitro): Turn this into something better than a
# KeyError if there is no status.
"""Status code for the response.

:rtype: integer
:returns: the code
"""
return int(self.response['status'])

@classmethod
def FromResponse(cls, http_response):
def from_response(cls, http_response):
"""Factory: construct an exception from a response.

:type http_response: :class:`gcloud.streaming.http_wrapper.Response`
:param http_response: the response which returned the error

:rtype: :class:`HttpError`
"""
return cls(http_response.info, http_response.content,
http_response.request_url)


class InvalidUserInputError(InvalidDataError):

"""User-provided input is invalid."""


class ConfigurationValueError(UserError):

"""Some part of the user-specified client configuration is invalid."""


class TransferError(CommunicationError):

"""Errors related to transfers."""


class TransferRetryError(TransferError):

"""Retryable errors related to transfers."""


class TransferInvalidError(TransferError):

"""The given transfer is invalid."""


class RequestError(CommunicationError):

"""The request was not successful."""


class RetryAfterError(HttpError):
"""The response contained a retry-after header.

:type response: dict
:param response: headers from the response which returned the error

"""The response contained a retry-after header."""
:type content: bytes
:param content: payload of the response which returned the error

:type url: string
:param url: URL of the response which returned the error

:type retry_after: integer
:param retry_after: seconds to wait before retrying
"""
def __init__(self, response, content, url, retry_after):
super(RetryAfterError, self).__init__(response, content, url)
self.retry_after = int(retry_after)

@classmethod
def FromResponse(cls, http_response):
def from_response(cls, http_response):
"""Factory: construct an exception from a response.

:type http_response: :class:`gcloud.streaming.http_wrapper.Response`
:param http_response: the response which returned the error

:rtype: :class:`RetryAfterError`
"""
return cls(http_response.info, http_response.content,
http_response.request_url, http_response.retry_after)


class BadStatusCodeError(HttpError):

"""The request completed but returned a bad status code."""


class NotYetImplementedError(Error):

"""This functionality is not yet implemented."""


class StreamExhausted(Error):

"""Attempted to read more bytes from a stream than were available."""
Loading