Skip to content

Commit

Permalink
[WIP] try to avoid using wrapper, send data directly
Browse files Browse the repository at this point in the history
Instead of running an async wrapper for the inter process communication,
why not talk to the mypy daemon directly with unix sockets? Only issue is,
I'm having a bit of trouble doing that at the moment, but I'd like to at least
have this out there.
  • Loading branch information
CoolCat467 committed Nov 12, 2023
1 parent afe58df commit aa03cbf
Showing 1 changed file with 114 additions and 44 deletions.
158 changes: 114 additions & 44 deletions src/idlemypyextension/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@
__title__ = "Mypy Daemon Client"
__license__ = "MIT"

import base64
import contextlib
import io
import json
import os
import time
from collections.abc import Sequence
from typing import TYPE_CHECKING, TypedDict, cast

Expand Down Expand Up @@ -90,25 +89,6 @@ class Response(TypedDict):
stats: NotRequired[object]


async def _receive(connection: _IPCClient) -> Response:
"""Receive JSON data from a connection until EOF.
Raise OSError if the data received is not valid JSON or if it is
not a dict.
"""
async_connection = trio.wrap_file(cast(io.BytesIO, connection))
bdata: bytes = await async_connection.read()
if not bdata:
raise OSError("No data received")
try:
data = json.loads(bdata.decode("utf8"))
except Exception as ex:
raise OSError("Data received is not valid JSON") from ex
if not isinstance(data, dict):
raise OSError(f"Data received is not a dict (got {type(data)})")
return cast(Response, data)


class BadStatusError(Exception):

"""Exception raised when there is something wrong with the status file.
Expand Down Expand Up @@ -174,6 +154,95 @@ def get_status(status_file: str) -> tuple[int, str]:
return _check_status(data)


def _read_request_response_json(request_response: bytes) -> Response:
"""Read request response json."""
try:
data = json.loads(request_response)
except Exception:
return {"error": "Data received is not valid JSON"}
if not isinstance(data, dict):
return {"error": f"Data received is not a dict (got {type(data)})"}
return cast(Response, data)


async def _request_win32(
name: str,
request_arguments: str,
timeout: int | None = None,
) -> Response:
"""Request from daemon on windows."""

async def _receive(
async_connection: trio._file_io.AsyncIOWrapper[_IPCClient],
) -> Response:
"""Receive JSON data from a connection until EOF.
Raise OSError if the data received is not valid JSON or if it is
not a dict.
"""
bdata: bytes = await async_connection.read()
if not bdata:
return {"error": "No data received"}
return _read_request_response_json(bdata)

try:
with _IPCClient(name, timeout) as client:
async_client = trio.wrap_file(client)
await async_client.write(request_arguments.encode("utf8"))
return await _receive(async_client)
except (OSError, _IPCException, ValueError) as err:
return {"error": str(err)}


async def _request_linux(
filename: str,
request_arguments: str,
timeout: int | None = None,
) -> Response:
def find_frame_in_buffer(
buffer: bytearray,
) -> tuple[bytearray, bytearray | None]:
"""Return a full frame from the bytes we have in the buffer."""
space_pos = buffer.find(b" ")
if space_pos == -1:
return buffer, None
# We have a full frame
return buffer[space_pos + 1 :], buffer[:space_pos]

buffer = bytearray()
frame: bytearray | None = None
async with await trio.open_unix_socket(filename) as connection:
# Frame the data by urlencoding it and separating by space.
print(f"{request_arguments = }")
request_frame = (
base64.encodebytes(request_arguments.encode("utf8")) + b" "
)
## request_frame = request_arguments.encode("utf8")
print(f"{request_frame = }")
await connection.send_all(request_frame)

while frame is None:
print("frame read click")
# Receive more data into the buffer.
try:
with trio.fail_after(timeout): # timeout
more = await connection.receive_some()
except trio.TooSlowError:
return {"error": "Connection timed out"}
print(f"{more = }")
if not more:
# Connection closed
# Socket was empty and we didn't get any frame.
return {"error": "No data received"}
buffer.extend(more)

buffer, frame = find_frame_in_buffer(buffer)
print(f"\n{buffer = }")
print(f"{frame = }")
response_text = base64.decodebytes(frame)
return _read_request_response_json(response_text)


async def request(
status_file: str,
command: str,
Expand All @@ -199,15 +268,12 @@ async def request(
# output accordingly.
args["is_tty"] = False
args["terminal_width"] = 80
bdata = json.dumps(args).encode("utf8")
request_arguments = json.dumps(args)
_, name = get_status(status_file)
try:
with _IPCClient(name, timeout) as client:
async_client = trio.wrap_file(cast(io.BytesIO, client))
await async_client.write(bdata)
return await _receive(client)
except (OSError, _IPCException, ValueError) as err:
return {"error": str(err)}

if True: # sys.platform == "win32":
return await _request_win32(name, request_arguments, timeout)
return await _request_linux(name, request_arguments, timeout)


def is_running(status_file: str) -> bool:
Expand All @@ -226,21 +292,24 @@ async def stop(status_file: str) -> Response:

async def _wait_for_server(status_file: str, timeout: float = 5.0) -> bool:
"""Wait until the server is up. Return False if timed out."""
endtime = time.time() + timeout
while time.time() < endtime:
try:
data = _read_status(status_file)
except BadStatusError:
# If the file isn't there yet, retry later.
await trio.sleep(0.1)
continue
# If the file's content is bogus or the process is dead, fail.
try:
_check_status(data)
except BadStatusError:
return False
return True
return False
try:
with trio.fail_after(timeout):
while True:
try:
data = _read_status(status_file)
except BadStatusError:
# If the file isn't there yet, retry later.
await trio.sleep(0.1)
continue
break
except trio.TooSlowError:
return False
# If the file's content is bogus or the process is dead, fail.
try:
_check_status(data)
except BadStatusError:
return False
return True


async def _start_server(
Expand All @@ -253,6 +322,7 @@ async def _start_server(
) -> bool:
"""Start the server and wait for it. Return False if error starting."""
start_options = _process_start_options(flags, allow_sources)
print(f"[Client DEBUG] {start_options = }")
if (
_daemonize(
start_options,
Expand Down

0 comments on commit aa03cbf

Please sign in to comment.