Skip to content

Commit

Permalink
drop parser
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikolay Kim committed Feb 14, 2017
1 parent ee234db commit 6d3866e
Show file tree
Hide file tree
Showing 28 changed files with 666 additions and 2,337 deletions.
2 changes: 0 additions & 2 deletions aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from .client_reqrep import * # noqa
from .errors import * # noqa
from .helpers import * # noqa
from .parsers import * # noqa
from .streams import * # noqa
from .multipart import * # noqa
from .client_ws import ClientWebSocketResponse # noqa
Expand All @@ -30,7 +29,6 @@
client_reqrep.__all__ + # noqa
errors.__all__ + # noqa
helpers.__all__ + # noqa
parsers.__all__ + # noqa
protocol.__all__ + # noqa
connector.__all__ + # noqa
streams.__all__ + # noqa
Expand Down
187 changes: 15 additions & 172 deletions aiohttp/_ws_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from aiohttp import errors, hdrs
from aiohttp.log import ws_logger

__all__ = ('WebSocketParser', 'WebSocketWriter', 'do_handshake',
__all__ = ('WebSocketReader', 'WebSocketWriter', 'do_handshake',
'WSMessage', 'WebSocketError', 'WSMsgType', 'WSCloseCode')


Expand Down Expand Up @@ -102,107 +102,6 @@ def __init__(self, code, message):
super().__init__(message)


def WebSocketParser(out, buf):
while True:
fin, opcode, payload = yield from parse_frame(buf)

if opcode == WSMsgType.CLOSE:
if len(payload) >= 2:
close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'Invalid close code: {}'.format(close_code))
try:
close_message = payload[2:].decode('utf-8')
except UnicodeDecodeError as exc:
raise WebSocketError(
WSCloseCode.INVALID_TEXT,
'Invalid UTF-8 text message') from exc
msg = WSMessage(WSMsgType.CLOSE, close_code, close_message)
elif payload:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'Invalid close frame: {} {} {!r}'.format(
fin, opcode, payload))
else:
msg = WSMessage(WSMsgType.CLOSE, 0, '')

out.feed_data(msg, 0)

elif opcode == WSMsgType.PING:
out.feed_data(WSMessage(WSMsgType.PING, payload, ''), len(payload))

elif opcode == WSMsgType.PONG:
out.feed_data(WSMessage(WSMsgType.PONG, payload, ''), len(payload))

elif opcode not in (WSMsgType.TEXT, WSMsgType.BINARY):
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
"Unexpected opcode={!r}".format(opcode))
else:
# load text/binary
data = [payload]

while not fin:
fin, _opcode, payload = yield from parse_frame(buf, True)

# We can receive ping/close in the middle of
# text message, Case 5.*
if _opcode == WSMsgType.PING:
out.feed_data(
WSMessage(WSMsgType.PING, payload, ''), len(payload))
fin, _opcode, payload = yield from parse_frame(buf, True)
elif _opcode == WSMsgType.CLOSE:
if len(payload) >= 2:
close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
if (close_code not in ALLOWED_CLOSE_CODES and
close_code < 3000):
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'Invalid close code: {}'.format(close_code))
try:
close_message = payload[2:].decode('utf-8')
except UnicodeDecodeError as exc:
raise WebSocketError(
WSCloseCode.INVALID_TEXT,
'Invalid UTF-8 text message') from exc
msg = WSMessage(WSMsgType.CLOSE, close_code,
close_message)
elif payload:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'Invalid close frame: {} {} {!r}'.format(
fin, opcode, payload))
else:
msg = WSMessage(WSMsgType.CLOSE, 0, '')

out.feed_data(msg, 0)
fin, _opcode, payload = yield from parse_frame(buf, True)

if _opcode != WSMsgType.CONTINUATION:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'The opcode in non-fin frame is expected '
'to be zero, got {!r}'.format(_opcode))
else:
data.append(payload)

if opcode == WSMsgType.TEXT:
try:
text = b''.join(data).decode('utf-8')
out.feed_data(WSMessage(WSMsgType.TEXT, text, ''),
len(text))
except UnicodeDecodeError as exc:
raise WebSocketError(
WSCloseCode.INVALID_TEXT,
'Invalid UTF-8 text message') from exc
else:
data = b''.join(data)
out.feed_data(
WSMessage(WSMsgType.BINARY, data, ''), len(data))


native_byteorder = sys.byteorder


Expand Down Expand Up @@ -240,70 +139,6 @@ def _websocket_mask_python(mask, data):
_websocket_mask = _websocket_mask_python


def parse_frame(buf, continuation=False):
"""Return the next frame from the socket."""
# read header
data = yield from buf.read(2)
first_byte, second_byte = data

fin = (first_byte >> 7) & 1
rsv1 = (first_byte >> 6) & 1
rsv2 = (first_byte >> 5) & 1
rsv3 = (first_byte >> 4) & 1
opcode = first_byte & 0xf

# frame-fin = %x0 ; more frames of this message follow
# / %x1 ; final frame of this message
# frame-rsv1 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
# frame-rsv2 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
# frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
if rsv1 or rsv2 or rsv3:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'Received frame with non-zero reserved bits')

if opcode > 0x7 and fin == 0:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'Received fragmented control frame')

if fin == 0 and opcode == WSMsgType.CONTINUATION and not continuation:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
'Received new fragment frame with non-zero '
'opcode {!r}'.format(opcode))

has_mask = (second_byte >> 7) & 1
length = (second_byte) & 0x7f

# Control frames MUST have a payload length of 125 bytes or less
if opcode > 0x7 and length > 125:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
"Control frame payload cannot be larger than 125 bytes")

# read payload
if length == 126:
data = yield from buf.read(2)
length = UNPACK_LEN2(data)[0]
elif length > 126:
data = yield from buf.read(8)
length = UNPACK_LEN3(data)[0]

if has_mask:
mask = yield from buf.read(4)

if length:
payload = yield from buf.read(length)
else:
payload = bytearray()

if has_mask:
payload = _websocket_mask(bytes(mask), payload)

return fin, opcode, payload


class WSParserState(IntEnum):
READ_HEADER = 1
READ_PAYLOAD_LENGTH = 2
Expand All @@ -320,6 +155,7 @@ def __init__(self, queue):
self._partial = []
self._state = WSParserState.READ_HEADER

self._opcode = None
self._frame_fin = False
self._frame_opcode = None
self._frame_payload = bytearray()
Expand All @@ -346,7 +182,6 @@ def feed_data(self, data):

def _feed_data(self, data):
for fin, opcode, payload in self.parse_frame(data):

if opcode == WSMsgType.CLOSE:
if len(payload) >= 2:
close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
Expand Down Expand Up @@ -380,7 +215,8 @@ def _feed_data(self, data):
self.queue.feed_data(
WSMessage(WSMsgType.PONG, payload, ''), len(payload))

elif opcode not in (WSMsgType.TEXT, WSMsgType.BINARY):
elif opcode not in (
WSMsgType.TEXT, WSMsgType.BINARY) and not self._opcode:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
"Unexpected opcode={!r}".format(opcode))
Expand All @@ -389,6 +225,8 @@ def _feed_data(self, data):

if not fin:
# got partial frame payload
if opcode != WSMsgType.CONTINUATION:
self._opcode = opcode
self._partial.append(payload)
else:
# previous frame was non finished
Expand All @@ -400,6 +238,9 @@ def _feed_data(self, data):
'The opcode in non-fin frame is expected '
'to be zero, got {!r}'.format(opcode))

if opcode == WSMsgType.CONTINUATION:
opcode = self._opcode

self._partial.append(payload)

if opcode == WSMsgType.TEXT:
Expand All @@ -416,6 +257,7 @@ def _feed_data(self, data):
self.queue.feed_data(
WSMessage(WSMsgType.BINARY, data, ''), len(data))

self._start_opcode = None
self._partial.clear()

return False, b''
Expand Down Expand Up @@ -565,9 +407,10 @@ def parse_frame(self, buf, continuation=False):

class WebSocketWriter:

def __init__(self, writer, *,
def __init__(self, stream, *,
use_mask=False, limit=DEFAULT_LIMIT, random=random.Random()):
self.writer = writer
self.stream = stream
self.writer = stream.transport
self.use_mask = use_mask
self.randrange = random.randrange
self._closing = False
Expand Down Expand Up @@ -610,7 +453,7 @@ def _send_frame(self, message, opcode):

if self._output_size > self._limit:
self._output_size = 0
return self.writer.drain()
return self.stream.drain()

return ()

Expand Down Expand Up @@ -720,6 +563,6 @@ def do_handshake(method, headers, transport,
# response code, headers, parser, writer, protocol
return (101,
response_headers,
WebSocketParser,
None,
WebSocketWriter(transport, limit=write_buffer_size),
protocol)
21 changes: 12 additions & 9 deletions aiohttp/client_proto.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import asyncio
import asyncio.streams
import socket

from . import errors, hdrs, streams
from . import errors, hdrs
from .errors import ServerDisconnectedError
from .streams import DataQueue, FlowControlStreamReader, EmptyStreamReader
from .parsers import StreamParser, StreamWriter
from .protocol import HttpResponseParser, HttpPayloadParser
from .protocol import HttpPayloadParser, HttpResponseParser
from .streams import (DataQueue, EmptyStreamReader, FlowControlStreamReader,
StreamWriter)

EMPTY_PAYLOAD = EmptyStreamReader()

Expand Down Expand Up @@ -47,7 +46,7 @@ def is_connected(self):

def connection_made(self, transport):
self.transport = transport
self.writer = StreamWriter(transport, self, None, self._loop)
self.writer = StreamWriter(self, transport, self._loop)

def connection_lost(self, exc):
self.transport = self.writer = None
Expand Down Expand Up @@ -149,17 +148,21 @@ def data_received(self, data,

# calculate payload
empty_payload = True
if (((length is not None and length > 0) or msg.chunked) and
if (((length is not None and length > 0) or
msg.chunked) and
(not self._skip_payload and
msg.code not in self._skip_status_codes)):

if not msg.upgrade:
payload = FlowControlStreamReader(
self, timer=self._timer, loop=self._loop)
payload_parser = HttpPayloadParser(
msg, readall=self._read_until_eof)
payload, length=length,
chunked=msg.chunked, code=msg.code,
compression=msg.compression,
readall=self._read_until_eof)

if payload_parser.start(length, payload):
if not payload_parser.done:
empty_payload = False
self._payload = payload
self._payload_parser = payload_parser
Expand Down
2 changes: 0 additions & 2 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,6 @@ def release(self, *, consume=False):
self._closed = True
if self._connection is not None:
self._connection.release()
#if self._reader is not None:
#self._reader.unset_parser()
self._connection = None
self._cleanup_writer()
self._notify_content()
Expand Down
5 changes: 1 addition & 4 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@
from hashlib import md5, sha1, sha256
from types import MappingProxyType

import aiohttp

from . import hdrs, helpers
from .client import ClientRequest
from .client_proto import HttpClientProtocol
from .errors import (ClientOSError, ClientTimeoutError, FingerprintMismatch,
HttpProxyError, ProxyConnectionError,
ServerDisconnectedError)
HttpProxyError, ProxyConnectionError)
from .helpers import SimpleCookie, is_ip_address, sentinel
from .resolver import DefaultResolver

Expand Down
Loading

0 comments on commit 6d3866e

Please sign in to comment.