diff --git a/qcloud_cos/__init__.py b/qcloud_cos/__init__.py index 8472f4a1..e545180f 100644 --- a/qcloud_cos/__init__.py +++ b/qcloud_cos/__init__.py @@ -3,3 +3,4 @@ from .cos_exception import CosServiceError from .cos_exception import CosClientError from .cos_auth import CosS3Auth +from .cos_comm import get_date diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index ce439a9f..b33ef236 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -11,6 +11,7 @@ import xml.dom.minidom import xml.etree.ElementTree from requests import Request, Session +from datetime import datetime from urllib import quote from hashlib import md5 from streambody import StreamBody @@ -115,7 +116,7 @@ def __init__(self, conf, retry=1, session=None): else: self._session = session - def get_auth(self, Method, Bucket, Key='', Expired=300, Headers={}, Params={}): + def get_auth(self, Method, Bucket, Key, Expired=300, Headers={}, Params={}): """获取签名 :param Method(string): http method,如'PUT','GET'. @@ -152,7 +153,7 @@ def send_request(self, method, url, timeout=30, **kwargs): timeout = self._conf._timeout if self._conf._token is not None: kwargs['headers']['x-cos-security-token'] = self._conf._token - kwargs['headers']['User-Agent'] = 'cos-python-sdk-v5.1.4.0' + kwargs['headers']['User-Agent'] = 'cos-python-sdk-v5.1.4.1' try: for j in range(self._retry): if method == 'POST': @@ -214,6 +215,7 @@ def put_object(self, Bucket, Body, Key, EnableMD5=False, **kwargs): ) print response['ETag'] """ + check_object_content_length(Body) headers = mapped(kwargs) if 'Metadata' in headers.keys(): for i in headers['Metadata'].keys(): @@ -579,6 +581,7 @@ def upload_part(self, Bucket, Key, Body, PartNumber, UploadId, EnableMD5=False, Key='test.txt' ) """ + check_object_content_length(Body) headers = mapped(kwargs) url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?partNumber={PartNumber}&uploadId={UploadId}".format( PartNumber=PartNumber, @@ -826,7 +829,6 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs): auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key, Key), headers=headers, params=params) - print rt.headers return None # s3 bucket interface begin @@ -1335,7 +1337,7 @@ def put_bucket_lifecycle(self, Bucket, LifecycleConfiguration={}, **kwargs): lifecycle_config = { 'Rule': [ { - 'Expiration': {'Days': 100}, + 'Expiration': {'Date': get_date(2018, 4, 24)}, 'ID': '123', 'Filter': {'Prefix': ''}, 'Status': 'Enabled', @@ -1727,7 +1729,7 @@ def list_buckets(self, **kwargs): return data # Advanced interface - def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst): + def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst, resumable_flag, already_exist_parts): """从本地文件中读取分块, 上传单个分块,将结果记录在md5——list中 :param bucket(string): 存储桶名称. @@ -1738,22 +1740,107 @@ def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid :param part_num(int): 上传分块的序号. :param uploadid(string): 分块上传的uploadid. :param md5_lst(list): 保存上传成功分块的MD5和序号. + :param resumable_flag(bool): 是否为断点续传. + :param already_exist_parts(dict): 断点续传情况下,保存已经上传的块的序号和Etag. :return: None. """ + # 如果是断点续传且该分块已经上传了则不用实际上传 + if resumable_flag and part_num in already_exist_parts: + md5_lst.append({'PartNumber': part_num, 'ETag': already_exist_parts[part_num]}) + else: + with open(local_path, 'rb') as fp: + fp.seek(offset, 0) + data = fp.read(size) + rt = self.upload_part(bucket, key, data, part_num, uploadid) + md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']}) + return None + + def _get_resumable_uploadid(self, bucket, key): + """从服务端获取未完成的分块上传任务,获取断点续传的uploadid + + :param bucket(string): 存储桶名称. + :param key(string): 分块上传路径名. + :return(string): 断点续传的uploadid,如果不存在则返回None. + """ + multipart_response = self.list_multipart_uploads( + Bucket=bucket, + Prefix=key + ) + if 'Upload' in multipart_response.keys(): + if multipart_response['Upload'][0]['Key'] == key: + return multipart_response['Upload'][0]['UploadId'] + + return None + + def _check_single_upload_part(self, local_path, offset, local_part_size, remote_part_size, remote_etag): + """从本地文件中读取分块, 校验本地分块和服务端的分块信息 + + :param local_path(string): 本地文件路径名. + :param offset(int): 读取本地文件的分块偏移量. + :param local_part_size(int): 读取本地文件的分块大小. + :param remote_part_size(int): 服务端的文件的分块大小. + :param remote_etag(string): 服务端的文件Etag. + :return(bool): 本地单个分块的信息是否和服务端的分块信息一致 + """ + if local_part_size != remote_part_size: + return False with open(local_path, 'rb') as fp: fp.seek(offset, 0) - data = fp.read(size) - rt = self.upload_part(bucket, key, data, part_num, uploadid) - md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']}) - return None + local_etag = get_raw_md5(fp.read(local_part_size)) + if local_etag == remote_etag: + return True + return False - def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kwargs): - """小于等于100MB的文件简单上传,大于等于100MB的文件使用分块上传 + def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num, part_size, last_size, already_exist_parts): + """获取所有已经上传的分块的信息,和本地的文件进行对比 + + :param bucket(string): 存储桶名称. + :param key(string): 分块上传路径名. + :param uploadid(string): 分块上传的uploadid + :param local_path(string): 本地文件的大小 + :param parts_num(int): 本地文件的分块数 + :param part_size(int): 本地文件的分块大小 + :param last_size(int): 本地文件的最后一块分块大小 + :param already_exist_parts(dict): 保存已经上传的分块的part_num和Etag + :return(bool): 本地文件是否通过校验,True为可以进行断点续传,False为不能进行断点续传 + """ + parts_info = [] + part_number_marker = 0 + list_over_status = False + while list_over_status is False: + response = self.list_parts( + Bucket=bucket, + Key=key, + UploadId=uploadid, + PartNumberMarker=part_number_marker + ) + parts_info.extend(response['Part']) + if response['IsTruncated'] == 'false': + list_over_status = True + else: + part_number_marker = int(response['NextMarker']) + for part in parts_info: + part_num = int(part['PartNumber']) + # 如果分块数量大于本地计算出的最大数量,校验失败 + if part_num > parts_num: + return False + offset = (part_num - 1) * part_size + local_part_size = part_size + if part_num == parts_num: + local_part_size = last_size + # 有任何一块没有通过校验,则校验失败 + if not self._check_single_upload_part(local_path, offset, local_part_size, int(part['Size']), part['ETag']): + return False + already_exist_parts[part_num] = part['ETag'] + return True + + def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, **kwargs): + """小于等于20MB的文件简单上传,大于20MB的文件使用分块上传 :param Bucket(string): 存储桶名称. :param key(string): 分块上传路径名. :param LocalFilePath(string): 本地文件路径名. - :param PartSize(int): 分块的大小设置. + :param PartSize(int): 分块的大小设置,单位为MB. :param MAXThread(int): 并发上传的最大线程数. :param kwargs(dict): 设置请求headers. :return(dict): 成功上传文件的元信息. @@ -1768,18 +1855,19 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kw Bucket='bucket', Key=file_name, LocalFilePath=file_name, + PartSize=10, MAXThread=10, CacheControl='no-cache', ContentDisposition='download.txt' ) """ file_size = os.path.getsize(LocalFilePath) - if file_size <= 1024*1024*100: + if file_size <= 1024*1024*20: with open(LocalFilePath, 'rb') as fp: rt = self.put_object(Bucket=Bucket, Key=Key, Body=fp, **kwargs) return rt else: - part_size = 1024*1024*PartSize # 默认按照10MB分块,最大支持100G的文件,超过100G的分块数固定为10000 + part_size = 1024*1024*PartSize # 默认按照1MB分块,最大支持10G的文件,超过10G的分块数固定为10000 last_size = 0 # 最后一块可以小于1MB parts_num = file_size / part_size last_size = file_size % part_size @@ -1793,8 +1881,17 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kw last_size += part_size # 创建分块上传 - rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs) - uploadid = rt['UploadId'] + # 判断是否可以断点续传 + resumable_flag = False + already_exist_parts = {} + uploadid = self._get_resumable_uploadid(Bucket, Key) + if uploadid is not None: + # 校验服务端返回的每个块的信息是否和本地的每个块的信息相同,只有校验通过的情况下才可以进行断点续传 + resumable_flag = self._check_all_upload_parts(Bucket, Key, uploadid, LocalFilePath, parts_num, part_size, last_size, already_exist_parts) + # 如果不能断点续传,则创建一个新的分块上传 + if not resumable_flag: + rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs) + uploadid = rt['UploadId'] # 上传分块 offset = 0 # 记录文件偏移量 @@ -1803,23 +1900,19 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kw for i in range(1, parts_num+1): if i == parts_num: # 最后一块 - pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst) + pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst, resumable_flag, already_exist_parts) else: - pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst) + pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst, resumable_flag, already_exist_parts) offset += part_size pool.wait_completion() result = pool.get_result() - if not result['success_all']: + if not result['success_all'] or len(lst) != parts_num: raise CosClientError('some upload_part fail after max_retry') lst = sorted(lst, key=lambda x: x['PartNumber']) # 按PartNumber升序排列 - # 完成分片上传 - try: - rt = self.complete_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid, MultipartUpload={'Part': lst}) - except Exception as e: - abort_response = self.abort_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid) - raise e + # 完成分块上传 + rt = self.complete_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid, MultipartUpload={'Part': lst}) return rt def _inner_head_object(self, CopySource): @@ -2043,6 +2136,150 @@ def append_object(self, Bucket, Key, Position, Data, **kwargs): response = rt.headers return response + def put_object_from_local_file(self, Bucket, LocalFilePath, Key, EnableMD5=False, **kwargs): + """本地文件上传接口,适用于小文件,最大不得超过5GB + + :param Bucket(string): 存储桶名称. + :param LocalFilePath(string): 上传文件的本地路径. + :param Key(string): COS路径. + :param EnableMD5(bool): 是否需要SDK计算Content-MD5,打开此开关会增加上传耗时. + :kwargs(dict): 设置上传的headers. + :return(dict): 上传成功返回的结果,包含ETag等信息. + + .. code-block:: python + + config = CosConfig(Region=region, Secret_id=secret_id, Secret_key=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 上传本地文件到cos + response = client.put_object_from_local_file( + Bucket='bucket', + LocalFilePath='local.txt', + Key='test.txt' + ) + print response['ETag'] + """ + with open(LocalFilePath, 'rb') as fp: + return self.put_object(Bucket, fp, Key, EnableMD5, **kwargs) + + def object_exists(self, Bucket, Key): + """判断一个文件是否存在 + + :param Bucket(string): 存储桶名称. + :param Key(string): COS路径. + :return(bool): 文件是否存在,返回True为存在,返回False为不存在 + + .. code-block:: python + + config = CosConfig(Region=region, Secret_id=secret_id, Secret_key=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 上传本地文件到cos + status = client.object_exists( + Bucket='bucket', + Key='test.txt' + ) + """ + try: + self.head_object(Bucket, Key) + return True + except CosServiceError as e: + if e.get_status_code() == 404: + return False + else: + raise e + + def bucket_exists(self, Bucket): + """判断一个存储桶是否存在 + + :param Bucket(string): 存储桶名称. + :return(bool): 存储桶是否存在,返回True为存在,返回False为不存在. + + .. code-block:: python + + config = CosConfig(Region=region, Secret_id=secret_id, Secret_key=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 上传本地文件到cos + status = client.bucket_exists( + Bucket='bucket' + ) + """ + try: + self.head_bucket(Bucket) + return True + except CosServiceError as e: + if e.get_status_code() == 404: + return False + else: + raise e + + def change_object_storage_class(self, Bucket, Key, StorageClass): + """改变文件的存储类型 + + :param Bucket(string): 存储桶名称. + :param Key(string): COS路径. + :param StorageClass(bool): 是否需要SDK计算Content-MD5,打开此开关会增加上传耗时. + :kwargs(dict): 设置上传的headers. + :return(dict): 上传成功返回的结果,包含ETag等信息. + + .. code-block:: python + + config = CosConfig(Region=region, Secret_id=secret_id, Secret_key=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 上传本地文件到cos + response = client.change_object_storage_class( + Bucket='bucket', + Key='test.txt', + StorageClass='STANDARD' + ) + """ + copy_source = { + 'Bucket': Bucket, + 'Key': Key, + 'Region': self._conf._region, + 'Appid': self._conf._appid + } + response = self.copy_object( + Bucket=Bucket, + Key=Key, + CopySource=copy_source, + CopyStatus='Replaced', + StorageClass=StorageClass + ) + return response + + def update_object_meta(self, Bucket, Key, **kwargs): + """改变文件的存储类型 + + :param Bucket(string): 存储桶名称. + :param Key(string): COS路径. + :kwargs(dict): 设置文件的元属性. + :return: None. + + .. code-block:: python + + config = CosConfig(Region=region, Secret_id=secret_id, Secret_key=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 上传本地文件到cos + response = client.update_object_meta( + Bucket='bucket', + Key='test.txt', + ContentType='text/html' + ) + """ + copy_source = { + 'Bucket': Bucket, + 'Key': Key, + 'Region': self._conf._region, + 'Appid': self._conf._appid + } + response = self.copy_object( + Bucket=Bucket, + Key=Key, + CopySource=copy_source, + CopyStatus='Replaced', + **kwargs + ) + return response + if __name__ == "__main__": pass diff --git a/qcloud_cos/cos_comm.py b/qcloud_cos/cos_comm.py index edd888e1..ae7c6f72 100644 --- a/qcloud_cos/cos_comm.py +++ b/qcloud_cos/cos_comm.py @@ -7,6 +7,7 @@ import sys import xml.dom.minidom import xml.etree.ElementTree +from datetime import datetime from urllib import quote from urllib import unquote from xml2dict import Xml2Dict @@ -63,6 +64,12 @@ def to_unicode(s): return s.decode('utf-8') +def get_raw_md5(data): + m2 = hashlib.md5(data) + etag = '"' + str(m2.hexdigest()) + '"' + return etag + + def get_md5(data): m2 = hashlib.md5(data) MD5 = base64.standard_b64encode(m2.digest()) @@ -262,6 +269,21 @@ def gen_copy_source_range(begin_range, end_range): return range +def check_object_content_length(data): + """put_object接口和upload_part接口的文件大小不允许超过5G""" + content_len = 0 + if type(data) is str: + content_len = len(data) + elif type(data) is file and hasattr(data, 'fileno') and hasattr(data, 'tell'): + fileno = data.fileno() + total_length = os.fstat(fileno).st_size + current_position = data.tell() + content_len = total_length - current_position + if content_len > SINGLE_UPLOAD_LENGTH: + raise CosClientError('The object size you upload can not be larger than 5GB in put_object or upload_part') + return None + + def deal_with_empty_file_stream(data): """对于文件流的剩余长度为0的情况下,返回空字节流""" if hasattr(data, 'fileno') and hasattr(data, 'tell'): @@ -298,3 +320,10 @@ def decode_result(data, key_lst, multi_key_list): if multi_key[1] in item.keys() and item[multi_key[1]]: item[multi_key[1]] = unquote(item[multi_key[1]]) return data + + +def get_date(yy, mm, dd): + """获取lifecycle中Date字段""" + date_str = datetime(yy, mm, dd).isoformat() + final_date_str = date_str+'+08:00' + return final_date_str diff --git a/qcloud_cos/streambody.py b/qcloud_cos/streambody.py index d28fbacc..1cf282e1 100644 --- a/qcloud_cos/streambody.py +++ b/qcloud_cos/streambody.py @@ -1,5 +1,4 @@ # -*- coding=utf-8 -import requests class StreamBody(): diff --git a/ut/test.py b/ut/test.py index ca1e557d..a4315913 100644 --- a/ut/test.py +++ b/ut/test.py @@ -8,6 +8,7 @@ from qcloud_cos import CosS3Client from qcloud_cos import CosConfig from qcloud_cos import CosServiceError +from qcloud_cos import get_date SECRET_ID = os.environ["SECRET_ID"] SECRET_KEY = os.environ["SECRET_KEY"] @@ -30,7 +31,7 @@ def get_raw_md5(data): def gen_file(path, size): _file = open(path, 'w') - _file.seek(1024*1024*size) + _file.seek(1024*1024*size-3) _file.write('cos') _file.close() @@ -429,7 +430,7 @@ def test_put_get_delete_lifecycle(): lifecycle_config = { 'Rule': [ { - 'Expiration': {'Days': 100}, + 'Expiration': {'Date': get_date(2030, 5, 1)}, 'ID': '123', 'Filter': {'Prefix': ''}, 'Status': 'Enabled', @@ -671,6 +672,69 @@ def test_put_object_enable_md5(): os.remove(file_name) +def test_put_object_from_local_file(): + """通过本地文件路径来上传文件""" + file_size = 1 + file_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000)) + file_name = "tmp" + file_id + "_" + str(file_size) + "MB" + gen_file(file_name, 10) + with open(file_name, 'rb') as f: + etag = get_raw_md5(f.read()) + put_response = client.put_object_from_local_file( + Bucket=test_bucket, + LocalFilePath=file_name, + Key=file_name + ) + assert put_response['ETag'] == etag + if os.path.exists(file_name): + os.remove(file_name) + + +def test_object_exists(): + """测试一个文件是否存在""" + status = client.object_exists( + Bucket=test_bucket, + Key=test_object + ) + assert status is True + + +def test_bucket_exists(): + """测试一个bucket是否存在""" + status = client.bucket_exists( + Bucket=test_bucket + ) + assert status is True + + +def test_change_object_storage_class(): + """改变文件的存储类型""" + response = client.change_object_storage_class( + Bucket=test_bucket, + Key=test_object, + StorageClass='NEARLINE' + ) + response = client.head_object( + Bucket=test_bucket, + Key=test_object + ) + assert response['x-cos-storage-class'] == 'NEARLINE' + + +def test_update_object_meta(): + """更新文件的属性""" + response = client.update_object_meta( + Bucket=test_bucket, + Key=test_object, + ContentType='text/html' + ) + response = client.head_object( + Bucket=test_bucket, + Key=test_object + ) + assert response['Content-Type'] == 'text/html; charset=utf-8' + + if __name__ == "__main__": setUp() test_put_object_enable_md5()