Skip to content

Commit

Permalink
WIP test recovery from BrokenPipeError
Browse files Browse the repository at this point in the history
  • Loading branch information
krassowski committed Mar 19, 2021
1 parent 2564fe7 commit 600c0cb
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 3 deletions.
2 changes: 2 additions & 0 deletions python_packages/jupyter_lsp/jupyter_lsp/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def init_language_servers(self) -> None:
# copy the language servers before anybody monkeys with them
language_servers_from_config = dict(self.language_servers)
language_servers_from_config.update(self.conf_d_language_servers)
self.log.error(language_servers_from_config)

if self.autodetect:
language_servers.update(self._autodetect_language_servers())
Expand Down Expand Up @@ -215,6 +216,7 @@ def _autodetect_language_servers(self):

try:
entry_points = entrypoints.get_group_named(EP_SPEC_V1)
self.log.debug(f"{entry_points}")
except Exception: # pragma: no cover
self.log.exception("Failed to load entry_points")

Expand Down
44 changes: 42 additions & 2 deletions python_packages/jupyter_lsp/jupyter_lsp/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import string
import subprocess
from copy import copy
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone

from tornado.ioloop import IOLoop
from tornado.queues import Queue
Expand Down Expand Up @@ -49,8 +49,12 @@ class LanguageServerSession(LoggingConfigurable):
status = UseEnum(SessionStatus, default_value=SessionStatus.NOT_STARTED)
last_handler_message_at = Instance(datetime, allow_none=True)
last_server_message_at = Instance(datetime, allow_none=True)
allow_server_failure_not_more_often_than = Instance(
timedelta, allow_none=False, default_value=timedelta(minutes=20)
)

_tasks = None
_last_failure = None

_skip_serialize = ["argv", "debug_argv"]

Expand Down Expand Up @@ -169,7 +173,12 @@ async def _read_lsp(self):
await self.reader.read()

async def _write_lsp(self):
await self.writer.write()
task = self.writer.write()
results = await asyncio.gather(task, return_exceptions=True)
for result in results:
if isinstance(result, BrokenPipeError):
self._handle_server_failure(result)
return results

async def _broadcast_from_lsp(self):
"""loop for reading messages from the queue of messages from the language
Expand All @@ -179,3 +188,34 @@ async def _broadcast_from_lsp(self):
self.last_server_message_at = self.now()
await self.parent.on_server_message(message, self)
self.from_lsp.task_done()

def _handle_server_failure(self, error):
description: str
action: str
now = datetime.now()

allowed = self.allow_server_failure_not_more_often_than
if self._last_failure and now - self._last_failure > allowed:
delta = now - self._last_failure
description = (
f"giving up as the previous failure was {delta} ago"
f" which is less than te minimum allowed interval ({allowed})"
)
action = "raise"
else:
action = "restart"
description = "restarting session..."

text = (
f"Encountered {self.language_server} language server failure;"
f" {description}"
f" (exception: {error})"
f" (faulty process: {self.process})"
)
self.log.warning(text)

if action == "raise":
raise
elif action == "restart":
self.stop()
self.initialize()
7 changes: 6 additions & 1 deletion python_packages/jupyter_lsp/jupyter_lsp/stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def _read_content(
if len(raw) != length: # pragma: no cover
self.log.warning(
f"Readout and content-length mismatch: {len(raw)} vs {length};"
f"remaining empties: {max_empties}; remaining parts: {max_parts}"
f" remaining empties: {max_empties}; remaining parts: {max_parts}"
)

return raw
Expand Down Expand Up @@ -191,7 +191,12 @@ async def write(self) -> None:
body = message.encode("utf-8")
response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
await convert_yielded(self._write_one(response.encode("utf-8")))
except BrokenPipeError:
self.queue.task_done()
# propagate broken pipe errors
raise
except Exception: # pragma: no cover
# catch other (hopefully mild) exceptions
self.log.exception("%s couldn't write message: %s", self, response)
finally:
self.queue.task_done()
Expand Down
19 changes: 19 additions & 0 deletions python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@ def jsonrpc_init_msg():
)


@fixture
def did_open_message():
return json.dumps(
{
"id": 0,
"jsonrpc": "2.0",
"method": "textDocument/didOpen",
"params": {
"textDocument": {
"uri": pathlib.Path(__file__).as_uri(),
"languageId": "python",
"version": 0,
"text": "",
}
},
}
)


@fixture
def app():
return MockServerApp()
Expand Down
48 changes: 48 additions & 0 deletions python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import asyncio
import logging
import subprocess

import pytest

from ..handlers import LanguageServerWebSocketHandler
from ..schema import SERVERS_RESPONSE
from ..session import LanguageServerSession


def assert_status_set(handler, expected_statuses, language_server=None):
Expand Down Expand Up @@ -100,3 +104,47 @@ async def test_ping(handlers):
assert ws_handler._ping_sent is True

ws_handler.on_close()


@pytest.mark.asyncio
async def test_broken_pipe(handlers, jsonrpc_init_msg, did_open_message, caplog):
"""If the pipe breaks (i.e. server dies), can we recover by restarting the server?"""
a_server = "pyls"

# use real handler in this test rather than a mock -> testing broken pipe requires that here
handler, ws_handler = handlers
manager = handler.manager

manager.initialize()

assert_status_set(handler, {"not_started"}, a_server)

ws_handler.open(a_server)

await ws_handler.on_message(jsonrpc_init_msg)
assert_status_set(handler, {"started"}, a_server)

session: LanguageServerSession = manager.sessions[a_server]
process: subprocess.Popen = session.process
process.kill()

with caplog.at_level(logging.WARNING):
# an attempt to write should raise BrokenPipeError
await ws_handler.on_message(did_open_message)
await asyncio.sleep(1)

# which should be caught
assert "Encountered pyls language server failure" in caplog.text
assert "exception: [Errno 32] Broken pipe" in caplog.text

# and the server should get restarted
assert "restarting session..." in caplog.text

assert_status_set(handler, {"started"}, a_server)

with caplog.at_level(logging.WARNING):
# we should be able to send a message now
await ws_handler.on_message(did_open_message)
assert caplog.text == ""

ws_handler.on_close()

0 comments on commit 600c0cb

Please sign in to comment.