Skip to content

Commit

Permalink
Attempt to fix issue jupyter-server#1051: FD leak due to race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul26goyal committed Apr 29, 2022
1 parent 2cd32d0 commit 2b4607d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
54 changes: 52 additions & 2 deletions enterprise_gateway/services/kernels/remotemanager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
"""Kernel managers that operate against a remote process."""
import time

import asyncio

import os
import re
Expand All @@ -18,6 +21,7 @@
from ..processproxies.processproxy import LocalProcessProxy, RemoteProcessProxy
from ..sessions.kernelsessionmanager import KernelSessionManager

default_kernel_launch_timeout = float(os.getenv('EG_KERNEL_LAUNCH_TIMEOUT', '30'))

def import_item(name):
"""Import and return ``bar`` given the string ``foo.bar``.
Expand Down Expand Up @@ -189,6 +193,50 @@ async def start_kernel(self, *args, **kwargs):
self.parent.kernel_session_manager.create_session(kernel_id, **kwargs)
return kernel_id

async def restart_kernel(self, kernel_id):
kernel = self.get_kernel(kernel_id)
self.log.debug(f"Current value of the Kernel Restarting: {kernel.restarting}")
if kernel.restarting: # assuming duplicate request.
await self.wait_and_poll_for_restart_to_complete(kernel_id, "restart")
self.log.info(
f"Done with waiting for restart to complete. Current value of restarting: {kernel.restarting}. "
f"Skipping restarting kernel.")
return
self.log.info("Going ahead to process restart kernel request.")
try:
kernel.restarting = True # Moved in out of RemoteKernelManager
await super(RemoteMappingKernelManager, self).restart_kernel(kernel_id)
finally:
self.log.debug("Resetting `restarting` flag to False.")
kernel.restarting = False

async def shutdown_kernel(self, kernel_id, now=False, restart=False):
kernel = self.get_kernel(kernel_id)
self.log.debug(f"Current value of the Kernel Restarting: {kernel.restarting}")
if kernel.restarting:
await self.wait_and_poll_for_restart_to_complete(kernel_id, "shutdown")
try:
await super(RemoteMappingKernelManager, self).shutdown_kernel(kernel_id, now, restart)
except KeyError as ke: # this is hit for multiple shutdown request.
self.log.exception(f"Exception while shutting Kernel: {kernel_id}: {ke}")

async def wait_and_poll_for_restart_to_complete(self, kernel_id, action="shutdown"):
kernel = self.get_kernel(kernel_id)
start_time = int(time.time()) # epoc time in seconds
timeout = kernel.kernel_launch_timeout
poll_time = 1 # TODO can be read from a new config.
self.log.info(f"Kernel is restarting when {action} request received. Polling every {poll_time} "
f"seconds for next {timeout} seconds for kernel '{kernel_id}'"
f" to complete its restart.")
while kernel.restarting:
now = int(time.time())
if (now - start_time) > timeout:
self.log.info(f"Wait_Timeout: Existing out of the restart wait loop to {action} kernel.")
break
await asyncio.sleep(poll_time)
self.log.info(f"Returning with current value of the Kernel Restarting: {kernel.restarting}.")
return

def _enforce_kernel_limits(self, username: str) -> None:
"""
If MaxKernels or MaxKernelsPerUser are configured, enforce the respective values.
Expand Down Expand Up @@ -341,6 +389,7 @@ def __init__(self, **kwargs):
self.sigint_value = None
self.kernel_id = None
self.user_overrides = {}
self.kernel_launch_timeout = default_kernel_launch_timeout
self.restarting = False # need to track whether we're in a restart situation or not

# If this instance supports port caching, then disable cache_ports since we don't need this
Expand Down Expand Up @@ -412,6 +461,9 @@ def _capture_user_overrides(self, **kwargs):
of the kernelspec env stanza that would have otherwise overridden the user-provided values.
"""
env = kwargs.get("env", {})
# see if KERNEL_LAUNCH_TIMEOUT was included from user. If so, override default
if env.get('KERNEL_LAUNCH_TIMEOUT', None):
self.kernel_launch_timeout = float(env.get('KERNEL_LAUNCH_TIMEOUT'))
self.user_overrides.update(
{
key: value
Expand Down Expand Up @@ -504,7 +556,6 @@ async def restart_kernel(self, now=False, **kwargs):
Any options specified here will overwrite those used to launch the
kernel.
"""
self.restarting = True
kernel_id = self.kernel_id or os.path.basename(self.connection_file).replace(
"kernel-", ""
).replace(".json", "")
Expand Down Expand Up @@ -535,7 +586,6 @@ async def restart_kernel(self, now=False, **kwargs):
# Refresh persisted state.
if self.kernel_session_manager:
self.kernel_session_manager.refresh_session(kernel_id)
self.restarting = False

async def signal_kernel(self, signum):
"""
Expand Down
6 changes: 4 additions & 2 deletions enterprise_gateway/services/processproxies/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ def poll(self):
container_status = self.get_container_status(None)
# Do not check whether container_status is None
# EG couldn't restart kernels although connections exists.
# See https://github.com/jupyter-server/enterprise_gateway/issues/827
if container_status in self.get_initial_states():
# See https://github.com/jupyter/enterprise_gateway/issues/827
# The new check for `kernel_manager.restarting` added to prevent that false positive
# response that might happen when the kernel is restarting.
if self.kernel_manager.restarting or container_status in self.get_initial_states():
result = None

return result
Expand Down

0 comments on commit 2b4607d

Please sign in to comment.