Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nudge both the shell and control channels #636

Merged
merged 1 commit into from
Dec 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions jupyter_server/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,27 @@ def create_stream(self):
def nudge(self):
"""Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
a control reply and at least one iopub message,
a shell or control reply and at least one iopub message,
ensuring that zmq subscriptions are established,
sockets are fully connected, and kernel is responsive.
Keeps retrying kernel_info_request until these are both received.
"""
kernel = self.kernel_manager.get_kernel(self.kernel_id)

# Do not nudge busy kernels as kernel info requests sent to shell are
# queued behind execution requests.
# nudging in this case would cause a potentially very long wait
# before connections are opened,
# plus it is *very* unlikely that a busy kernel will not finish
# establishing its zmq subscriptions before processing the next request.
if getattr(kernel, "execution_state") == "busy":
self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
f = Future()
f.set_result(None)
return f
# Use a transient shell channel to prevent leaking
# shell responses to the front-end.
shell_channel = kernel.connect_shell()
# Use a transient control channel to prevent leaking
# control responses to the front-end.
control_channel = kernel.connect_control()
Expand All @@ -160,18 +174,26 @@ def cleanup(_=None):
"""Common cleanup"""
loop.remove_timeout(nudge_handle)
iopub_channel.stop_on_recv()
if not shell_channel.closed():
shell_channel.close()
if not control_channel.closed():
control_channel.close()

# trigger cleanup when both message futures are resolved
both_done.add_done_callback(cleanup)

def on_control_reply(msg):
def on_shell_reply(msg):
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
if not info_future.done():
self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
info_future.set_result(None)

def on_control_reply(msg):
self.log.debug("Nudge: control info reply received: %s", self.kernel_id)
if not info_future.done():
self.log.debug("Nudge: resolving control future: %s", self.kernel_id)
info_future.set_result(None)

def on_iopub(msg):
self.log.debug("Nudge: IOPub received: %s", self.kernel_id)
if not iopub_future.done():
Expand All @@ -180,6 +202,7 @@ def on_iopub(msg):
iopub_future.set_result(None)

iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
control_channel.on_recv(on_control_reply)
loop = IOLoop.current()

Expand All @@ -200,6 +223,12 @@ def nudge(count):
finish()
return

# check for closed zmq socket
if shell_channel.closed():
self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id)
finish()
return

# check for closed zmq socket
if control_channel.closed():
self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id)
Expand All @@ -209,6 +238,7 @@ def nudge(count):
if not both_done.done():
log = self.log.warning if count % 10 == 0 else self.log.debug
log("Nudge: attempt %s on kernel %s" % (count, self.kernel_id))
self.session.send(shell_channel, "kernel_info_request")
self.session.send(control_channel, "kernel_info_request")
nonlocal nudge_handle
nudge_handle = loop.call_later(0.5, nudge, count)
Expand Down