Skip to content

Commit

Permalink
Support upload_file_from_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
dt3310321 committed Jan 9, 2018
1 parent e4d6c35 commit f438fe4
Showing 1 changed file with 66 additions and 0 deletions.
66 changes: 66 additions & 0 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1312,5 +1312,71 @@ def copy(self, Bucket, Key, CopySource, CopyStatus='Copy', PartSize=10, MAXThrea
raise e
return rt

def _upload_part_from_buffer(self, bucket, key, data, part_num, uploadid, md5_lst):
"""从内存中读取分块, 上传单个分块,将结果记录在md5——list中
:param bucket(string): 存储桶名称.
:param key(string): 分块上传路径名.
:param data(string): 数据块.
:param part_num(int): 上传分块的序号.
:param uploadid(string): 分块上传的uploadid.
:param md5_lst(list): 保存上传成功分块的MD5和序号.
:return: None.
"""

rt = self.upload_part(bucket, key, data, part_num, uploadid)
md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']})
return None

def upload_file_from_buffer(self, Bucket, Key, Body, MaxBufferSize=100, PartSize=10, MAXThread=5, **kwargs):
"""小于分块大小的的文件简单上传,大于等于分块大小的文件使用分块上传
:param Bucket(string): 存储桶名称.
:param key(string): 分块上传路径名.
:param Body(fp): 文件流,必须实现了read方法.
:param PartSize(int): 分块的大小设置.
:param MAXThread(int): 并发上传的最大线程数.
:param kwargs(dict): 设置请求headers.
:return: None.
"""
if not hasattr(Body, 'read'):
raise CosClientError("Body must has attr read")

part_size = 1024*1024*PartSize

# 先读一个块,如果直接EOF了就调用简单文件上传
part_num = 1
data = Body.read(part_size)

if len(data) < part_size:
rt = self.put_object(Bucket=Bucket, Key=Key, Body=data, **kwargs)
return rt

# 创建分块上传
rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs)
uploadid = rt['UploadId']

lst = list() # 记录分块信息
MAXQueue = MaxBufferSize/PartSize
pool = SimpleThreadPool(MAXThread, MAXQueue)
while True:
if data == "":
break
pool.add_task(self._upload_part_from_buffer, Bucket, Key, data, part_num, uploadid, lst)
part_num += 1
data = Body.read(part_size)

pool.wait_completion()
lst = sorted(lst, key=lambda x: x['PartNumber']) # 按PartNumber升序排列

# 完成分片上传
try:
rt = self.complete_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid, MultipartUpload={'Part': lst})
print rt
except Exception as e:
abort_response = self.abort_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid)
raise e
return rt

if __name__ == "__main__":
pass

0 comments on commit f438fe4

Please sign in to comment.