Skip to content

Commit

Permalink
Avoid race condition creating duplicate sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
takluyver committed Nov 3, 2018
1 parent d63cd96 commit 237a212
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 41 deletions.
80 changes: 48 additions & 32 deletions notebook/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ def __init__(self, kernel_finder, **kwargs):
super(MappingKernelManager, self).__init__(**kwargs)
self.last_kernel_activity = utcnow()
self._kernels = {}
self._kernels_starting = {}
self._restarters = {}
self.kernel_finder = kernel_finder
self.initialize_culler()

def get_kernel(self, kernel_id):
return self._kernels[kernel_id]
Expand Down Expand Up @@ -281,45 +283,59 @@ def start_kernel(self, kernel_id=None, path=None, kernel_name=None, **kwargs):
an existing kernel is returned, but it may be checked in the future.
"""
if kernel_id is None:
if path is not None:
kwargs['cwd'] = self.cwd_for_path(path)
kernel_id = str(uuid.uuid4())
if kernel_name is None:
kernel_name = 'pyimport/kernel'
elif '/' not in kernel_name:
kernel_name = 'spec/' + kernel_name

kernel = KernelInterface(kernel_name, self.kernel_finder)
yield gen.with_timeout(timedelta(seconds=self.kernel_info_timeout),
kernel.connect_client()
)
self._kernels[kernel_id] = kernel

self.start_watching_activity(kernel_id)
self.log.info("Kernel started: %s" % kernel_id)

kernel.restarter.add_callback(
lambda data: self._handle_kernel_died(kernel_id),
'failed'
)

# Increase the metric of number of kernels running
# for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(
type=self._kernels[kernel_id].kernel_type
).inc()

kernel_id = self.start_launching_kernel(path, kernel_name, **kwargs)
yield self.wait_for_start(kernel_id)
else:
self._check_kernel_id(kernel_id)
self.log.info("Using existing kernel: %s" % kernel_id)

# Initialize culling if not already
if not self._initialized_culler:
self.initialize_culler()

# py2-compat
raise gen.Return(kernel_id)

def start_launching_kernel(self, path=None, kernel_name=None, **kwargs):
"""Launch a new kernel, return its kernel ID
This is a synchronous method which starts the process of launching a
kernel. Call :meth:`wait_for_start` to get an awaitable for the
rest of the startup and connection process.
"""
if path is not None:
kwargs['cwd'] = self.cwd_for_path(path)
kernel_id = str(uuid.uuid4())
if kernel_name is None:
kernel_name = 'pyimport/kernel'
elif '/' not in kernel_name:
kernel_name = 'spec/' + kernel_name

kernel = KernelInterface(kernel_name, self.kernel_finder)
self._kernels[kernel_id] = kernel
self._kernels_starting[kernel_id] = \
self._finish_launching_kernel(kernel_id, kernel)
return kernel_id

def wait_for_start(self, kernel_id):
return self._kernels_starting[kernel_id]

@gen.coroutine
def _finish_launching_kernel(self, kernel_id, kernel):
yield gen.with_timeout(timedelta(seconds=self.kernel_info_timeout),
kernel.connect_client()
)

self.start_watching_activity(kernel_id)
self.log.info("Kernel started: %s" % kernel_id)

kernel.restarter.add_callback(
lambda data: self._handle_kernel_died(kernel_id),
'failed'
)

# Increase the metric of number of kernels running
# for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(
type=self._kernels[kernel_id].kernel_type
).inc()

def start_buffering(self, kernel_id, session_key, channels):
"""Start buffering messages for a kernel
Expand Down
20 changes: 11 additions & 9 deletions notebook/services/sessions/sessionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,26 +73,28 @@ def new_session_id(self):
def create_session(self, path=None, name=None, type=None, kernel_name=None, kernel_id=None):
"""Creates a session and returns its model"""
session_id = self.new_session_id()
if kernel_id is not None and kernel_id in self.kernel_manager:
pass
else:
kernel_id = yield self.start_kernel_for_session(session_id, path, name, type, kernel_name)
if (kernel_id is None) or (kernel_id not in self.kernel_manager):
kernel_id = self.start_kernel_for_session(session_id, path, name, type, kernel_name)

# Save the session before waiting for the kernel to be fully started,
# to avoid race conditions resulting in two sessions.
result = yield gen.maybe_future(
self.save_session(session_id, path=path, name=name, type=type, kernel_id=kernel_id)
)

# Now wait for the kernel to finish starting.
yield self.kernel_manager.wait_for_start(kernel_id)

# py2-compat
raise gen.Return(result)

@gen.coroutine
def start_kernel_for_session(self, session_id, path, name, type, kernel_name):
"""Start a new kernel for a given session."""
# allow contents manager to specify kernels cwd
kernel_path = self.contents_manager.get_kernel_path(path=path)
kernel_id = yield gen.maybe_future(
self.kernel_manager.start_kernel(path=kernel_path, kernel_name=kernel_name)
return self.kernel_manager.start_launching_kernel(
path=kernel_path, kernel_name=kernel_name,
)
# py2-compat
raise gen.Return(kernel_id)

def save_session(self, session_id, path=None, name=None, type=None, kernel_id=None):
"""Saves the items for the session with the given session_id
Expand Down

0 comments on commit 237a212

Please sign in to comment.