Skip to content

Commit

Permalink
Simplify interfaces, update benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
taras committed Aug 18, 2024
1 parent 5af87a1 commit a6e6856
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 32 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Connects to an echo server, sends a message and disconnect upon reply.
async def main(endpoint):
(_, client) = await ws_connect(endpoint, ClientListener, "client")
(_, client) = await ws_connect(endpoint, ClientListener)
await client.transport.wait_until_closed()
Expand Down Expand Up @@ -108,7 +108,7 @@ Echo server
async def main():
url = "ws://127.0.0.1:9001"
server = await ws_create_server(url, ServerClientListener, "server")
server = await ws_create_server(url, ServerClientListener)
print(f"Server started on {url}")
await server.serve_forever()
Expand Down
Binary file modified docs/source/_static/picows_benchmark.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 6 additions & 5 deletions examples/echo_client_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def on_ws_frame(self, transport: WSTransport, frame: WSFrame):
else:
self._transport.send(WSMsgType.BINARY, msg)

(_, client) = await ws_connect(endpoint, PicowsClientListener, "client", ssl=ssl_context)
(_, client) = await ws_connect(endpoint, PicowsClientListener, ssl=ssl_context)
await client._transport.wait_until_closed()


Expand Down Expand Up @@ -133,13 +133,14 @@ async def aiohttp_main(url: str, data: bytes, duration: int, ssl_context) -> Non
if os.name != 'nt':
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop_name = "uvloop"
loop_name = f"uvloop({uvloop.__version__})"

ssl_context = create_client_ssl_context() if args.url.startswith("wss://") else None

try:
from examples.picows_client_cython import picows_main_cython
asyncio.run(picows_main_cython(args.url, msg, duration, ssl_context))
from examples.echo_client_cython import picows_main_cython
picows_cython_rps = asyncio.run(picows_main_cython(args.url, msg, duration, ssl_context))
RPS["picows(cython)"] = picows_cython_rps
except ImportError:
pass

Expand All @@ -157,7 +158,7 @@ async def aiohttp_main(url: str, data: bytes, duration: int, ssl_context) -> Non

libraries = list(RPS.keys())
counts = list(RPS.values())
bar_colors = ['tab:blue', 'tab:green', 'tab:orange', 'tab:red']
bar_colors = ['tab:blue', 'tab:green', 'tab:green', 'tab:orange', 'tab:red']

ax.bar(libraries, counts, label=libraries, color=bar_colors)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ cdef double get_now_timestamp() except -1.0:
return <double>tspec.tv_sec + <double>tspec.tv_nsec * 1e-9


cdef class PicowsClientListener(WSListener):
cdef class EchoClientListener(WSListener):
cdef:
WSTransport _transport
double _begin_time
int _duration
int _cnt
bytes _data
bytearray _full_reply
readonly int rps

def __init__(self, bytes data, int duration):
super().__init__()
Expand All @@ -45,6 +46,7 @@ cdef class PicowsClientListener(WSListener):
self._cnt = 0
self._data = data
self._full_reply = bytearray()
self.rps = 0

cpdef on_ws_connected(self, WSTransport transport):
self._transport = transport
Expand All @@ -55,11 +57,7 @@ cdef class PicowsClientListener(WSListener):
if frame.fin:
if self._full_reply:
self._full_reply += frame.get_payload_as_memoryview()
# assert self._full_msg == msg
self._full_reply.clear()
else:
# assert frame.get_payload_as_bytes() == msg
pass
else:
self._full_reply += frame.get_payload_as_memoryview()
return
Expand All @@ -68,13 +66,14 @@ cdef class PicowsClientListener(WSListener):
cdef double ts = get_now_timestamp()

if ts - self._begin_time >= self._duration:
print("picows(cython)", int(self._cnt / self._duration))
self.rps = int(self._cnt / self._duration)
self._transport.disconnect()
else:
self._transport.send(WSMsgType.BINARY, self._data)


async def picows_main_cython(url: str, data: bytes, duration: int, ssl_context):
cdef PicowsClientListener client
(_, client) = await ws_connect(url, lambda: PicowsClientListener(data, duration), "client", ssl=ssl_context)
cdef EchoClientListener client
(_, client) = await ws_connect(url, lambda: EchoClientListener(data, duration), ssl=ssl_context)
await client._transport.wait_until_closed()
return client.rps
4 changes: 2 additions & 2 deletions examples/echo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def async_main():
url = "ws://127.0.0.1:9001"
url_ssl = "wss://127.0.0.1:9002"

plain_server = await ws_create_server(url, PicowsServerListener, "server",
plain_server = await ws_create_server(url, PicowsServerListener,
websocket_handshake_timeout=0.5)
_logger.info("Server started on %s", url)

Expand All @@ -34,7 +34,7 @@ async def async_main():
ssl_context.check_hostname = False
ssl_context.hostname_checks_common_name = False
ssl_context.verify_mode = ssl.CERT_NONE
ssl_server = await ws_create_server(url_ssl, PicowsServerListener, "server",
ssl_server = await ws_create_server(url_ssl, PicowsServerListener,
ssl_context=ssl_context,
websocket_handshake_timeout=0.5)
_logger.info("Server started on %s", url_ssl)
Expand Down
20 changes: 10 additions & 10 deletions picows/picows.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -996,20 +996,18 @@ cdef class WSProtocol:

async def ws_connect(str url: str,
ws_listener_factory: Callable[[], WSListener],
str logger_name: str,
ssl: Optional[Union[bool, SSLContext]]=None,
bint disconnect_on_exception: bool=True,
ssl_handshake_timeout=5,
ssl_shutdown_timeout=5,
websocket_handshake_timeout=5,
local_addr: Optional[Tuple[str, int]]=None,
logger_name: str="client"
) -> Tuple[WSTransport, WSListener]:
"""
:param url: Destination URL
:param ws_listener_factory:
A parameterless factory function that returns a user handler. User handler has to derive from :any:`WSListener`.
:param logger_name:
picows will use `picows.<logger_name>` logger to do all the logging.
:param ssl: optional SSLContext to override default one when wss scheme is used
:param disconnect_on_exception:
Indicates whether the client should initiate disconnect on any exception
Expand All @@ -1023,6 +1021,8 @@ async def ws_connect(str url: str,
:param local_addr:
if given, is a (local_host, local_port) tuple used to bind the socket locally. The local_host and local_port
are looked up using getaddrinfo(), similarly to host and port from url.
:param logger_name:
picows will use `picows.<logger_name>` logger to do all the logging.
:return: :any:`WSTransport` object and a user handler returned by `ws_listener_factory()'
Open a websocket connection to a given URL.
Expand Down Expand Up @@ -1060,15 +1060,15 @@ async def ws_connect(str url: str,


async def ws_create_server(str url,
ws_listener_factory,
str logger_name,
ssl_context=None,
disconnect_on_exception=True,
ws_listener_factory: Callable[[], WSListener],
ssl_context: Optional[SSLContext]=None,
bint disconnect_on_exception: bool=True,
ssl_handshake_timeout=5,
ssl_shutdown_timeout=5,
websocket_handshake_timeout=5,
reuse_port: bool=None,
start_serving: bool=False
str logger_name: str="server",
start_serving: bool=False,
) -> asyncio.Server:
"""
:param url:
Expand All @@ -1077,8 +1077,6 @@ async def ws_create_server(str url,
:param ws_listener_factory:
A parameterless factory function that returns a user handler for a newly accepted connection.
User handler has to derive from :any:`WSListener`.
:param logger_name:
picows will use `picows.<logger_name>` logger to do all the logging.
:param ssl: optional SSLContext to override default one when wss scheme is used
:param disconnect_on_exception:
Indicates whether the client should initiate disconnect on any exception
Expand All @@ -1092,6 +1090,8 @@ async def ws_create_server(str url,
:param reuse_port:
tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to,
so long as they all set this flag when being created. This option is not supported on Windows
:param logger_name:
picows will use `picows.<logger_name>` logger to do all the logging.
:param start_serving:
causes the created server to start accepting connections immediately. When set to False,
the user should await on `Server.start_serving()` or `Server.serve_forever()` to make the server to start
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
]

if os.getenv("PICOWS_BUILD_EXAMPLES") is not None:
cython_modules.append(Extension("examples.picows_client_cython", ["examples/picows_client_cython.pyx"]))
cython_modules.append(Extension("examples.echo_client_cython", ["examples/echo_client_cython.pyx"]))

setup(
ext_modules=cythonize(
Expand Down
8 changes: 4 additions & 4 deletions tests/test_echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame):
self._transport.send_close(frame.get_close_code(), frame.get_close_message())
self._transport.disconnect()

server = await picows.ws_create_server(request.param, PicowsServerListener, "server",
server = await picows.ws_create_server(request.param, PicowsServerListener,
ssl_context=create_server_ssl_context(),
websocket_handshake_timeout=0.5)
task = asyncio.create_task(server.serve_forever())
Expand Down Expand Up @@ -103,7 +103,7 @@ async def get_message(self):
async with async_timeout.timeout(1):
return await self.msg_queue.get()

(_, client) = await picows.ws_connect(echo_server, PicowsClientListener, "client",
(_, client) = await picows.ws_connect(echo_server, PicowsClientListener,
ssl=create_client_ssl_context(),
websocket_handshake_timeout=0.5)
yield client
Expand Down Expand Up @@ -146,13 +146,13 @@ async def test_close(echo_client):
async def test_client_handshake_timeout(echo_server):
# Set unreasonably small timeout
with pytest.raises(TimeoutError):
(_, client) = await picows.ws_connect(echo_server, picows.WSListener, "client",
(_, client) = await picows.ws_connect(echo_server, picows.WSListener,
ssl=create_client_ssl_context(),
websocket_handshake_timeout=0.00001)


async def test_server_handshake_timeout():
server = await picows.ws_create_server(URL, picows.WSListener, "server", websocket_handshake_timeout=0.1)
server = await picows.ws_create_server(URL, picows.WSListener, websocket_handshake_timeout=0.1)
server_task = asyncio.create_task(server.serve_forever())

try:
Expand Down

0 comments on commit a6e6856

Please sign in to comment.