Skip to content

Commit

Permalink
Minor optimizations, test ping/pong
Browse files Browse the repository at this point in the history
  • Loading branch information
taras committed Aug 23, 2024
1 parent 64b791b commit e796d30
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 117 deletions.
5 changes: 1 addition & 4 deletions picows/picows.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ cdef class WSTransport:
MemoryBuffer _write_buf
bint _is_client_side

cdef send_reuse_external_buffer(self, WSMsgType msg_type, char* message, size_t message_size, bint fin=*, bint rsv1=*)
cdef send_reuse_external_buffer(self, WSMsgType msg_type, char* msg_ptr, size_t msg_size, bint fin=*, bint rsv1=*)
cpdef send(self, WSMsgType msg_type, message, bint fin=*, bint rsv1=*)
cpdef send_ping(self, message=*)
cpdef send_pong(self, message=*)
Expand All @@ -101,9 +101,6 @@ cdef class WSTransport:
cdef _send_internal_server_error(self, str error)
cdef _mark_disconnected(self)

cdef bytes _prepare_frame_in_external_buffer(self, WSMsgType msg_type, uint8_t* msg_ptr, size_t msg_length, bint fin, bint rsv1)
cdef bytes _prepare_frame(self, WSMsgType msg_type, message, bint fin, bint rsv1)


cdef class WSListener:
cpdef on_ws_connected(self, WSTransport transport)
Expand Down
217 changes: 105 additions & 112 deletions picows/picows.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,47 @@ cdef class WSTransport:
self._is_client_side = is_client_side

cdef send_reuse_external_buffer(self, WSMsgType msg_type,
char* message, size_t message_size,
char* msg_ptr, size_t msg_size,
bint fin=True, bint rsv1=False):
frame = self._prepare_frame_in_external_buffer(msg_type, <uint8_t*>message, message_size, fin, rsv1)
self.underlying_transport.write(frame)
cdef:
uint8_t* header_ptr = <uint8_t*>msg_ptr
uint64_t extended_payload_length_64
uint32_t mask = <uint32_t> rand() if self._is_client_side else 0
uint16_t extended_payload_length_16
uint8_t first_byte = <uint8_t>msg_type
uint8_t second_byte = 0x80 if self._is_client_side else 0
cdef Py_ssize_t total_size = msg_size

if fin:
first_byte |= 0x80

if rsv1:
first_byte |= 0x40

if msg_size < 126:
total_size += 2
header_ptr -= 2
header_ptr[0] = first_byte
header_ptr[1] = second_byte | <uint8_t>msg_size
elif msg_size < (1 << 16):
total_size += 4
header_ptr -= 4
header_ptr[0] = first_byte
header_ptr[1] = second_byte | 126
extended_payload_length_16 = htons(<uint16_t>msg_size)
(<uint16_t*>(header_ptr + 2))[0] = extended_payload_length_16
else:
total_size += 10
header_ptr -= 10
header_ptr[0] = first_byte
header_ptr[1] = second_byte | 127
extended_payload_length_64 = htobe64(<uint64_t>msg_size)
(<uint64_t*> (header_ptr + 2))[0] = extended_payload_length_64

if self._is_client_side:
_mask_payload(<uint8_t*>msg_ptr, msg_size, mask)

self.underlying_transport.write(PyBytes_FromStringAndSize(<char*>header_ptr, total_size))

cpdef send(self, WSMsgType msg_type, message, bint fin=True, bint rsv1=False):
"""
Expand All @@ -343,8 +380,71 @@ cdef class WSTransport:
Some protocol extensions use it to indicate that payload
is compressed.
"""
frame = self._prepare_frame(msg_type, message, fin, rsv1)
self.underlying_transport.write(frame)
cdef:
Py_buffer msg_buffer
char* msg_ptr
Py_ssize_t msg_length

if message is None:
msg_ptr = b""
msg_length = 0
elif PyBytes_CheckExact(message):
msg_ptr = PyBytes_AS_STRING(message)
msg_length = PyBytes_GET_SIZE(message)
elif PyByteArray_CheckExact(message):
msg_ptr = PyByteArray_AS_STRING(message)
msg_length = PyByteArray_GET_SIZE(message)
else:
PyObject_GetBuffer(message, &msg_buffer, PyBUF_SIMPLE)
msg_ptr = <char*>msg_buffer.buf
msg_length = msg_buffer.len
# We can already release because we still keep the reference to the message
PyBuffer_Release(&msg_buffer)

cdef:
uint8_t first_byte = <uint8_t>msg_type
uint8_t second_byte = 0x80 if self._is_client_side else 0
uint32_t mask = <uint32_t>rand() if self._is_client_side else 0
uint16_t extended_payload_length_16
uint64_t extended_payload_length_64
Py_ssize_t payload_start_idx

if fin:
first_byte |= 0x80

if rsv1:
first_byte |= 0x40

self._write_buf.clear()
self._write_buf.push_back(first_byte)

if msg_length < 126:
second_byte |= <uint8_t>msg_length
self._write_buf.push_back(second_byte)
elif msg_length < (1 << 16):
second_byte |= 126
self._write_buf.push_back(second_byte)
extended_payload_length_16 = htons(<uint16_t>msg_length)
self._write_buf.append(<const char*>&extended_payload_length_16, 2)
else:
second_byte |= 127
extended_payload_length_64 = htobe64(<uint64_t>msg_length)
self._write_buf.push_back(second_byte)
self._write_buf.append(<const char*>&extended_payload_length_64, 8)

if self._is_client_side:
self._write_buf.append(<const char*>&mask, 4)
payload_start_idx = self._write_buf.size
self._write_buf.append(msg_ptr, msg_length)
_mask_payload(<uint8_t*>self._write_buf.data + payload_start_idx, msg_length, mask)
else:
self._write_buf.append(msg_ptr, msg_length)

# Unfortunately we have to make a new bytes object here.
# PyMemoryView_FromMemory can't be used because uvloop.Transport.write
# may delay sending and it doesn't copy the content of the buffer

self.underlying_transport.write(PyBytes_FromStringAndSize(self._write_buf.data, self._write_buf.size))

cpdef send_ping(self, message=None):
"""
Expand Down Expand Up @@ -460,113 +560,6 @@ cdef class WSTransport:
if not self._disconnected_future.done():
self._disconnected_future.set_result(None)

cdef bytes _prepare_frame_in_external_buffer(self, WSMsgType msg_type, uint8_t* msg_ptr, size_t msg_length, bint fin, bint rsv1):
cdef:
uint8_t* header_ptr = msg_ptr
uint64_t extended_payload_length_64
uint32_t mask = <uint32_t> rand() if self._is_client_side else 0
uint16_t extended_payload_length_16
uint8_t first_byte = <uint8_t> msg_type
uint8_t second_byte = 0x80 if self._is_client_side else 0

if fin:
first_byte |= 0x80

if rsv1:
first_byte |= 0x40

if msg_length < 126:
header_ptr -= 2
header_ptr[0] = first_byte
header_ptr[1] = second_byte | <uint8_t>msg_length
elif msg_length < (1 << 16):
header_ptr -= 4
header_ptr[0] = first_byte
header_ptr[1] = second_byte | 126
extended_payload_length_16 = htons(<uint16_t> msg_length)
(<uint16_t*>(header_ptr + 2))[0] = extended_payload_length_16
else:
header_ptr -= 10
header_ptr[0] = first_byte
header_ptr[1] = second_byte | 127
extended_payload_length_64 = htobe64(<uint64_t> msg_length)
(<uint64_t*> (header_ptr + 2))[0] = extended_payload_length_64

if self._is_client_side:
_mask_payload(msg_ptr, msg_length, mask)

cdef Py_ssize_t total_length = msg_length + (msg_ptr - header_ptr)

return PyBytes_FromStringAndSize(<char*>header_ptr, total_length)

cdef bytes _prepare_frame(self, WSMsgType msg_type, message, bint fin, bint rsv1):
"""Send a frame over the websocket with message as its payload."""
cdef:
Py_buffer msg_buffer
char* msg_ptr
Py_ssize_t msg_length

if message is None:
msg_ptr = b""
msg_length = 0
elif PyBytes_CheckExact(message):
msg_ptr = PyBytes_AS_STRING(message)
msg_length = PyBytes_GET_SIZE(message)
elif PyByteArray_CheckExact(message):
msg_ptr = PyByteArray_AS_STRING(message)
msg_length = PyByteArray_GET_SIZE(message)
else:
PyObject_GetBuffer(message, &msg_buffer, PyBUF_SIMPLE)
msg_ptr = <char*>msg_buffer.buf
msg_length = msg_buffer.len
# We can already release because we still keep the reference to the message
PyBuffer_Release(&msg_buffer)

cdef:
uint8_t first_byte = <uint8_t>msg_type
uint8_t second_byte = 0x80 if self._is_client_side else 0
uint32_t mask = <uint32_t>rand() if self._is_client_side else 0
uint16_t extended_payload_length_16
uint64_t extended_payload_length_64
Py_ssize_t payload_start_idx

if fin:
first_byte |= 0x80

if rsv1:
first_byte |= 0x40

self._write_buf.clear()
self._write_buf.push_back(first_byte)

if msg_length < 126:
second_byte |= <uint8_t>msg_length
self._write_buf.push_back(second_byte)
elif msg_length < (1 << 16):
second_byte |= 126
self._write_buf.push_back(second_byte)
extended_payload_length_16 = htons(<uint16_t>msg_length)
self._write_buf.append(<const char*>&extended_payload_length_16, 2)
else:
second_byte |= 127
extended_payload_length_64 = htobe64(<uint64_t>msg_length)
self._write_buf.push_back(second_byte)
self._write_buf.append(<const char*>&extended_payload_length_64, 8)

if self._is_client_side:
self._write_buf.append(<const char*>&mask, 4)
payload_start_idx = self._write_buf.size
self._write_buf.append(msg_ptr, msg_length)
_mask_payload(<uint8_t*>self._write_buf.data + payload_start_idx, msg_length, mask)
else:
self._write_buf.append(msg_ptr, msg_length)

# Unfortunately we can't return a memoryview from write buffer like this
# return PyMemoryView_FromMemory(<char *> &self._write_buf[0], self._write_buf.size(), PyBUF_READ)
# because uvloop.Transport.write may delay sending and it doesn't copy the content of the buffer

return PyBytes_FromStringAndSize(self._write_buf.data, self._write_buf.size)


cdef class WSProtocol:
cdef:
Expand Down
16 changes: 15 additions & 1 deletion tests/test_echo.py → tests/test_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async def get_message(self):
client.transport.disconnect()


@pytest.mark.parametrize("msg_size", [256, 256 * 1024])
@pytest.mark.parametrize("msg_size", [64, 256 * 1024])
async def test_echo(echo_client, msg_size):
msg = os.urandom(msg_size)
echo_client.transport.send(picows.WSMsgType.BINARY, msg, False, False)
Expand Down Expand Up @@ -161,6 +161,20 @@ async def test_echo(echo_client, msg_size):
assert frame.fin
assert not frame.rsv1

# Check ping
echo_client.transport.send_ping(b"hi")
async with async_timeout.timeout(TIMEOUT):
frame = await echo_client.get_message()
assert frame.msg_type == picows.WSMsgType.PING
assert frame.payload_as_bytes == b"hi"

# Check pong
echo_client.transport.send_pong(b"hi")
async with async_timeout.timeout(TIMEOUT):
frame = await echo_client.get_message()
assert frame.msg_type == picows.WSMsgType.PONG
assert frame.payload_as_bytes == b"hi"


async def test_close(echo_client):
echo_client.transport.send_close(picows.WSCloseCode.GOING_AWAY, b"goodbye")
Expand Down

0 comments on commit e796d30

Please sign in to comment.