From f8fdb764b95b1ff00a1edb8c1dbfabd12ade25de Mon Sep 17 00:00:00 2001 From: Eunsoo Park Date: Thu, 18 Jul 2019 19:09:47 +0900 Subject: [PATCH] Attempt to re-establish websocket connection to KG When nb2kg lost the connection to KG, nb2kg didn't connect to KG again although the websocket connection from the client was still alive. This change recovers the connection to KG to prevent above anomaly. Signed-off-by: Eunsoo Park --- nb2kg/handlers.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/nb2kg/handlers.py b/nb2kg/handlers.py index 0ad0baf..8283e5d 100644 --- a/nb2kg/handlers.py +++ b/nb2kg/handlers.py @@ -20,7 +20,7 @@ from jupyter_client.session import Session from traitlets.config.configurable import LoggingConfigurable -# TODO: Find a better way to specify global configuration options +# TODO: Find a better way to specify global configuration options # for a server extension. KG_URL = os.getenv('KG_URL', 'http://127.0.0.1:8888/') KG_HEADERS = json.loads(os.getenv('KG_HEADERS', '{}')) @@ -153,14 +153,16 @@ def __init__(self, **kwargs): self.kernel_id = None self.ws = None self.ws_future = Future() - self.ws_future_cancelled = False + self.disconnected = False @gen.coroutine def _connect(self, kernel_id): + # NOTE(esevan): websocket is initialized before connection. + self.ws = None self.kernel_id = kernel_id ws_url = url_path_join( os.getenv('KG_WS_URL', KG_URL.replace('http', 'ws')), - '/api/kernels', + '/api/kernels', url_escape(kernel_id), 'channels' ) @@ -185,39 +187,47 @@ def _connect(self, kernel_id): self.ws_future.add_done_callback(self._connection_done) def _connection_done(self, fut): - if not self.ws_future_cancelled: # prevent concurrent.futures._base.CancelledError + if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError self.ws = fut.result() self.log.debug("Connection is ready: ws: {}".format(self.ws)) else: - self.log.warning("Websocket connection has been cancelled via client disconnect before its establishment. " + self.log.warning("Websocket connection has been closed via client disconnect or due to error. " "Kernel with ID '{}' may not be terminated on Gateway: {}".format(self.kernel_id, KG_URL)) def _disconnect(self): + self.disconnected = True if self.ws is not None: # Close connection self.ws.close() elif not self.ws_future.done(): # Cancel pending connection. Since future.cancel() is a noop on tornado, we'll track cancellation locally self.ws_future.cancel() - self.ws_future_cancelled = True - self.log.debug("_disconnect: ws_future_cancelled: {}".format(self.ws_future_cancelled)) + self.log.debug("_disconnect: future cancelled, disconnected: {}".format(self.disconnected)) @gen.coroutine def _read_messages(self, callback): """Read messages from gateway server.""" - while True: + while self.ws is not None: message = None - if not self.ws_future_cancelled: + if not self.disconnected: try: message = yield self.ws.read_message() except Exception as e: self.log.error("Exception reading message from websocket: {}".format(e)) # , exc_info=True) if message is None: + if not self.disconnected: + self.log.warning("Lost connection to Gateway: {}".format(self.kernel_id)) break callback(message) # pass back to notebook client (see self.on_open and WebSocketChannelsHandler.open) else: # ws cancelled - stop reading break + if not self.disconnected: # NOTE(esevan): if websocket is not disconnected by client, try to reconnect. + self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id)) + self._connect(self.kernel_id) + loop = IOLoop.current() + loop.add_future(self.ws_future, lambda future: self._read_messages(callback)) + def on_open(self, kernel_id, message_callback, **kwargs): """Web socket connection open against gateway server.""" self._connect(kernel_id) @@ -241,7 +251,7 @@ def on_message(self, message): def _write_message(self, message): """Send message to gateway server.""" try: - if not self.ws_future_cancelled: + if not self.disconnected and self.ws is not None: self.ws.write_message(message) except Exception as e: self.log.error("Exception writing message to websocket: {}".format(e)) # , exc_info=True)