Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: safely resume interrupted downloads #294

Merged
merged 16 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions google/resumable_media/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ def __init__(
media_url, stream=stream, start=start, end=end, headers=headers
)
self.checksum = checksum
self._bytes_downloaded = 0
self._expected_checksum = None
self._checksum_object = None
self._object_generation = None

def _prepare_request(self):
"""Prepare the contents of an HTTP request.
Expand Down
83 changes: 83 additions & 0 deletions google/resumable_media/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
import random
import warnings

from urllib.parse import parse_qs
from urllib.parse import parse_qsl
from urllib.parse import urlencode
from urllib.parse import urlsplit
from urllib.parse import urlunsplit

from google.resumable_media import common


Expand All @@ -33,6 +39,8 @@
"implementation. Python 3 has a faster implementation, `google-crc32c`, "
"which will be used if it is installed."
)
_CONTENT_ENCODING_HEADER = "Content-Encoding"
_GENERATION_HEADER = "x-goog-generation"
_HASH_HEADER = "x-goog-hash"
_MISSING_CHECKSUM = """\
No {checksum_type} checksum was returned from the service while downloading {}
Expand Down Expand Up @@ -302,6 +310,81 @@ def _get_checksum_object(checksum_type):
raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``")


def _parse_generation_header(response, get_headers):
"""Parses the generation header from an ``X-Goog-Generation`` value.

Args:
response (~requests.Response): The HTTP response object.
get_headers (callable: response->dict): returns response headers.

Returns:
Optional[long]: The object generation from the response, if it
can be detected from the ``X-Goog-Generation`` header; otherwise, None.
"""
headers = get_headers(response)
object_generation = headers.get(_GENERATION_HEADER, None)

if object_generation is None:
return None
else:
return int(object_generation)


def _get_generation_from_url(media_url):
"""Retrieve the object generation query param specified in the media url.

Args:
media_url (str): The URL containing the media to be downloaded.

Returns:
long: The object generation from the media url if exists; otherwise, None.
"""

_, _, _, query, _ = urlsplit(media_url)
query_params = parse_qs(query)
object_generation = query_params.get("generation", None)

if object_generation is None:
return None
else:
return int(object_generation[0])


def add_query_parameters(media_url, name_value_pairs):
"""Add query parameters to a base url.

Args:
media_url (str): The URL containing the media to be downloaded.
name_value_pairs (list[tuple[str, str]]): Names and values of the query parameters to add.
cojenco marked this conversation as resolved.
Show resolved Hide resolved

Returns:
str: URL with additional query strings appended.
"""

if len(name_value_pairs) == 0:
return media_url

scheme, netloc, path, query, frag = urlsplit(media_url)
query = parse_qsl(query)
query.extend(name_value_pairs)
return urlunsplit((scheme, netloc, path, urlencode(query), frag))


def _is_decompressive_transcoding(response, get_headers):
"""Returns True if the object was served decompressed and the "Content-Encoding" header is "gzip".
See more at: https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip

Args:
response (~requests.Response): The HTTP response object.
get_headers (callable: response->dict): returns response headers.

Returns:
bool: Returns True if the "Content-Encoding" header is "gzip"; otherwise, False.
"""
headers = get_headers(response)
return headers.get(_CONTENT_ENCODING_HEADER) == "gzip"
cojenco marked this conversation as resolved.
Show resolved Hide resolved


class _DoNothingHash(object):
"""Do-nothing hash object.

Expand Down
138 changes: 114 additions & 24 deletions google/resumable_media/requests/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,22 @@ def _write_to_stream(self, response):
checksum doesn't agree with server-computed checksum.
"""

# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
# Retrieve the expected checksum only once for the download request,
# then compute and validate the checksum when the full download completes.
# Retried requests are range requests, and there's no way to detect
# data corruption for that byte range alone.
if self._expected_checksum is None and self._checksum_object is None:
# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What causes this case to happen? Transcoding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is due to retried requests being range requests. For range requests, as noted here, there's no way to detect data corruption for that byte range alone.

Therefore, here we retrieve the expected checksum/checksum object only once for the initial download request. Then we calculate and validate the checksum when the download completes.

# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
self._expected_checksum = expected_checksum
self._checksum_object = checksum_object
else:
expected_checksum = self._expected_checksum
checksum_object = self._checksum_object

with response:
# NOTE: In order to handle compressed streams gracefully, we try
Expand All @@ -104,6 +114,7 @@ def _write_to_stream(self, response):
)
for chunk in body_iter:
self._stream.write(chunk)
self._bytes_downloaded += len(chunk)
local_checksum_object.update(chunk)

if expected_checksum is not None:
Expand Down Expand Up @@ -150,7 +161,7 @@ def consume(
ValueError: If the current :class:`Download` has already
finished.
"""
method, url, payload, headers = self._prepare_request()
method, _, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
request_kwargs = {
"data": payload,
Expand All @@ -160,13 +171,48 @@ def consume(
if self._stream is not None:
request_kwargs["stream"] = True

# Assign object generation if generation is specified in the media url.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this happen via a user specifying a generation on the object? Were we not respecting this previously?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep this would happen via a user specifying a generation on the object. Previously, we've been respecting that only through download.media_url

A property download._object_generation is added. It records the object generation from either (1) generation query param from the media_url, or (2) the object generation from the initial response header. This specific line of code does (1) and retrieves it from the media_url

P.S. It's tricky in how limited information is passed from python-storage to resumable-media-python. A resumable-media-python download instance only knows the specified object generation from its media_url, and the "object" itself isn't pertained in a download.

if self._object_generation is None:
self._object_generation = _helpers._get_generation_from_url(self.media_url)

# Wrap the request business logic in a function to be retried.
def retriable_request():
url = self.media_url

# To restart an interrupted download, read from the offset of last byte
# received using a range request, and set object generation query param.
if self._bytes_downloaded > 0:
_download.add_bytes_range(
self._bytes_downloaded, self.end, self._headers
)
request_kwargs["headers"] = self._headers

# Set object generation query param to ensure the same object content is requested.
if (
self._object_generation is not None
and _helpers._get_generation_from_url(self.media_url) is None
):
query_param = [("generation", self._object_generation)]
url = _helpers.add_query_parameters(self.media_url, query_param)

result = transport.request(method, url, **request_kwargs)

# If a generation hasn't been specified, and this is the first response we get, let's record the
# generation. In future requests we'll specify the generation query param to avoid data races.
if self._object_generation is None:
self._object_generation = _helpers._parse_generation_header(
result, self._get_headers
)

self._process_response(result)

# With decompressive transcoding, GCS serves back the whole file regardless of the range request,
# thus we reset the stream position to the start of the stream.
# See: https://cloud.google.com/storage/docs/transcoding#range
if self._stream is not None:
if _helpers._is_decompressive_transcoding(result, self._get_headers):
self._stream.seek(0)
cojenco marked this conversation as resolved.
Show resolved Hide resolved
self._bytes_downloaded = 0
self._write_to_stream(result)

return result
Expand Down Expand Up @@ -223,20 +269,30 @@ def _write_to_stream(self, response):
~google.resumable_media.common.DataCorruption: If the download's
checksum doesn't agree with server-computed checksum.
"""

# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
# Retrieve the expected checksum only once for the download request,
# then compute and validate the checksum when the full download completes.
# Retried requests are range requests, and there's no way to detect
# data corruption for that byte range alone.
if self._expected_checksum is None and self._checksum_object is None:
# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
self._expected_checksum = expected_checksum
self._checksum_object = checksum_object
else:
expected_checksum = self._expected_checksum
checksum_object = self._checksum_object

with response:
body_iter = response.raw.stream(
_request_helpers._SINGLE_GET_CHUNK_SIZE, decode_content=False
)
for chunk in body_iter:
self._stream.write(chunk)
self._bytes_downloaded += len(chunk)
checksum_object.update(chunk)
response._content_consumed = True

Expand Down Expand Up @@ -285,23 +341,57 @@ def consume(
ValueError: If the current :class:`Download` has already
finished.
"""
method, url, payload, headers = self._prepare_request()
method, _, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
request_kwargs = {
"data": payload,
"headers": headers,
"timeout": timeout,
"stream": True,
}

# Assign object generation if generation is specified in the media url.
if self._object_generation is None:
self._object_generation = _helpers._get_generation_from_url(self.media_url)

# Wrap the request business logic in a function to be retried.
def retriable_request():
# NOTE: We assume "payload is None" but pass it along anyway.
result = transport.request(
method,
url,
data=payload,
headers=headers,
stream=True,
timeout=timeout,
)
url = self.media_url

# To restart an interrupted download, read from the offset of last byte
# received using a range request, and set object generation query param.
if self._bytes_downloaded > 0:
_download.add_bytes_range(
self._bytes_downloaded, self.end, self._headers
)
request_kwargs["headers"] = self._headers

# Set object generation query param to ensure the same object content is requested.
if (
self._object_generation is not None
and _helpers._get_generation_from_url(self.media_url) is None
):
query_param = [("generation", self._object_generation)]
url = _helpers.add_query_parameters(self.media_url, query_param)

result = transport.request(method, url, **request_kwargs)

# If a generation hasn't been specified, and this is the first response we get, let's record the
# generation. In future requests we'll specify the generation query param to avoid data races.
if self._object_generation is None:
self._object_generation = _helpers._parse_generation_header(
result, self._get_headers
)

self._process_response(result)

# With decompressive transcoding, GCS serves back the whole file regardless of the range request,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if this should be highlighted as a shortcoming in the decompressive transcoding docs-- not being able to resume a download may be costly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mentioned in the very bottom section of the decompressive transcoding docs. I agree we can add notes on how retries may be impacted in this sense.

# thus we reset the stream position to the start of the stream.
# See: https://cloud.google.com/storage/docs/transcoding#range
if self._stream is not None:
if _helpers._is_decompressive_transcoding(result, self._get_headers):
self._stream.seek(0)
self._bytes_downloaded = 0
self._write_to_stream(result)

return result
Expand Down
Loading