Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure iopub subscriptions propagate prior to accepting websocket connections #5908

Merged
merged 7 commits into from
Dec 18, 2020
87 changes: 79 additions & 8 deletions notebook/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,65 @@ def create_stream(self):
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
stream.channel = channel

def nudge(self):
shell_channel = self.channels['shell']
iopub_channel = self.channels['iopub']

future = Future()
info_future = Future()
iopub_future = Future()

def finish():
"""Common cleanup"""
loop.remove_timeout(timeout)
loop.remove_timeout(nudge_handle)
iopub_channel.stop_on_recv()
shell_channel.stop_on_recv()

def on_shell_reply(msg):
if not info_future.done():
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
shell_channel.stop_on_recv()
self.log.debug("Nudge: resolving shell future")
info_future.set_result(msg)
if iopub_future.done():
finish()
self.log.debug("Nudge: resolving main future in shell handler")
future.set_result(info_future.result())

def on_iopub(msg):
if not iopub_future.done():
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
iopub_channel.stop_on_recv()
self.log.debug("Nudge: resolving iopub future")
iopub_future.set_result(None)
if info_future.done():
finish()
self.log.debug("Nudge: resolving main future in iopub handler")
future.set_result(info_future.result())

def on_timeout():
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
finish()
if not future.done():
future.set_exception(TimeoutError("Timeout waiting for nudge"))

iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
loop = IOLoop.current()

# Nudge the kernel with kernel info requests until we get an IOPub message
def nudge():
self.log.debug("Nudge")
SylvainCorlay marked this conversation as resolved.
Show resolved Hide resolved
if not future.done():
self.log.debug("nudging")
self.session.send(shell_channel, "kernel_info_request")
nudge_handle = loop.call_later(0.5, nudge)
nudge_handle = loop.call_later(0, nudge)

timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
return future

def request_kernel_info(self):
"""send a request for kernel_info"""
km = self.kernel_manager
Expand Down Expand Up @@ -253,6 +312,7 @@ def _register_session(self):
yield stale_handler.close()
self._open_sessions[self.session_key] = self

@gen.coroutine
SylvainCorlay marked this conversation as resolved.
Show resolved Hide resolved
def open(self, kernel_id):
super().open()
km = self.kernel_manager
Expand All @@ -263,15 +323,21 @@ def open(self, kernel_id):
if buffer_info and buffer_info['session_key'] == self.session_key:
self.log.info("Restoring connection for %s", self.session_key)
self.channels = buffer_info['channels']
replay_buffer = buffer_info['buffer']
if replay_buffer:
self.log.info("Replaying %s buffered messages", len(replay_buffer))
for channel, msg_list in replay_buffer:
stream = self.channels[channel]
self._on_zmq_reply(stream, msg_list)
connected = self.nudge()
minrk marked this conversation as resolved.
Show resolved Hide resolved

def replay(value):
replay_buffer = buffer_info['buffer']
if replay_buffer:
self.log.info("Replaying %s buffered messages", len(replay_buffer))
for channel, msg_list in replay_buffer:
stream = self.channels[channel]
self._on_zmq_reply(stream, msg_list)

connected.add_done_callback(replay)
else:
try:
self.create_stream()
connected = self.nudge()
except web.HTTPError as e:
self.log.error("Error opening stream: %s", e)
# WebSockets don't response to traditional error codes so we
Expand All @@ -285,8 +351,13 @@ def open(self, kernel_id):
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')

for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)
def subscribe(value):
for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)

connected.add_done_callback(subscribe)

return connected

def on_message(self, msg):
if not self.channels:
Expand Down