diff --git a/LICENSE b/LICENSE
new file mode 100644
index 00000000..275d2e42
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 腾讯云
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/README.rst b/README.rst
index b9304788..520b0f70 100644
--- a/README.rst
+++ b/README.rst
@@ -27,11 +27,11 @@ cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224
.. code:: python
# 设置用户属性, 包括appid, secret_id, secret_key, region
- appid = 100000 # 替换为用户的appid
- secret_id = u'xxxxxxxx' # 替换为用户的secret_id
- secret_key = u'xxxxxxx' # 替换为用户的secret_key
- region = "ap-beiging-1" # 替换为用户的region
- token = '' # 使用临时秘钥需要传入Token,默认为空,可不填
+ appid = '100000' # 替换为用户的appid
+ secret_id = 'xxxxxxxx' # 替换为用户的secret_id
+ secret_key = 'xxxxxxx' # 替换为用户的secret_key
+ region = 'ap-beiging-1' # 替换为用户的region
+ token = '' # 使用临时秘钥需要传入Token,默认为空,可不填
config = CosConfig(Appid=appid, Region=region, Access_id=secret_id, Access_key=secret_key, Token=token) #获取配置对象
client = CosS3Client(config) #获取客户端对象
diff --git a/qcloud_cos/cos_auth.py b/qcloud_cos/cos_auth.py
index 6cadc4e2..4a010443 100644
--- a/qcloud_cos/cos_auth.py
+++ b/qcloud_cos/cos_auth.py
@@ -11,38 +11,61 @@
logger = logging.getLogger(__name__)
+def filter_headers(data):
+ """只设置host content-type 还有x开头的头部.
+
+ :param data(dict): 所有的头部信息.
+ :return(dict): 计算进签名的头部.
+ """
+ headers = {}
+ for i in data.keys():
+ if i == 'Content-Type' or i == 'Host' or i[0] == 'x' or i[0] == 'X':
+ headers[i] = data[i]
+ return headers
+
+
+def to_string(data):
+ """转换unicode为string.
+
+ :param data(unicode|string): 待转换的unicode|string.
+ :return(string): 转换后的string.
+ """
+ if isinstance(data, unicode):
+ return data.encode('utf8')
+ return data
+
+
class CosS3Auth(AuthBase):
- def __init__(self, access_id, secret_key, expire=10000):
- self._access_id = access_id
- self._secret_key = secret_key
+ def __init__(self, secret_id, secret_key, key='', params={}, expire=10000):
+ self._secret_id = to_string(secret_id)
+ self._secret_key = to_string(secret_key)
self._expire = expire
+ self._params = params
+ if key:
+ if key[0] == '/':
+ self._path = key
+ else:
+ self._path = '/' + key
+ else:
+ self._path = '/'
def __call__(self, r):
- method = r.method.lower()
- uri = urllib.unquote(r.url)
- uri = uri.split('?')[0]
- http_header = r.headers
- r.headers = {}
- rt = urlparse(uri)
- logger.debug("url parse: " + str(rt))
- if rt.query != "" and ("&" in rt.query or '=' in rt.query):
- uri_params = dict(map(lambda s: s.lower().split('='), rt.query.split('&')))
- elif rt.query != "":
- uri_params = {rt.query: ""}
- else:
- uri_params = {}
- headers = dict([(k.lower(), quote(v).lower()) for k, v in r.headers.items()])
+ path = self._path
+ uri_params = self._params
+ headers = filter_headers(r.headers)
+ # reserved keywords in headers urlencode are -_.~, notice that / should be encoded and space should not be encoded to plus sign(+)
+ headers = dict([(k.lower(), quote(v, '-_.~')) for k, v in headers.items()]) # headers中的key转换为小写,value进行encode
format_str = "{method}\n{host}\n{params}\n{headers}\n".format(
- method=method.lower(),
- host=rt.path,
- params=urllib.urlencode(uri_params),
+ method=r.method.lower(),
+ host=path,
+ params=urllib.urlencode(sorted(uri_params.items())),
headers='&'.join(map(lambda (x, y): "%s=%s" % (x, y), sorted(headers.items())))
)
logger.debug("format str: " + format_str)
start_sign_time = int(time.time())
- sign_time = "{bg_time};{ed_time}".format(bg_time=start_sign_time-60, ed_time=start_sign_time + self._expire)
+ sign_time = "{bg_time};{ed_time}".format(bg_time=start_sign_time-60, ed_time=start_sign_time+self._expire)
sha1 = hashlib.sha1()
sha1.update(format_str)
@@ -54,18 +77,16 @@ def __call__(self, r):
logger.debug('sign: ' + str(sign))
sign_tpl = "q-sign-algorithm=sha1&q-ak={ak}&q-sign-time={sign_time}&q-key-time={key_time}&q-header-list={headers}&q-url-param-list={params}&q-signature={sign}"
- http_header['Authorization'] = sign_tpl.format(
- ak=self._access_id,
+ r.headers['Authorization'] = sign_tpl.format(
+ ak=self._secret_id,
sign_time=sign_time,
key_time=sign_time,
params=';'.join(sorted(map(lambda k: k.lower(), uri_params.keys()))),
headers=';'.join(sorted(headers.keys())),
sign=sign
)
- r.headers = http_header
logger.debug("sign_key" + str(sign_key))
logger.debug(r.headers['Authorization'])
-
logger.debug("request headers: " + str(r.headers))
return r
diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py
index 7358dd5f..9ea8e3bd 100644
--- a/qcloud_cos/cos_client.py
+++ b/qcloud_cos/cos_client.py
@@ -3,13 +3,18 @@
import requests
import urllib
import logging
+import hashlib
+import base64
+import os
import sys
import copy
import xml.dom.minidom
import xml.etree.ElementTree
from requests import Request, Session
+from urllib import quote
from streambody import StreamBody
from xml2dict import Xml2Dict
+from dicttoxml import dicttoxml
from cos_auth import CosS3Auth
from cos_exception import CosClientError
from cos_exception import CosServiceError
@@ -18,11 +23,13 @@
level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
- filename='cos_s3.log',
+ filename='cos_v5.log',
filemode='w')
logger = logging.getLogger(__name__)
reload(sys)
sys.setdefaultencoding('utf-8')
+
+# kwargs中params到http headers的映射
maplist = {
'ContentLength': 'Content-Length',
'ContentMD5': 'Content-MD5',
@@ -49,6 +56,10 @@
'IfNoneMatch': 'If-None-Match',
'IfModifiedSince': 'If-Modified-Since',
'IfUnmodifiedSince': 'If-Unmodified-Since',
+ 'CopySourceIfMatch': 'x-cos-copy-source-If-Match',
+ 'CopySourceIfNoneMatch': 'x-cos-copy-source-If-None-Match',
+ 'CopySourceIfModifiedSince': 'x-cos-copy-source-If-Modified-Since',
+ 'CopySourceIfUnmodifiedSince': 'x-cos-copy-source-If-Unmodified-Since',
'VersionId': 'x-cos-version-id',
}
@@ -60,6 +71,12 @@ def to_unicode(s):
return s.decode('utf-8')
+def get_md5(data):
+ m2 = hashlib.md5(data)
+ MD5 = base64.standard_b64encode(m2.digest())
+ return MD5
+
+
def dict_to_xml(data):
"""V5使用xml格式,将输入的dict转换为xml"""
doc = xml.dom.minidom.Document()
@@ -90,13 +107,15 @@ def dict_to_xml(data):
return doc.toxml('utf-8')
-def xml_to_dict(data):
+def xml_to_dict(data, origin_str="", replace_str=""):
"""V5使用xml格式,将response中的xml转换为dict"""
root = xml.etree.ElementTree.fromstring(data)
xmldict = Xml2Dict(root)
xmlstr = str(xmldict)
xmlstr = xmlstr.replace("{http://www.qcloud.com/document/product/436/7751}", "")
xmlstr = xmlstr.replace("{http://www.w3.org/2001/XMLSchema-instance}", "")
+ if origin_str:
+ xmlstr = xmlstr.replace(origin_str, replace_str)
xmldict = eval(xmlstr)
return xmldict
@@ -121,7 +140,16 @@ def mapped(headers):
return _headers
+def format_xml(data, root, lst=list()):
+ """将dict转换为xml"""
+ xml_config = dicttoxml(data, item_func=lambda x: x, custom_root=root, attr_type=False)
+ for i in lst:
+ xml_config = xml_config.replace(i+i, i)
+ return xml_config
+
+
def format_region(region):
+ """格式化地域"""
if region.find('cos.') != -1:
return region # 传入cos.ap-beijing-1这样显示加上cos.的region
if region == 'cn-north' or region == 'cn-south' or region == 'cn-east' or region == 'cn-south-2' or region == 'cn-southwest' or region == 'sg':
@@ -152,6 +180,14 @@ def format_region(region):
class CosConfig(object):
"""config类,保存用户相关信息"""
def __init__(self, Appid, Region, Access_id, Access_key, Token=None):
+ """初始化,保存用户的信息
+
+ :param Appid(string): 用户APPID.
+ :param Region(string): 地域信息.
+ :param Access_id(string): 秘钥SecretId.
+ :param Access_key(string): 秘钥SecretKey.
+ :param Token(string): 临时秘钥使用的token.
+ """
self._appid = Appid
self._region = format_region(Region)
self._access_id = Access_id
@@ -162,7 +198,12 @@ def __init__(self, Appid, Region, Access_id, Access_key, Token=None):
region=Region))
def uri(self, bucket, path=None):
- """拼接url"""
+ """拼接url
+
+ :param bucket(string): 存储桶名称.
+ :param path(string): 请求COS的路径.
+ :return(string): 请求COS的URL地址.
+ """
if path:
if path[0] == '/':
path = path[1:]
@@ -173,7 +214,7 @@ def uri(self, bucket, path=None):
path=to_unicode(path)
)
else:
- url = u"http://{bucket}-{uid}.{region}.myqcloud.com".format(
+ url = u"http://{bucket}-{uid}.{region}.myqcloud.com/".format(
bucket=to_unicode(bucket),
uid=self._appid,
region=self._region
@@ -184,6 +225,12 @@ def uri(self, bucket, path=None):
class CosS3Client(object):
"""cos客户端类,封装相应请求"""
def __init__(self, conf, retry=1, session=None):
+ """初始化client对象
+
+ :param conf(CosConfig): 用户的配置.
+ :param retry(int): 失败重试的次数.
+ :param session(object): http session.
+ """
self._conf = conf
self._retry = retry # 重试的次数,分片上传时可适当增大
if session is None:
@@ -191,14 +238,24 @@ def __init__(self, conf, retry=1, session=None):
else:
self._session = session
- def get_auth(self, Method, Bucket, Key=None, Expired=300, headers={}, params={}):
- """获取签名"""
- url = self._conf.uri(bucket=Bucket, path=Key)
+ def get_auth(self, Method, Bucket, Key='', Expired=300, headers={}, params={}):
+ """获取签名
+
+ :param Method(string): http method,如'PUT','GET'.
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): 请求COS的路径.
+ :param Expired(int): 签名有效时间,单位为s.
+ :param headers(dict): 签名中的http headers.
+ :param params(dict): 签名中的http params.
+ :return (string): 计算出的V5签名.
+ """
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
r = Request(Method, url, headers=headers, params=params)
- auth = CosS3Auth(self._conf._access_id, self._conf._access_key, Expired)
+ auth = CosS3Auth(self._conf._access_id, self._conf._access_key, Key, params, Expired)
return auth(r).headers['Authorization']
def send_request(self, method, url, timeout=30, **kwargs):
+ """封装request库发起http请求"""
if self._conf._token is not None:
kwargs['headers']['x-cos-security-token'] = self._conf._token
kwargs['headers']['User-Agent'] = 'cos-python-sdk-v5'
@@ -239,32 +296,44 @@ def send_request(self, method, url, timeout=30, **kwargs):
# s3 object interface begin
def put_object(self, Bucket, Body, Key, **kwargs):
- """单文件上传接口,适用于小文件,最大不得超过5GB"""
+ """单文件上传接口,适用于小文件,最大不得超过5GB
+
+ :param Bucket(string): 存储桶名称.
+ :param Body(file|string): 上传的文件内容,类型为文件流或字节流.
+ :param Key(string): COS路径.
+ :kwargs(dict): 设置上传的headers.
+ :return(dict): 上传成功返回的结果,包含ETag等信息.
+ """
headers = mapped(kwargs)
if 'Metadata' in headers.keys():
for i in headers['Metadata'].keys():
headers[i] = headers['Metadata'][i]
headers.pop('Metadata')
- url = self._conf.uri(bucket=Bucket, path=Key)
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')) # 提前对key做encode
logger.info("put object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='PUT',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
data=Body,
headers=headers)
- response = dict()
- response['ETag'] = rt.headers['ETag']
+ response = rt.headers
return response
def get_object(self, Bucket, Key, **kwargs):
- """单文件下载接口"""
+ """单文件下载接口
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param kwargs(dict): 设置下载的headers.
+ :return(dict): 下载成功返回的结果,包含Body对应的StreamBody,可以获取文件流或下载文件到本地.
+ """
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key)
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
logger.info("get object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
@@ -272,107 +341,167 @@ def get_object(self, Bucket, Key, **kwargs):
method='GET',
url=url,
stream=True,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
- response = dict()
+ response = rt.headers
response['Body'] = StreamBody(rt)
- for k in rt.headers.keys():
- response[k] = rt.headers[k]
return response
def get_presigned_download_url(self, Bucket, Key, Expired=300):
- """生成预签名的下载url"""
- url = self._conf.uri(bucket=Bucket, path=Key)
+ """生成预签名的下载url
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param Expired(int): 签名过期时间.
+ :return(string): 预先签名的下载URL.
+ """
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
sign = self.get_auth(Method='GET', Bucket=Bucket, Key=Key, Expired=300)
- url = urllib.quote(url.encode('utf8'), ':/') + '?sign=' + urllib.quote(sign)
+ url = url + '?sign=' + urllib.quote(sign)
return url
def delete_object(self, Bucket, Key, **kwargs):
- """单文件删除接口"""
+ """单文件删除接口
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key)
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
logger.info("delete object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='DELETE',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
return None
def head_object(self, Bucket, Key, **kwargs):
- """获取文件信息"""
+ """获取文件信息
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 文件的metadata信息.
+ """
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key)
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
logger.info("head object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='HEAD',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
return rt.headers
def gen_copy_source_url(self, CopySource):
"""拼接拷贝源url"""
+ if 'Appid' in CopySource.keys():
+ appid = CopySource['Appid']
+ else:
+ raise CosClientError('CopySource Need Parameter Appid')
if 'Bucket' in CopySource.keys():
bucket = CopySource['Bucket']
else:
raise CosClientError('CopySource Need Parameter Bucket')
+ if 'Region' in CopySource.keys():
+ region = CopySource['Region']
+ region = format_region(region)
+ else:
+ raise CosClientError('CopySource Need Parameter Region')
if 'Key' in CopySource.keys():
- key = CopySource['Key']
+ path = CopySource['Key']
+ if path and path[0] == '/':
+ path = path[1:]
else:
raise CosClientError('CopySource Need Parameter Key')
- url = self._conf.uri(bucket=bucket, path=key).encode('utf8')
- url = url[7:] # copysource不支持http://开头,去除
+ url = "{bucket}-{uid}.{region}.myqcloud.com/{path}".format(
+ bucket=bucket,
+ uid=appid,
+ region=region,
+ path=path
+ )
return url
def copy_object(self, Bucket, Key, CopySource, CopyStatus='Copy', **kwargs):
- """文件拷贝,文件信息修改"""
+ """文件拷贝,文件信息修改
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): 上传COS路径.
+ :param CopySource(dict): 拷贝源,包含Appid,Bucket,Region,Key.
+ :param CopyStatus(string): 拷贝状态,可选值'Copy'|'Replaced'.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 拷贝成功的结果.
+ """
headers = mapped(kwargs)
+ if 'Metadata' in headers.keys():
+ for i in headers['Metadata'].keys():
+ headers[i] = headers['Metadata'][i]
+ headers.pop('Metadata')
headers['x-cos-copy-source'] = self.gen_copy_source_url(CopySource)
+ if CopyStatus != 'Copy' and CopyStatus != 'Replaced':
+ raise CosClientError('CopyStatus must be Copy or Replaced')
headers['x-cos-metadata-directive'] = CopyStatus
- url = self._conf.uri(bucket=Bucket, path=Key)
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
logger.info("copy object, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='PUT',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
data = xml_to_dict(rt.text)
return data
def create_multipart_upload(self, Bucket, Key, **kwargs):
- """创建分片上传,适用于大文件上传"""
+ """创建分片上传,适用于大文件上传
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 初始化分块上传返回的结果,包含UploadId等信息.
+ """
headers = mapped(kwargs)
if 'Metadata' in headers.keys():
for i in headers['Metadata'].keys():
headers[i] = headers['Metadata'][i]
headers.pop('Metadata')
- url = self._conf.uri(bucket=Bucket, path=Key+"?uploads")
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?uploads")
logger.info("create multipart upload, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='POST',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
data = xml_to_dict(rt.text)
return data
def upload_part(self, Bucket, Key, Body, PartNumber, UploadId, **kwargs):
- """上传分片,单个大小不得超过5GB"""
+ """上传分片,单个大小不得超过5GB
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param Body(file|string): 上传分块的内容,可以为文件流或者字节流.
+ :param PartNumber(int): 上传分块的编号.
+ :param UploadId(string): 分块上传创建的UploadId.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 上传成功返回的结果,包含单个分块ETag等信息.
+ """
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key+"?partNumber={PartNumber}&uploadId={UploadId}".format(
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?partNumber={PartNumber}&uploadId={UploadId}".format(
PartNumber=PartNumber,
UploadId=UploadId))
logger.info("put object, url=:{url} ,headers=:{headers}".format(
@@ -382,23 +511,31 @@ def upload_part(self, Bucket, Key, Body, PartNumber, UploadId, **kwargs):
method='PUT',
url=url,
headers=headers,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
data=Body)
response = dict()
response['ETag'] = rt.headers['ETag']
return response
def complete_multipart_upload(self, Bucket, Key, UploadId, MultipartUpload={}, **kwargs):
- """完成分片上传,组装后的文件不得小于1MB,否则会返回错误"""
+ """完成分片上传,除最后一块分块块大小必须大于等于1MB,否则会返回错误.
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param UploadId(string): 分块上传创建的UploadId.
+ :param MultipartUpload(dict): 所有分块的信息,包含Etag和PartNumber.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 上传成功返回的结果,包含整个文件的ETag等信息.
+ """
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key+"?uploadId={UploadId}".format(UploadId=UploadId))
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?uploadId={UploadId}".format(UploadId=UploadId))
logger.info("complete multipart upload, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='POST',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
data=dict_to_xml(MultipartUpload),
timeout=1200, # 分片上传大文件的时间比较长,设置为20min
headers=headers)
@@ -406,36 +543,56 @@ def complete_multipart_upload(self, Bucket, Key, UploadId, MultipartUpload={}, *
return data
def abort_multipart_upload(self, Bucket, Key, UploadId, **kwargs):
- """放弃一个已经存在的分片上传任务,删除所有已经存在的分片"""
+ """放弃一个已经存在的分片上传任务,删除所有已经存在的分片.
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param UploadId(string): 分块上传创建的UploadId.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key+"?uploadId={UploadId}".format(UploadId=UploadId))
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?uploadId={UploadId}".format(UploadId=UploadId))
logger.info("abort multipart upload, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='DELETE',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
return None
- def list_parts(self, Bucket, Key, UploadId, EncodingType='url', MaxParts=1000, PartNumberMarker=0, **kwargs):
- """列出已上传的分片"""
+ def list_parts(self, Bucket, Key, UploadId, EncodingType='', MaxParts=1000, PartNumberMarker=0, **kwargs):
+ """列出已上传的分片.
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param UploadId(string): 分块上传创建的UploadId.
+ :param EncodingType(string): 设置返回结果编码方式,只能设置为url.
+ :param MaxParts(int): 设置单次返回最大的分块数量,最大为1000.
+ :param PartNumberMarker(int): 设置返回的开始处,从PartNumberMarker下一个分块开始列出.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 分块的相关信息,包括Etag和PartNumber等信息.
+ """
headers = mapped(kwargs)
params = {
'uploadId': UploadId,
'part-number-marker': PartNumberMarker,
- 'max-parts': MaxParts,
- 'encoding-type': EncodingType}
+ 'max-parts': MaxParts}
+ if EncodingType:
+ if EncodingType != 'url':
+ raise CosClientError('EncodingType must be url')
+ params['encoding-type'] = EncodingType
- url = self._conf.uri(bucket=Bucket, path=Key)
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
logger.info("list multipart upload, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='GET',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers,
params=params)
data = xml_to_dict(rt.text)
@@ -445,33 +602,53 @@ def list_parts(self, Bucket, Key, UploadId, EncodingType='url', MaxParts=1000, P
data['Part'] = lst
return data
- def put_object_acl(self, Bucket, Key, **kwargs):
- """设置object ACL"""
+ def put_object_acl(self, Bucket, Key, AccessControlPolicy={}, **kwargs):
+ """设置object ACL
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param AccessControlPolicy(dict): 设置object ACL规则.
+ :param kwargs(dict): 通过headers来设置ACL.
+ :return: None.
+ """
+ lst = [ # 类型为list的标签
+ '',
+ '']
+ xml_config = ""
+ if AccessControlPolicy:
+ xml_config = format_xml(data=AccessControlPolicy, root='AccessControlPolicy', lst=lst)
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key+"?acl")
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?acl")
logger.info("put object acl, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='PUT',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ data=xml_config,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
return None
def get_object_acl(self, Bucket, Key, **kwargs):
- """获取object ACL"""
+ """获取object ACL
+
+ :param Bucket(string): 存储桶名称.
+ :param Key(string): COS路径.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): Object对应的ACL信息.
+ """
headers = mapped(kwargs)
- url = self._conf.uri(bucket=Bucket, path=Key+"?acl")
+ url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~')+"?acl")
logger.info("get object acl, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
rt = self.send_request(
method='GET',
url=url,
- auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key, Key),
headers=headers)
- data = xml_to_dict(rt.text)
+ data = xml_to_dict(rt.text, "type", "Type")
if data['AccessControlList'] is not None and isinstance(data['AccessControlList']['Grant'], dict):
lst = []
lst.append(data['AccessControlList']['Grant'])
@@ -480,7 +657,12 @@ def get_object_acl(self, Bucket, Key, **kwargs):
# s3 bucket interface begin
def create_bucket(self, Bucket, **kwargs):
- """创建一个bucket"""
+ """创建一个bucket
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket)
logger.info("create bucket, url=:{url} ,headers=:{headers}".format(
@@ -494,7 +676,12 @@ def create_bucket(self, Bucket, **kwargs):
return None
def delete_bucket(self, Bucket, **kwargs):
- """删除一个bucket,bucket必须为空"""
+ """删除一个bucket,bucket必须为空
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket)
logger.info("delete bucket, url=:{url} ,headers=:{headers}".format(
@@ -507,8 +694,18 @@ def delete_bucket(self, Bucket, **kwargs):
headers=headers)
return None
- def list_objects(self, Bucket, Delimiter="", Marker="", MaxKeys=1000, Prefix="", EncodingType="url", **kwargs):
- """获取文件列表"""
+ def list_objects(self, Bucket, Delimiter="", Marker="", MaxKeys=1000, Prefix="", EncodingType="", **kwargs):
+ """获取文件列表
+
+ :param Bucket(string): 存储桶名称.
+ :param Delimiter(string): 分隔符.
+ :param Marker(string): 从marker开始列出条目.
+ :param MaxKeys(int): 设置单次返回最大的数量,最大为1000.
+ :param Prefix(string): 设置匹配文件的前缀.
+ :param EncodingType(string): 设置返回结果编码方式,只能设置为url.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 文件的相关信息,包括Etag等信息.
+ """
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket)
logger.info("list objects, url=:{url} ,headers=:{headers}".format(
@@ -518,8 +715,12 @@ def list_objects(self, Bucket, Delimiter="", Marker="", MaxKeys=1000, Prefix="",
'delimiter': Delimiter,
'marker': Marker,
'max-keys': MaxKeys,
- 'prefix': Prefix,
- 'encoding-type': EncodingType}
+ 'prefix': Prefix
+ }
+ if EncodingType:
+ if EncodingType != 'url':
+ raise CosClientError('EncodingType must be url')
+ params['encoding-type'] = EncodingType
rt = self.send_request(
method='GET',
url=url,
@@ -535,7 +736,12 @@ def list_objects(self, Bucket, Delimiter="", Marker="", MaxKeys=1000, Prefix="",
return data
def head_bucket(self, Bucket, **kwargs):
- """获取bucket信息"""
+ """确认bucket是否存在
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket)
logger.info("head bucket, url=:{url} ,headers=:{headers}".format(
@@ -548,8 +754,20 @@ def head_bucket(self, Bucket, **kwargs):
headers=headers)
return None
- def put_bucket_acl(self, Bucket, **kwargs):
- """设置bucket ACL"""
+ def put_bucket_acl(self, Bucket, AccessControlPolicy={}, **kwargs):
+ """设置bucket ACL
+
+ :param Bucket(string): 存储桶名称.
+ :param AccessControlPolicy(dict): 设置bucket ACL规则.
+ :param kwargs(dict): 通过headers来设置ACL.
+ :return: None.
+ """
+ lst = [ # 类型为list的标签
+ '',
+ '']
+ xml_config = ""
+ if AccessControlPolicy:
+ xml_config = format_xml(data=AccessControlPolicy, root='AccessControlPolicy', lst=lst)
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path="?acl")
logger.info("put bucket acl, url=:{url} ,headers=:{headers}".format(
@@ -558,12 +776,18 @@ def put_bucket_acl(self, Bucket, **kwargs):
rt = self.send_request(
method='PUT',
url=url,
+ data=xml_config,
auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
headers=headers)
return None
def get_bucket_acl(self, Bucket, **kwargs):
- """获取bucket ACL"""
+ """获取bucket ACL
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置headers.
+ :return(dict): Bucket对应的ACL信息.
+ """
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path="?acl")
logger.info("get bucket acl, url=:{url} ,headers=:{headers}".format(
@@ -574,16 +798,238 @@ def get_bucket_acl(self, Bucket, **kwargs):
url=url,
auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
headers=headers)
- data = xml_to_dict(rt.text)
- if data['AccessControlList'] is not None and isinstance(data['AccessControlList']['Grant'], dict):
+ data = xml_to_dict(rt.text, "type", "Type")
+ if data['AccessControlList'] is not None and not isinstance(data['AccessControlList']['Grant'], list):
lst = []
lst.append(data['AccessControlList']['Grant'])
data['AccessControlList']['Grant'] = lst
return data
+ def put_bucket_cors(self, Bucket, CORSConfiguration={}, **kwargs):
+ """设置bucket CORS
+
+ :param Bucket(string): 存储桶名称.
+ :param CORSConfiguration(dict): 设置Bucket跨域规则.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
+ lst = [ # 类型为list的标签
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '']
+ xml_config = format_xml(data=CORSConfiguration, root='CORSConfiguration', lst=lst)
+ headers = mapped(kwargs)
+ headers['Content-MD5'] = get_md5(xml_config)
+ headers['Content-Type'] = 'application/xml'
+ url = self._conf.uri(bucket=Bucket, path="?cors")
+ logger.info("put bucket cors, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='PUT',
+ url=url,
+ data=xml_config,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ return None
+
+ def get_bucket_cors(self, Bucket, **kwargs):
+ """获取bucket CORS
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 获取Bucket对应的跨域配置.
+ """
+ headers = mapped(kwargs)
+ url = self._conf.uri(bucket=Bucket, path="?cors")
+ logger.info("get bucket cors, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='GET',
+ url=url,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ data = xml_to_dict(rt.text)
+ if 'CORSRule' in data.keys() and not isinstance(data['CORSRule'], list):
+ lst = []
+ lst.append(data['CORSRule'])
+ data['CORSRule'] = lst
+ if 'CORSRule' in data.keys():
+ allow_lst = ['AllowedOrigin', 'AllowedMethod', 'AllowedHeader', 'ExposeHeader']
+ for rule in data['CORSRule']:
+ for text in allow_lst:
+ if text in rule.keys() and not isinstance(rule[text], list):
+ lst = []
+ lst.append(rule[text])
+ rule[text] = lst
+ return data
+
+ def delete_bucket_cors(self, Bucket, **kwargs):
+ """删除bucket CORS
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
+ headers = mapped(kwargs)
+ url = self._conf.uri(bucket=Bucket, path="?cors")
+ logger.info("delete bucket cors, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='DELETE',
+ url=url,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ return None
+
+ def put_bucket_lifecycle(self, Bucket, LifecycleConfiguration={}, **kwargs):
+ """设置bucket LifeCycle
+ :param Bucket(string): 存储桶名称.
+ :param LifecycleConfiguration(dict): 设置Bucket的生命周期规则.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
+ lst = ['', '', '', ''] # 类型为list的标签
+ xml_config = format_xml(data=LifecycleConfiguration, root='LifecycleConfiguration', lst=lst)
+ headers = mapped(kwargs)
+ headers['Content-MD5'] = get_md5(xml_config)
+ headers['Content-Type'] = 'application/xml'
+ url = self._conf.uri(bucket=Bucket, path="?lifecycle")
+ logger.info("put bucket lifecycle, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='PUT',
+ url=url,
+ data=xml_config,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ return None
+
+ def get_bucket_lifecycle(self, Bucket, **kwargs):
+ """获取bucket LifeCycle
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): Bucket对应的生命周期配置.
+ """
+ headers = mapped(kwargs)
+ url = self._conf.uri(bucket=Bucket, path="?lifecycle")
+ logger.info("get bucket cors, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='GET',
+ url=url,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ data = xml_to_dict(rt.text)
+ if 'Rule' in data.keys() and not isinstance(data['Rule'], list):
+ lst = []
+ lst.append(data['Rule'])
+ data['Rule'] = lst
+ return data
+
+ def delete_bucket_lifecycle(self, Bucket, **kwargs):
+ """删除bucket LifeCycle
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
+ headers = mapped(kwargs)
+ url = self._conf.uri(bucket=Bucket, path="?lifecycle")
+ logger.info("delete bucket cors, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='DELETE',
+ url=url,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ return None
+
+ def put_bucket_versioning(self, Bucket, Status, **kwargs):
+ """设置bucket版本控制
+ :param Bucket(string): 存储桶名称.
+ :param Status(string): 设置Bucket版本控制的状态,可选值为'Enabled'|'Suspended'.
+ :param kwargs(dict): 设置请求headers.
+ :return: None.
+ """
+ headers = mapped(kwargs)
+ url = self._conf.uri(bucket=Bucket, path="?versioning")
+ logger.info("put bucket versioning, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ if Status != 'Enabled' and Status != 'Suspended':
+ raise CosClientError('versioning status must be set to Enabled or Suspended!')
+ config = dict()
+ config['Status'] = Status
+ xml_config = format_xml(data=config, root='VersioningConfiguration')
+ rt = self.send_request(
+ method='PUT',
+ url=url,
+ data=xml_config,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ return None
+
+ def get_bucket_versioning(self, Bucket, **kwargs):
+ """查询bucket版本控制
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 获取Bucket版本控制的配置.
+ """
+ headers = mapped(kwargs)
+ url = self._conf.uri(bucket=Bucket, path="?versioning")
+ logger.info("get bucket versioning, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='GET',
+ url=url,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ data = xml_to_dict(rt.text)
+ return data
+
+ def get_bucket_location(self, Bucket, **kwargs):
+ """查询bucket所属地域
+
+ :param Bucket(string): 存储桶名称.
+ :param kwargs(dict): 设置请求headers.
+ :return(dict): 存储桶的地域信息.
+ """
+ headers = mapped(kwargs)
+ url = self._conf.uri(bucket=Bucket, path="?location")
+ logger.info("get bucket location, url=:{url} ,headers=:{headers}".format(
+ url=url,
+ headers=headers))
+ rt = self.send_request(
+ method='GET',
+ url=url,
+ auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
+ headers=headers)
+ root = xml.etree.ElementTree.fromstring(rt.text)
+ data = dict()
+ data['LocationConstraint'] = root.text
+ return data
+
# service interface begin
def list_buckets(self, **kwargs):
- """列出所有bucket"""
+ """列出所有bucket
+
+ :return(dict): 账号下bucket相关信息.
+ """
headers = mapped(kwargs)
url = 'http://service.cos.myqcloud.com/'
rt = self.send_request(
@@ -593,7 +1039,7 @@ def list_buckets(self, **kwargs):
auth=CosS3Auth(self._conf._access_id, self._conf._access_key),
)
data = xml_to_dict(rt.text)
- if data['Buckets'] is not None and isinstance(data['Buckets']['Bucket'], dict):
+ if data['Buckets'] is not None and not isinstance(data['Buckets']['Bucket'], list):
lst = []
lst.append(data['Buckets']['Bucket'])
data['Buckets']['Bucket'] = lst
diff --git a/qcloud_cos/demo.py b/qcloud_cos/demo.py
new file mode 100644
index 00000000..7720cf3f
--- /dev/null
+++ b/qcloud_cos/demo.py
@@ -0,0 +1,77 @@
+# -*- coding=utf-8
+from qcloud_cos import CosConfig
+from qcloud_cos import CosS3Client
+from qcloud_cos import CosServiceError
+from qcloud_cos import CosClientError
+
+# 腾讯云COSV5Python SDK, 目前可以支持Python2.6与Python2.7
+
+# pip安装指南:pip install -U cos-python-sdk-v5
+
+# cos最新可用地域,参照https://www.qcloud.com/document/product/436/6224
+
+# 设置用户属性, 包括appid, secret_id, secret_key, region
+appid = '1242338703' # 替换为用户的appid
+secret_id = 'AKID15IsskiBQACGbAo6WhgcQbVls7HmuG00' # 替换为用户的secret_id
+secret_key = 'csivKvxxrMvSvQpMWHuIz12pThQQlWRW' # 替换为用户的secret_key
+region = 'ap-beijing-1' # 替换为用户的region
+token = '' # 使用临时秘钥需要传入Token,默认为空,可不填
+config = CosConfig(Appid=appid, Region=region, Access_id=secret_id, Access_key=secret_key, Token=token) # 获取配置对象
+client = CosS3Client(config)
+
+# 文件流 简单上传
+fp = open('test.txt', 'rb')
+file_name = 'test.txt'
+response = client.put_object(
+ Bucket='test04',
+ Body=fp,
+ Key=file_name,
+ StorageClass='STANDARD',
+ CacheControl='no-cache',
+ ContentDisposition='download.txt'
+)
+fp.close()
+print response['ETag']
+
+# 字节流 简单上传
+response = client.put_object(
+ Bucket='test04',
+ Body='abcdefg',
+ Key=file_name,
+ CacheControl='no-cache',
+ ContentDisposition='download.txt'
+)
+print response['ETag']
+
+# 文件下载 获取文件到本地
+response = client.get_object(
+ Bucket='test04',
+ Key=file_name,
+)
+response['Body'].get_stream_to_file('output.txt')
+
+# 文件下载 获取文件流
+response = client.get_object(
+ Bucket='test04',
+ Key=file_name,
+)
+fp = response['Body'].get_raw_stream()
+print fp.read(2)
+
+# 文件下载 捕获异常
+try:
+ response = client.get_object(
+ Bucket='test04',
+ Key='not_exist.txt',
+ )
+ fp = response['Body'].get_raw_stream()
+ print fp.read(2)
+except CosServiceError as e:
+ print e.get_origin_msg()
+ print e.get_digest_msg()
+ print e.get_status_code()
+ print e.get_error_code()
+ print e.get_error_msg()
+ print e.get_resource_location()
+ print e.get_trace_id()
+ print e.get_request_id()
diff --git a/qcloud_cos/test.py b/qcloud_cos/test.py
index 5382c9d7..ade34355 100644
--- a/qcloud_cos/test.py
+++ b/qcloud_cos/test.py
@@ -50,6 +50,84 @@ def Test():
file_id = str(random.randint(0, 1000)) + str(random.randint(0, 1000))
file_name = "tmp" + file_id + "_" + str(file_size) + "MB"
+ print "test put bucket cors " + test_bucket
+ cors_config = {
+ 'CORSRule': [
+ {
+ 'ID': '1234',
+ 'AllowedOrigin': ['http://www.qq.com'],
+ 'AllowedMethod': ['GET', 'PUT'],
+ 'AllowedHeader': ['x-cos-meta-test'],
+ 'ExposeHeader': ['x-cos-meta-test1'],
+ 'MaxAgeSeconds': 500
+ }]
+ }
+ response = client.put_bucket_cors(
+ Bucket=test_bucket,
+ CORSConfiguration=cors_config
+ )
+
+ print "test get bucket cors " + test_bucket
+ response = client.get_bucket_cors(
+ Bucket=test_bucket
+ )
+ print response
+
+ print "test delete bucket cors " + test_bucket
+ response = client.delete_bucket_cors(
+ Bucket=test_bucket
+ )
+
+ print "test put bucket lifecycle " + test_bucket
+ life_config = {
+ 'Rule': [
+ {
+ 'Expiration': {'Days': 100},
+ 'ID': '123',
+ 'Filter': {'Prefix': '456'},
+ 'Status': 'Enabled',
+ }
+ ]
+ }
+ response = client.put_bucket_lifecycle(
+ Bucket=test_bucket,
+ LifecycleConfiguration=life_config
+ )
+
+ print "test get bucket lifecycle " + test_bucket
+ response = client.get_bucket_lifecycle(
+ Bucket=test_bucket
+ )
+ print response
+
+ print "test delete bucket lifecycle " + test_bucket
+ response = client.delete_bucket_lifecycle(
+ Bucket=test_bucket
+ )
+
+ print "test put bucket versioning " + test_bucket
+ response = client.put_bucket_versioning(
+ Bucket=test_bucket,
+ Status='Enabled'
+ )
+
+ print "test get bucket versioning " + test_bucket
+ response = client.get_bucket_versioning(
+ Bucket=test_bucket
+ )
+ print response
+
+ print "test get bucket location " + test_bucket
+ response = client.get_bucket_location(
+ Bucket=test_bucket
+ )
+ print response
+
+ print "test head bucket " + test_bucket
+ response = client.head_bucket(
+ Bucket=test_bucket
+ )
+
print "Test Get Presigned Download URL "
url = client.get_presigned_download_url(
Bucket=test_bucket,
@@ -60,9 +138,8 @@ def Test():
print "Test List Buckets"
response = client.list_buckets()
- copy_source = {'Bucket': 'test01', 'Key': '/test.txt'}
- print "Test Copy Object From Other Bucket "
-
+ copy_source = {'Appid': '1252448703', 'Bucket': 'test01', 'Key': '/test.txt', 'Region': 'ap-beijing-1'}
+ print "Test Copy Object From Other Object"
response = client.copy_object(
Bucket='test04',
Key='test.txt',
diff --git a/requirements.txt b/requirements.txt
index bc29d3d8..1cbfa8ca 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-requests==2.12.4
-argparse==1.4.0
+requests
+dicttoxml
diff --git a/setup.py b/setup.py
index 042d9e74..38bb8dbf 100644
--- a/setup.py
+++ b/setup.py
@@ -6,11 +6,6 @@ def requirements():
with open('requirements.txt', 'r') as fileobj:
requirements = [line.strip() for line in fileobj]
-
- version = python_version_tuple()
-
- if version[0] == 2 and version[1] == 6:
- requirements.append("argparse==1.4.0")
return requirements