Skip to content

Commit

Permalink
Modify upload_file
Browse files Browse the repository at this point in the history
  • Loading branch information
dt3310321 committed May 2, 2018
1 parent d5a4df8 commit 4c2403b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 18 deletions.
23 changes: 16 additions & 7 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import sys
import copy
import time
import xml.dom.minidom
import xml.etree.ElementTree
from requests import Request, Session
Expand Down Expand Up @@ -270,9 +271,10 @@ def get_object(self, Bucket, Key, **kwargs):
params = format_values(params)

url = self._conf.uri(bucket=Bucket, path=Key)
logger.info("get object, url=:{url} ,headers=:{headers}".format(
logger.info("get object, url=:{url} ,headers=:{headers}, params=:{params}".format(
url=url,
headers=headers))
headers=headers,
params=params))
rt = self.send_request(
method='GET',
url=url,
Expand Down Expand Up @@ -583,9 +585,10 @@ def upload_part(self, Bucket, Key, Body, PartNumber, UploadId, EnableMD5=False,
params = {'partNumber': PartNumber, 'uploadId': UploadId}
params = format_values(params)
url = self._conf.uri(bucket=Bucket, path=Key)
logger.info("upload part, url=:{url} ,headers=:{headers}".format(
logger.info("upload part, url=:{url} ,headers=:{headers}, params=:{params}".format(
url=url,
headers=headers))
headers=headers,
params=params))
Body = deal_with_empty_file_stream(Body)
if EnableMD5:
md5_str = get_content_md5(Body)
Expand Down Expand Up @@ -1874,11 +1877,13 @@ def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num,
UploadId=uploadid,
PartNumberMarker=part_number_marker
)
parts_info.extend(response['Part'])
# 已经存在的分块上传,有可能一个分块都没有上传,判断一下
if 'Part' in response:
parts_info.extend(response['Part'])
if response['IsTruncated'] == 'false':
list_over_status = True
else:
part_number_marker = int(response['NextMarker'])
part_number_marker = int(response['NextPartNumberMarker'])
for part in parts_info:
part_num = int(part['PartNumber'])
# 如果分块数量大于本地计算出的最大数量,校验失败
Expand Down Expand Up @@ -1934,6 +1939,8 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, **kwa

if last_size != 0:
parts_num += 1
else: # 如果刚好整除,最后一块的大小等于分块大小
last_size = part_size
if parts_num > 10000:
parts_num = 10000
part_size = file_size // parts_num
Expand All @@ -1946,12 +1953,14 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, **kwa
already_exist_parts = {}
uploadid = self._get_resumable_uploadid(Bucket, Key)
if uploadid is not None:
logger.info("fetch an existed uploadid in remote cos, uploadid={uploadid}".format(uploadid=uploadid))
# 校验服务端返回的每个块的信息是否和本地的每个块的信息相同,只有校验通过的情况下才可以进行断点续传
resumable_flag = self._check_all_upload_parts(Bucket, Key, uploadid, LocalFilePath, parts_num, part_size, last_size, already_exist_parts)
# 如果不能断点续传,则创建一个新的分块上传
if not resumable_flag:
rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs)
uploadid = rt['UploadId']
logger.info("create a new uploadid in upload_file, uploadid={uploadid}".format(uploadid=uploadid))

# 上传分块
offset = 0 # 记录文件偏移量
Expand All @@ -1968,7 +1977,7 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, **kwa
pool.wait_completion()
result = pool.get_result()
if not result['success_all'] or len(lst) != parts_num:
raise CosClientError('some upload_part fail after max_retry')
raise CosClientError('some upload_part fail after max_retry, please upload_file again')
lst = sorted(lst, key=lambda x: x['PartNumber']) # 按PartNumber升序排列

# 完成分块上传
Expand Down
2 changes: 1 addition & 1 deletion qcloud_cos/cos_threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from threading import Thread
from logging import getLogger
from six.moves import queue
from six.moves.queue import Queue
from threading import Lock
import gc
logger = getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion qcloud_cos/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224

logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

# 设置用户属性, 包括secret_id, secret_key, region
# appid已在配置中移除,请在参数Bucket中带上appid。Bucket由bucketname-appid组成
Expand Down
49 changes: 40 additions & 9 deletions ut/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

SECRET_ID = os.environ["SECRET_ID"]
SECRET_KEY = os.environ["SECRET_KEY"]
test_bucket = "test01-1252448703"
TRAVIS_FLAG = os.environ["TRAVIS_FLAG"]
test_bucket = 'cos-python-v5-test-' + str(sys.version_info[0]) + '-' + str(sys.version_info[1]) + '-' + '1252448703'
test_object = "test.txt"
special_file_name = "中文" + "→↓←→↖↗↙↘! \"#$%&'()*+,-./0123456789:;<=>@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~"
conf = CosConfig(
Expand All @@ -23,6 +24,19 @@
client = CosS3Client(conf)


def _create_test_bucket(test_bucket):
try:
response = client.create_bucket(
Bucket=test_bucket,
)
except Exception as e:
if e.get_error_code() == 'BucketAlreadyOwnedByYou':
print('BucketAlreadyOwnedByYou')
else:
raise e
return None


def get_raw_md5(data):
m2 = hashlib.md5(data)
etag = '"' + str(m2.hexdigest()) + '"'
Expand All @@ -49,7 +63,8 @@ def print_error_msg(e):

def setUp():
print ("start test...")
print (sys.version_info)
print ("start create bucket " + test_bucket)
_create_test_bucket(test_bucket)


def tearDown():
Expand Down Expand Up @@ -147,6 +162,11 @@ def test_put_object_non_exist_bucket():

def test_put_object_acl():
"""设置object acl"""
response = client.put_object(
Bucket=test_bucket,
Key=test_object,
Body='test acl'
)
response = client.put_object_acl(
Bucket=test_bucket,
Key=test_object,
Expand All @@ -161,6 +181,10 @@ def test_get_object_acl():
Key=test_object
)
assert response
response = client.delete_object(
Bucket=test_bucket,
Key=test_object
)


def test_copy_object_diff_bucket():
Expand Down Expand Up @@ -353,7 +377,7 @@ def test_get_bucket_acl_normal():
def test_list_objects():
"""列出bucket下的objects"""
response = client.list_objects(
Bucket=test_bucket,
Bucket='test01-1252448703',
MaxKeys=100,
Prefix='中文',
Delimiter='/'
Expand Down Expand Up @@ -497,7 +521,7 @@ def test_put_get_delete_replication():
{
'ID': '123',
'Status': 'Enabled',
'Prefix': '/中文',
'Prefix': '中文',
'Destination': {
'Bucket': 'qcs:id/0:cos:cn-south:appid/1252448703:replicationsouth'
}
Expand All @@ -517,7 +541,7 @@ def test_put_get_delete_replication():
Bucket=test_bucket
)
assert response
# delete lifecycle
# delete replication
response = client.delete_bucket_replication(
Bucket=test_bucket
)
Expand Down Expand Up @@ -565,7 +589,10 @@ def test_list_multipart_uploads():
def test_upload_file_multithreading():
"""根据文件大小自动选择分块大小,多线程并发上传提高上传速度"""
file_name = "thread_1GB"
gen_file(file_name, 5) # set 5MB beacuse travis too slow
file_size = 1024
if TRAVIS_FLAG == 'true':
file_size = 5 # set 5MB beacuse travis too slow
gen_file(file_name, file_size)
st = time.time() # 记录开始时间
response = client.upload_file(
Bucket=test_bucket,
Expand Down Expand Up @@ -626,7 +653,8 @@ def test_use_get_auth():
Key='test.txt',
Params={'acl': '', 'unsed': '123'}
)
response = requests.get('http://test01-1252448703.cos.ap-beijing-1.myqcloud.com/test.txt?acl&unsed=123', headers={'Authorization': auth})
url = 'http://' + test_bucket + '.cos.ap-beijing-1.myqcloud.com/test.txt?acl&unsed=123'
response = requests.get(url, headers={'Authorization': auth})
assert response.status_code == 200


Expand Down Expand Up @@ -671,9 +699,8 @@ def test_put_get_bucket_logging():

def test_put_object_enable_md5():
"""上传文件,SDK计算content-md5头部"""
file_size = 10
file_name = 'test_object_sdk_caculate_md5.file'
gen_file(file_name, 10)
gen_file(file_name, 1)
with open(file_name, 'rb') as f:
etag = get_raw_md5(f.read())
with open(file_name, 'rb') as fp:
Expand Down Expand Up @@ -704,6 +731,10 @@ def test_put_object_from_local_file():
Key=file_name
)
assert put_response['ETag'] == etag
response = client.delete_object(
Bucket=test_bucket,
Key=file_name
)
if os.path.exists(file_name):
os.remove(file_name)

Expand Down

0 comments on commit 4c2403b

Please sign in to comment.