Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 #112

Merged
merged 8 commits into from
Mar 23, 2020
Merged

S3 #112

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import base64
import os
import sys
import time
import copy
import json
import xml.dom.minidom
Expand All @@ -23,15 +24,15 @@
from .cos_exception import CosClientError
from .cos_exception import CosServiceError
from .version import __version__

from .select_event_stream import EventStream
logger = logging.getLogger(__name__)


class CosConfig(object):
"""config类,保存用户相关信息"""
def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token=None, Scheme=None, Timeout=None,
Access_id=None, Access_key=None, Secret_id=None, Secret_key=None, Endpoint=None, IP=None, Port=None,
Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None):
Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None, PoolConnections=10, PoolMaxSize=10):
"""初始化,保存用户的信息

:param Appid(string): 用户APPID.
Expand All @@ -53,6 +54,8 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token
:param Proxies(dict): 使用代理来访问COS
:param Domain(string): 使用自定义的域名来访问COS
:param ServiceDomain(string): 使用自定义的域名来访问cos service
:param PoolConnections(int): 连接池个数
:param PoolMaxSize(int): 连接池中最大连接数
"""
self._appid = to_unicode(Appid)
self._token = to_unicode(Token)
Expand All @@ -66,6 +69,8 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token
self._proxies = Proxies
self._domain = Domain
self._service_domain = ServiceDomain
self._pool_connections = PoolConnections
self._pool_maxsize = PoolMaxSize

if self._domain is None:
self._endpoint = format_endpoint(Endpoint, Region)
Expand Down Expand Up @@ -175,6 +180,8 @@ def __init__(self, conf, retry=1, session=None):
self._retry = retry # 重试的次数,分片上传时可适当增大
if session is None:
self._session = requests.session()
self._session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=self._conf._pool_connections, pool_maxsize=self._conf._pool_maxsize))
self._session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=self._conf._pool_connections, pool_maxsize=self._conf._pool_maxsize))
else:
self._session = session

Expand Down Expand Up @@ -235,6 +242,8 @@ def send_request(self, method, url, bucket, timeout=30, **kwargs):
kwargs['verify'] = False
for j in range(self._retry + 1):
try:
if j != 0:
time.sleep(j)
if method == 'POST':
res = self._session.post(url, timeout=timeout, proxies=self._conf._proxies, **kwargs)
elif method == 'GET':
Expand Down Expand Up @@ -949,7 +958,7 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs):

:param Bucket(string): 存储桶名称.
:param Key(string): COS路径.
:param RestoreRequest: 取回object的属性设置
:param RestoreRequest(dict): 取回object的属性设置
:param kwargs(dict): 设置请求headers.
:return: None.
"""
Expand All @@ -973,6 +982,46 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs):
params=params)
return None

def select_object_content(self, Bucket, Key, Expression, ExpressionType, InputSerialization, OutputSerialization, RequestProgress=None, **kwargs):
"""从指定文对象中检索内容

:param Bucket(string): 存储桶名称.
:param Key(string): 检索的路径.
:param Expression(string): 查询语句
:param ExpressionType(string): 查询语句的类型
:param RequestProgress(dict): 查询进度设置
:param InputSerialization(dict): 输入格式设置
:param OutputSerialization(dict): 输出格式设置
:param kwargs(dict): 设置请求headers.
:return(dict): 检索内容.
"""
params = {'select': '', 'select-type': 2}
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=Key)
logger.info("select object content, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
SelectRequest = {
'Expression': Expression,
'ExpressionType': ExpressionType,
'InputSerialization': InputSerialization,
'OutputSerialization': OutputSerialization
}
if RequestProgress is not None:
SelectRequest['RequestProgress'] = RequestProgress
xml_config = format_xml(data=SelectRequest, root='SelectRequest')
rt = self.send_request(
method='POST',
url=url,
stream=True,
bucket=Bucket,
data=xml_config,
auth=CosS3Auth(self._conf, Key, params=params),
headers=headers,
params=params)
data = {'Payload': EventStream(rt)}
return data

# s3 bucket interface begin
def create_bucket(self, Bucket, **kwargs):
"""创建一个bucket
Expand Down Expand Up @@ -2688,9 +2737,9 @@ def list_buckets(self, **kwargs):
)
"""
headers = mapped(kwargs)
url = 'https://service.cos.myqcloud.com/'
url = '{scheme}://service.cos.myqcloud.com/'.format(scheme=self._conf._scheme)
if self._conf._service_domain is not None:
url = 'https://{domain}/'.format(domain=self._conf._service_domain)
url = '{scheme}://{domain}/'.format(scheme=self._conf._scheme, domain=self._conf._service_domain)
rt = self.send_request(
method='GET',
url=url,
Expand Down
3 changes: 2 additions & 1 deletion qcloud_cos/cos_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
'SSECustomerKeyMD5': 'x-cos-server-side-encryption-customer-key-MD5',
'SSEKMSKeyId': 'x-cos-server-side-encryption-cos-kms-key-id',
'Referer': 'Referer',
'PicOperations': 'Pic-Operations'
'PicOperations': 'Pic-Operations',
'TrafficLimit': 'x-cos-traffic-limit',
}


Expand Down
10 changes: 8 additions & 2 deletions qcloud_cos/cos_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

class CosException(Exception):
def __init__(self, message):
Exception.__init__(self, message)
self._message = message

def __str__(self):
return str(self._message)


def digest_xml(data):
Expand Down Expand Up @@ -46,14 +49,17 @@ class CosServiceError(CosException):
"""COS Server端错误,可以获取特定的错误信息"""
def __init__(self, method, message, status_code):
CosException.__init__(self, message)
if method == 'HEAD': # 对HEAD进行特殊处理
if isinstance(message, dict):
self._origin_msg = ''
self._digest_msg = message
else:
self._origin_msg = message
self._digest_msg = digest_xml(message)
self._status_code = status_code

def __str__(self):
return str(self._digest_msg)

def get_origin_msg(self):
"""获取原始的XML格式错误信息"""
return self._origin_msg
Expand Down
97 changes: 97 additions & 0 deletions qcloud_cos/select_event_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- coding=utf-8
import os
import uuid
import struct
import logging
from .cos_comm import xml_to_dict
from .cos_comm import to_unicode
from .cos_exception import CosServiceError

logger = logging.getLogger(__name__)


class EventStream():
def __init__(self, rt):
self._rt = rt
self._raw = self._rt.raw
self._finish = False

def __iter__(self):
return self

def __next__(self):
return self.next_event()

next = __next__

def next_event(self):
"""获取下一个事件"""
if self._finish:
"""要把剩下的内容读完丢弃或者自己关连接,否则不会自动关连接"""
self._raw.read()
raise StopIteration
total_byte_length = struct.unpack('>I', bytes(self._raw.read(4)))[0] # message总长度
header_byte_length = struct.unpack('>I', bytes(self._raw.read(4)))[0] # header总长度
prelude_crc = struct.unpack('>I', bytes(self._raw.read(4)))[0]
# 处理headers
offset = 0
msg_headers = {}
while offset < header_byte_length:
header_name_length = struct.unpack('>B', bytes(self._raw.read(1)))[0]
header_name = to_unicode(self._raw.read(header_name_length))
header_value_type = struct.unpack('>B', bytes(self._raw.read(1)))[0]
header_value_length = struct.unpack('>H', bytes(self._raw.read(2)))[0]
header_value = to_unicode(self._raw.read(header_value_length))
msg_headers[header_name] = header_value
offset += 4 + header_name_length + header_value_length
# 处理payload(输出给用户的dict中也为bytes)
payload_byte_length = total_byte_length - header_byte_length - 16 # payload总长度
payload = self._raw.read(payload_byte_length)
message_crc = struct.unpack('>I', bytes(self._raw.read(4)))[0]
if ':message-type' in msg_headers and msg_headers[':message-type'] == 'event':
if ':event-type' in msg_headers and msg_headers[':event-type'] == "Records":
return {'Records': {'Payload': payload}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Stats":
return {'Stats': {'Details': xml_to_dict(payload)}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Progress":
return {'Progress': {'Details': xml_to_dict(payload)}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Cont":
return {'Cont': {}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "End":
self._finish = True
return {'End': {}}
# 处理Error Message(抛出异常)
if ':message-type' in msg_headers and msg_headers[':message-type'] == 'error':
error_info = dict()
error_info['code'] = msg_headers[':error-code']
error_info['message'] = msg_headers[':error-message']
error_info['resource'] = self._rt.request.url
error_info['requestid'] = ''
error_info['traceid'] = ''
if 'x-cos-request-id' in self._rt.headers:
error_info['requestid'] = self._rt.headers['x-cos-request-id']
if 'x-cos-trace-id' in self._rt.headers:
error_info['traceid'] = self._rt.headers['x-cos-trace-id']
logger.error(error_info)
e = CosServiceError('POST', error_info, self._rt.status_code)
raise e

def get_select_result(self):
"""获取查询结果"""
data = b""
for event in self:
if 'Records' in event:
data += event['Records']['Payload']
return data

def get_select_result_to_file(self, file_name):
"""保存查询结果到文件"""
tmp_file_name = "{file_name}_{uuid}".format(file_name=file_name, uuid=uuid.uuid4().hex)
with open(tmp_file_name, 'wb') as fp:
for event in self:
if 'Records' in event:
data = event['Records']['Payload']
fp.write(data)
if os.path.exists(file_name):
os.remove(file_name)
os.rename(tmp_file_name, file_name)
7 changes: 7 additions & 0 deletions qcloud_cos/streambody.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ class StreamBody():
def __init__(self, rt):
self._rt = rt

def __iter__(self):
"""提供一个默认的迭代器"""
return self._rt.iter_content(1024)

def get_raw_stream(self):
"""提供原始流"""
return self._rt.raw

def get_stream(self, chunk_size=1024):
"""提供一个chunk可变的迭代器"""
return self._rt.iter_content(chunk_size=chunk_size)

def get_stream_to_file(self, file_name, auto_decompress=False):
"""保存流到本地文件"""
use_chunked = False
if 'Content-Length' in self._rt.headers:
content_len = int(self._rt.headers['Content-Length'])
Expand Down
2 changes: 1 addition & 1 deletion qcloud_cos/version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

__version__ = '5.1.7.6'
__version__ = '5.1.7.8'
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def long_description():

setup(
name='cos-python-sdk-v5',
version='1.7.7',
version='1.7.8',
url='https://www.qcloud.com/',
license='MIT',
author='tiedu, lewzylu, channingliu',
Expand Down
Loading