Skip to content
This repository has been archived by the owner on Dec 8, 2022. It is now read-only.

Attempt to re-establish websocket connection to KG #42

Merged
merged 1 commit into from
Jul 22, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions nb2kg/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from jupyter_client.session import Session
from traitlets.config.configurable import LoggingConfigurable

# TODO: Find a better way to specify global configuration options
# TODO: Find a better way to specify global configuration options
# for a server extension.
KG_URL = os.getenv('KG_URL', 'http://127.0.0.1:8888/')
KG_HEADERS = json.loads(os.getenv('KG_HEADERS', '{}'))
Expand Down Expand Up @@ -153,14 +153,16 @@ def __init__(self, **kwargs):
self.kernel_id = None
self.ws = None
self.ws_future = Future()
self.ws_future_cancelled = False
self.disconnected = False

@gen.coroutine
def _connect(self, kernel_id):
# NOTE(esevan): websocket is initialized before connection.
self.ws = None
self.kernel_id = kernel_id
ws_url = url_path_join(
os.getenv('KG_WS_URL', KG_URL.replace('http', 'ws')),
'/api/kernels',
'/api/kernels',
url_escape(kernel_id),
'channels'
)
Expand All @@ -185,39 +187,47 @@ def _connect(self, kernel_id):
self.ws_future.add_done_callback(self._connection_done)

def _connection_done(self, fut):
    """Record the outcome of the websocket connection attempt.

    Registered as the done-callback of ``self.ws_future``.  On success the
    connected websocket is stored in ``self.ws``; otherwise ``self.ws`` is
    left untouched and a warning is logged.

    :param fut: the completed connection future.
    """
    # Check for disconnect/cancellation first: asking a cancelled future for
    # its exception or result raises CancelledError instead of returning.
    if self.disconnected or fut.cancelled():
        self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
                         "Kernel with ID '{}' may not be terminated on Gateway: {}".format(self.kernel_id, KG_URL))
        return

    error = fut.exception()
    if error is not None:
        # Include the underlying failure so the cause is visible in the log.
        self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
                         "Kernel with ID '{}' may not be terminated on Gateway: {} Error: {}".format(
                             self.kernel_id, KG_URL, error))
        return

    self.ws = fut.result()
    self.log.debug("Connection is ready: ws: {}".format(self.ws))

def _disconnect(self):
    """Tear down the gateway websocket, or abandon a pending connection.

    ``self.disconnected`` is set first so that the done-callback, the reader
    and the writer all see that the client asked for the shutdown.
    """
    self.disconnected = True
    if self.ws is not None:
        # Close connection
        self.ws.close()
    elif not self.ws_future.done():
        # Cancel pending connection.  Since future.cancel() is a noop on tornado,
        # the disconnected flag set above is what actually records the cancellation.
        self.ws_future.cancel()
        self.log.debug("_disconnect: future cancelled, disconnected: {}".format(self.disconnected))

@gen.coroutine
def _read_messages(self, callback):
    """Read messages from gateway server.

    Each message read from ``self.ws`` is handed to ``callback``.  Reading
    stops when the client disconnects or when a read yields ``None`` (which
    is also what an exception during the read results in).  If the client
    did not ask for the disconnect, a new connection is started and this
    method is scheduled again once that attempt completes.

    :param callback: callable invoked with every message that is read.
    """
    # Seconds to wait before reconnecting when this run delivered nothing.
    retry_delay = 1.0
    delivered = False

    while self.ws is not None and not self.disconnected:
        message = None
        try:
            message = yield self.ws.read_message()
        except Exception as e:
            self.log.error("Exception reading message from websocket: {}".format(e))  # , exc_info=True)
        if message is None:
            if not self.disconnected:
                self.log.warning("Lost connection to Gateway: {}".format(self.kernel_id))
            break
        callback(message)  # pass back to notebook client (see self.on_open and WebSocketChannelsHandler.open)
        delivered = True

    if self.disconnected:
        return

    if not delivered:
        # Nothing was read in this run, e.g. the previous (re)connection
        # attempt failed and self.ws is still None.  Pause before retrying so
        # an unreachable gateway does not cause a tight reconnect loop.
        yield gen.sleep(retry_delay)
        if self.disconnected:  # client went away while we were waiting
            return

    # NOTE(esevan): if websocket is not disconnected by client, try to reconnect.
    self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id))
    self._connect(self.kernel_id)
    loop = IOLoop.current()
    loop.add_future(self.ws_future, lambda future: self._read_messages(callback))

def on_open(self, kernel_id, message_callback, **kwargs):
"""Web socket connection open against gateway server."""
self._connect(kernel_id)
Expand All @@ -241,7 +251,7 @@ def on_message(self, message):
def _write_message(self, message):
    """Send message to gateway server.

    The message is dropped when the client has disconnected or when no
    websocket is currently established.  Errors raised by the write are
    logged rather than propagated.

    :param message: the message to forward to the gateway websocket.
    """
    if self.disconnected:
        return
    if self.ws is None:
        # No usable websocket right now; make the drop visible.
        self.log.debug("Websocket to Gateway is not established; dropping message")
        return
    try:
        self.ws.write_message(message)
    except Exception as e:
        self.log.error("Exception writing message to websocket: {}".format(e))  # , exc_info=True)
Expand Down