-
Notifications
You must be signed in to change notification settings - Fork 332
/
gcs_json_media.py
722 lines (635 loc) · 28.5 KB
/
gcs_json_media.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Media helper functions and classes for Google Cloud Storage JSON API."""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
import copy
import logging
import re
import socket
import types
import six
from six.moves import http_client
from six.moves import urllib
from six.moves import cStringIO
from apitools.base.py import exceptions as apitools_exceptions
from gslib.cloud_api import BadRequestException
from gslib.lazy_wrapper import LazyWrapper
from gslib.progress_callback import ProgressCallbackWithTimeout
from gslib.utils.constants import DEBUGLEVEL_DUMP_REQUESTS
from gslib.utils.constants import SSL_TIMEOUT_SEC
from gslib.utils.constants import TRANSFER_BUFFER_SIZE
from gslib.utils.constants import UTF8
from gslib.utils import text_util
import httplib2
from httplib2 import parse_uri
if six.PY3:
long = int
# A regex for matching any series of decimal digits.
DECIMAL_REGEX = LazyWrapper(lambda: (re.compile(r'\d+')))
class BytesTransferredContainer(object):
"""Container class for passing number of bytes transferred to lower layers.
For resumed transfers or connection rebuilds in the middle of a transfer, we
need to rebuild the connection class with how much we've transferred so far.
For uploads, we don't know the total number of bytes uploaded until we've
queried the server, but we need to create the connection class to pass to
httplib2 before we can query the server. This container object allows us to
pass a reference into Upload/DownloadCallbackConnection.
"""
def __init__(self):
self.__bytes_transferred = 0
@property
def bytes_transferred(self):
return self.__bytes_transferred
@bytes_transferred.setter
def bytes_transferred(self, value):
self.__bytes_transferred = value
class UploadCallbackConnectionClassFactory(object):
"""Creates a class that can override an httplib2 connection.
This is used to provide progress callbacks and disable dumping the upload
payload during debug statements. It can later be used to provide on-the-fly
hash digestion during upload.
"""
def __init__(self,
bytes_uploaded_container,
buffer_size=TRANSFER_BUFFER_SIZE,
total_size=0,
progress_callback=None,
logger=None,
debug=0):
self.bytes_uploaded_container = bytes_uploaded_container
self.buffer_size = buffer_size
self.total_size = total_size
self.progress_callback = progress_callback
self.logger = logger
self.debug = debug
def GetConnectionClass(self):
"""Returns a connection class that overrides send."""
outer_bytes_uploaded_container = self.bytes_uploaded_container
outer_buffer_size = self.buffer_size
outer_total_size = self.total_size
outer_progress_callback = self.progress_callback
outer_logger = self.logger
outer_debug = self.debug
class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
"""Connection class override for uploads."""
bytes_uploaded_container = outer_bytes_uploaded_container
# After we instantiate this class, apitools will check with the server
# to find out how many bytes remain for a resumable upload. This allows
# us to update our progress once based on that number.
processed_initial_bytes = False
GCS_JSON_BUFFER_SIZE = outer_buffer_size
callback_processor = None
size = outer_total_size
header_encoding = ''
header_length = None
header_range = None
size_modifier = 1.0
def __init__(self, *args, **kwargs):
kwargs['timeout'] = SSL_TIMEOUT_SEC
httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)
# Override httplib.HTTPConnection._send_output for debug logging.
# Because the distinction between headers and message body occurs
# only in this httplib function, we can only differentiate them here.
def _send_output(self, message_body=None, encode_chunked=False):
r"""Send the currently buffered request and clear the buffer.
Appends an extra \r\n to the buffer.
Args:
message_body: if specified, this is appended to the request.
"""
# TODO: Presently, apitools will set http2lib2.debuglevel to 0
# (no prints) or 4 (dump upload payload, httplib prints to stdout).
# Refactor to allow our media-handling functions to handle
# debuglevel == 4 and print messages to stderr.
self._buffer.extend((b'', b''))
if six.PY2:
items = self._buffer
else:
items = []
for item in self._buffer:
if isinstance(item, bytes):
items.append(item)
else:
items.append(item.encode(UTF8))
msg = b'\r\n'.join(items)
num_metadata_bytes = len(msg)
if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
outer_logger.debug('send: %s' % msg)
del self._buffer[:]
# If msg and message_body are sent in a single send() call,
# it will avoid performance problems caused by the interaction
# between delayed ack and the Nagle algorithm.
if isinstance(message_body, str):
msg += message_body
message_body = None
self.send(msg, num_metadata_bytes=num_metadata_bytes)
if message_body is not None:
# message_body was not a string (i.e. it is a file) and
# we must run the risk of Nagle
self.send(message_body)
def putheader(self, header, *values):
"""Overrides HTTPConnection.putheader.
Send a request header line to the server. For example:
h.putheader('Accept', 'text/html').
This override records the content encoding, length, and range of the
payload. For uploads where the content-range difference does not match
the content-length, progress printing will under-report progress. These
headers are used to calculate a multiplier to correct the progress.
For example: the content-length for gzip transport encoded data
represents the compressed size of the data while the content-range
difference represents the uncompressed size. Dividing the
content-range difference by the content-length gives the ratio to
multiply the progress by to correctly report the relative progress.
Args:
header: The header.
*values: A set of values for the header.
"""
if header == 'content-encoding':
value = ''.join([str(v) for v in values])
self.header_encoding = value
if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
outer_logger.debug(
'send: Using gzip transport encoding for the request.')
elif header == 'content-length':
try:
value = int(''.join([str(v) for v in values]))
self.header_length = value
except ValueError:
pass
elif header == 'content-range':
try:
# There are 3 valid header formats:
# '*/%d', '%d-%d/*', and '%d-%d/%d'
value = ''.join([str(v) for v in values])
ranges = DECIMAL_REGEX().findall(value)
# If there are 2 or more range values, they will always
# correspond to the start and end ranges in the header.
if len(ranges) > 1:
# Subtract the end position from the start position.
self.header_range = (int(ranges[1]) - int(ranges[0])) + 1
except ValueError:
pass
# If the content header is gzip, and a range and length are set,
# update the modifier.
if (self.header_encoding == 'gzip' and self.header_length and
self.header_range):
# Update the modifier
self.size_modifier = self.header_range / float(self.header_length)
# Reset the headers
self.header_encoding = ''
self.header_length = None
self.header_range = None
# Log debug information to catch in tests.
if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
outer_logger.debug('send: Setting progress modifier to %s.' %
(self.size_modifier))
# Propagate header values.
http_client.HTTPSConnection.putheader(self, header, *values)
def send(self, data, num_metadata_bytes=0):
"""Overrides HTTPConnection.send.
Args:
data: string or file-like object (implements read()) of data to send.
num_metadata_bytes: number of bytes that consist of metadata
(headers, etc.) not representing the data being uploaded.
"""
if not self.processed_initial_bytes:
self.processed_initial_bytes = True
if outer_progress_callback:
self.callback_processor = ProgressCallbackWithTimeout(
outer_total_size, outer_progress_callback)
self.callback_processor.Progress(
self.bytes_uploaded_container.bytes_transferred)
# httplib.HTTPConnection.send accepts either a string or a file-like
# object (anything that implements read()).
if isinstance(data, six.text_type):
full_buffer = cStringIO(data)
elif isinstance(data, six.binary_type):
full_buffer = six.BytesIO(data)
else:
full_buffer = data
partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
while partial_buffer:
if six.PY2:
httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
else:
if isinstance(partial_buffer, bytes):
httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
else:
httplib2.HTTPSConnectionWithTimeout.send(
self, partial_buffer.encode(UTF8))
sent_data_bytes = len(partial_buffer)
if num_metadata_bytes:
if num_metadata_bytes <= sent_data_bytes:
sent_data_bytes -= num_metadata_bytes
num_metadata_bytes = 0
else:
num_metadata_bytes -= sent_data_bytes
sent_data_bytes = 0
if self.callback_processor:
# Modify the sent data bytes by the size modifier. These are
# stored as floats, so the result should be floored.
sent_data_bytes = int(sent_data_bytes * self.size_modifier)
# TODO: We can't differentiate the multipart upload
# metadata in the request body from the actual upload bytes, so we
# will actually report slightly more bytes than desired to the
# callback handler. Get the number of multipart upload metadata
# bytes from apitools and subtract from sent_data_bytes.
self.callback_processor.Progress(sent_data_bytes)
partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
return UploadCallbackConnection
def WrapUploadHttpRequest(upload_http):
"""Wraps upload_http so we only use our custom connection_type on PUTs.
POSTs are used to refresh oauth tokens, and we don't want to process the
data sent in those requests.
Args:
upload_http: httplib2.Http instance to wrap
"""
request_orig = upload_http.request
def NewRequest(uri,
method='GET',
body=None,
headers=None,
redirections=httplib2.DEFAULT_MAX_REDIRECTS,
connection_type=None):
if method == 'PUT' or method == 'POST':
override_connection_type = connection_type
else:
override_connection_type = None
return request_orig(uri,
method=method,
body=body,
headers=headers,
redirections=redirections,
connection_type=override_connection_type)
# Replace the request method with our own closure.
upload_http.request = NewRequest
class DownloadCallbackConnectionClassFactory(object):
"""Creates a class that can override an httplib2 connection.
This is used to provide progress callbacks, disable dumping the download
payload during debug statements, and provide on-the-fly hash digestion during
download. On-the-fly digestion is particularly important because httplib2
will decompress gzipped content on-the-fly, thus this class provides our
only opportunity to calculate the correct hash for an object that has a
gzip hash in the cloud.
"""
def __init__(self,
bytes_downloaded_container,
buffer_size=TRANSFER_BUFFER_SIZE,
total_size=0,
progress_callback=None,
digesters=None):
self.buffer_size = buffer_size
self.total_size = total_size
self.progress_callback = progress_callback
self.digesters = digesters
self.bytes_downloaded_container = bytes_downloaded_container
def GetConnectionClass(self):
"""Returns a connection class that overrides getresponse."""
class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
"""Connection class override for downloads."""
outer_total_size = self.total_size
outer_digesters = self.digesters
outer_progress_callback = self.progress_callback
outer_bytes_downloaded_container = self.bytes_downloaded_container
processed_initial_bytes = False
callback_processor = None
def __init__(self, *args, **kwargs):
kwargs['timeout'] = SSL_TIMEOUT_SEC
httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)
def getresponse(self, buffering=False):
"""Wraps an HTTPResponse to perform callbacks and hashing.
In this function, self is a DownloadCallbackConnection.
Args:
buffering: Unused. This function uses a local buffer.
Returns:
HTTPResponse object with wrapped read function.
"""
orig_response = http_client.HTTPConnection.getresponse(self)
if orig_response.status not in (http_client.OK,
http_client.PARTIAL_CONTENT):
return orig_response
orig_read_func = orig_response.read
def read(amt=None): # pylint: disable=invalid-name
"""Overrides HTTPConnection.getresponse.read.
This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.
Args:
amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
keyword argument to match the read function it overrides,
but it is required.
Returns:
Data read from HTTPConnection.
"""
if not amt or amt > TRANSFER_BUFFER_SIZE:
raise BadRequestException(
'Invalid HTTP read size %s during download, expected %s.' %
(amt, TRANSFER_BUFFER_SIZE))
else:
amt = amt or TRANSFER_BUFFER_SIZE
if not self.processed_initial_bytes:
self.processed_initial_bytes = True
if self.outer_progress_callback:
self.callback_processor = ProgressCallbackWithTimeout(
self.outer_total_size, self.outer_progress_callback)
self.callback_processor.Progress(
self.outer_bytes_downloaded_container.bytes_transferred)
data = orig_read_func(amt)
read_length = len(data)
if self.callback_processor:
self.callback_processor.Progress(read_length)
if self.outer_digesters:
for alg in self.outer_digesters:
self.outer_digesters[alg].update(data)
return data
orig_response.read = read
return orig_response
return DownloadCallbackConnection
def WrapDownloadHttpRequest(download_http):
"""Overrides download request functions for an httplib2.Http object.
Args:
download_http: httplib2.Http.object to wrap / override.
Returns:
Wrapped / overridden httplib2.Http object.
"""
# httplib2 has a bug (https://github.com/httplib2/httplib2/issues/75) where
# custom connection_type is not respected after redirects. This function is
# copied from httplib2 and overrides the request function so that the
# connection_type is properly passed through (everything here should be
# identical to the _request method in httplib2, with the exception of the line
# below marked by the "BUGFIX" comment).
# pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
# pylint: disable=g-equals-none,g-doc-return-or-yield
# pylint: disable=g-short-docstring-punctuation,g-doc-args
# pylint: disable=too-many-statements
# yapf: disable
def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
body, headers, redirections, cachekey):
"""Do the actual request using the connection object.
Also follow one level of redirects if necessary.
"""
auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
if auth.inscope(host, request_uri)])
auth = auths and sorted(auths)[0][1] or None
if auth:
auth.request(method, request_uri, headers, body)
(response, content) = self._conn_request(conn, request_uri, method, body,
headers)
if auth:
if auth.response(response, body):
auth.request(method, request_uri, headers, body)
(response, content) = self._conn_request(conn, request_uri, method,
body, headers)
response._stale_digest = 1
if response.status == 401:
for authorization in self._auth_from_challenge(
host, request_uri, headers, response, content):
authorization.request(method, request_uri, headers, body)
(response, content) = self._conn_request(conn, request_uri, method,
body, headers)
if response.status != 401:
self.authorizations.append(authorization)
authorization.response(response, body)
break
if (self.follow_all_redirects or (method in ["GET", "HEAD"])
or response.status == 303):
if self.follow_redirects and response.status in [300, 301, 302,
303, 307]:
# Pick out the location header and basically start from the beginning
# remembering first to strip the ETag header and decrement our 'depth'
if redirections:
if 'location' not in response and response.status != 300:
raise httplib2.RedirectMissingLocation(
"Redirected but the response is missing a Location: header.",
response, content)
# Fix-up relative redirects (which violate an RFC 2616 MUST)
if 'location' in response:
location = response['location']
(scheme, authority, path, query, fragment) = parse_uri(location)
if authority is None:
response['location'] = urllib.parse.urljoin(absolute_uri, location)
if response.status == 301 and method in ["GET", "HEAD"]:
response['-x-permanent-redirect-url'] = response['location']
if 'content-location' not in response:
response['content-location'] = absolute_uri
httplib2._updateCache(headers, response, content, self.cache,
cachekey)
if 'if-none-match' in headers:
del headers['if-none-match']
if 'if-modified-since' in headers:
del headers['if-modified-since']
if ('authorization' in headers and
not self.forward_authorization_headers):
del headers['authorization']
if 'location' in response:
location = response['location']
old_response = copy.deepcopy(response)
if 'content-location' not in old_response:
old_response['content-location'] = absolute_uri
redirect_method = method
if response.status in [302, 303]:
redirect_method = "GET"
body = None
(response, content) = self.request(
location, redirect_method, body=body, headers=headers,
redirections=redirections-1,
# BUGFIX (see comments at the top of this function):
connection_type=conn.__class__)
response.previous = old_response
else:
raise httplib2.RedirectLimit(
"Redirected more times than redirection_limit allows.",
response, content)
elif response.status in [200, 203] and method in ["GET", "HEAD"]:
# Don't cache 206's since we aren't going to handle byte range
# requests
if 'content-location' in response:
response['content-location'] = absolute_uri
httplib2._updateCache(headers, response, content, self.cache,
cachekey)
return (response, content)
# Wrap download_http so we do not use our custom connection_type
# on POSTS, which are used to refresh oauth tokens. We don't want to
# process the data received in those requests.
request_orig = download_http.request
def NewRequest(uri, method='GET', body=None, headers=None,
redirections=httplib2.DEFAULT_MAX_REDIRECTS,
connection_type=None):
if method == 'POST':
return request_orig(uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=None)
else:
return request_orig(uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=connection_type)
# Replace the request methods with our own closures.
download_http._request = types.MethodType(OverrideRequest, download_http)
download_http.request = NewRequest
return download_http
class HttpWithNoRetries(httplib2.Http):
"""httplib2.Http variant that does not retry.
httplib2 automatically retries requests according to httplib2.RETRIES, but
in certain cases httplib2 ignores the RETRIES value and forces a retry.
Because httplib2 does not handle the case where the underlying request body
is a stream, a retry may cause a non-idempotent write as the stream is
partially consumed and not reset before the retry occurs.
Here we override _conn_request to disable retries unequivocally, so that
uploads may be retried at higher layers that properly handle stream request
bodies.
"""
def _conn_request(self, conn, request_uri, method, body, headers): # pylint: disable=too-many-statements
try:
if hasattr(conn, 'sock') and conn.sock is None:
conn.connect()
conn.request(method, request_uri, body, headers)
except socket.timeout:
raise
except socket.gaierror:
conn.close()
raise httplib2.ServerNotFoundError('Unable to find the server at %s' %
conn.host)
except httplib2.ssl.SSLError:
conn.close()
raise
except socket.error as e:
err = 0
if hasattr(e, 'args'):
err = getattr(e, 'args')[0]
else:
err = e.errno
if err == httplib2.errno.ECONNREFUSED: # Connection refused
raise
except http_client.HTTPException:
conn.close()
raise
try:
response = conn.getresponse()
except (socket.error, http_client.HTTPException):
conn.close()
raise
else:
content = ''
if method == 'HEAD':
conn.close()
else:
content = response.read()
response = httplib2.Response(response)
if method != 'HEAD':
# pylint: disable=protected-access
content = httplib2._decompressContent(response, content)
return (response, content)
class HttpWithDownloadStream(httplib2.Http):
"""httplib2.Http variant that only pushes bytes through a stream.
httplib2 handles media by storing entire chunks of responses in memory, which
is undesirable particularly when multiple instances are used during
multi-threaded/multi-process copy. This class copies and then overrides some
httplib2 functions to use a streaming copy approach that uses small memory
buffers.
Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
class doc).
"""
def __init__(self, *args, **kwds):
self._stream = None
self._logger = logging.getLogger()
super(HttpWithDownloadStream, self).__init__(*args, **kwds)
@property
def stream(self):
return self._stream
@stream.setter
def stream(self, value):
self._stream = value
# pylint: disable=too-many-statements
def _conn_request(self, conn, request_uri, method, body, headers):
try:
if hasattr(conn, 'sock') and conn.sock is None:
conn.connect()
conn.request(method, request_uri, body, headers)
except socket.timeout:
raise
except socket.gaierror:
conn.close()
raise httplib2.ServerNotFoundError('Unable to find the server at %s' %
conn.host)
except httplib2.ssl.SSLError:
conn.close()
raise
except socket.error as e:
err = 0
if hasattr(e, 'args'):
err = getattr(e, 'args')[0]
else:
err = e.errno
if err == httplib2.errno.ECONNREFUSED: # Connection refused
raise
except http_client.HTTPException:
# Just because the server closed the connection doesn't apparently mean
# that the server didn't send a response.
conn.close()
raise
try:
response = conn.getresponse()
except (socket.error, http_client.HTTPException) as e:
conn.close()
raise
else:
content = ''
if method == 'HEAD':
conn.close()
response = httplib2.Response(response)
elif method == 'GET' and response.status in (http_client.OK,
http_client.PARTIAL_CONTENT):
content_length = None
if hasattr(response, 'msg'):
content_length = response.getheader('content-length')
http_stream = response
bytes_read = 0
while True:
new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
if new_data:
if self.stream is None:
raise apitools_exceptions.InvalidUserInputError(
'Cannot exercise HttpWithDownloadStream with no stream')
text_util.write_to_fd(self.stream, new_data)
bytes_read += len(new_data)
else:
break
if (content_length is not None and
long(bytes_read) != long(content_length)):
# The input stream terminated before we were able to read the
# entire contents, possibly due to a network condition. Set
# content-length to indicate how many bytes we actually read.
self._logger.log(
logging.DEBUG, 'Only got %s bytes out of content-length %s '
'for request URI %s. Resetting content-length to match '
'bytes read.', bytes_read, content_length, request_uri)
# Failing to delete existing headers before setting new values results
# in the header being set twice, see https://docs.python.org/3/library/email.compat32-message.html#email.message.Message.__setitem__.
# This trips apitools up when executing a retry, so the line below is
# essential:
del response.msg['content-length']
response.msg['content-length'] = str(bytes_read)
response = httplib2.Response(response)
else:
# We fall back to the current httplib2 behavior if we're
# not processing download bytes, e.g., it's a redirect, an
# oauth2client POST to refresh an access token, or any HTTP
# status code that doesn't include object content.
content = response.read()
response = httplib2.Response(response)
# pylint: disable=protected-access
content = httplib2._decompressContent(response, content)
return (response, content)
# pylint: enable=too-many-statements