Headless/Scheduled Notebooks #957

Closed · wants to merge 2 commits

8 changes: 8 additions & 0 deletions jupyter_server/services/scheduling/__init__.py
@@ -0,0 +1,8 @@
"""Scheduling API for JupyterLab"""
from .extension import SchedulerApp

__version__ = "0.1.0"


def _jupyter_server_extension_points():
return [{"module": "jupyter_scheduling", "app": SchedulerApp}]
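Reviewer note: `_jupyter_server_extension_points` is the hook Jupyter Server uses to discover and load the extension. A minimal sketch of enabling it from a `jupyter_server_config.py`, assuming the package is importable under the `jupyter_scheduling` name used above:

```python
# jupyter_server_config.py -- sketch only; assumes the package installs as "jupyter_scheduling"
c = get_config()  # noqa: F821 (injected by Jupyter's config loader)

# Load the scheduling extension when the server starts.
c.ServerApp.jpserver_extensions = {"jupyter_scheduling": True}
```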
15 changes: 15 additions & 0 deletions jupyter_server/services/scheduling/config.py
@@ -0,0 +1,15 @@
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ExecutionConfig:
"""
Config values passed to the
execution manager and scheduler
"""

    db_url: str
    root_dir: str
    execution_manager_class: Any
    environments_manager_class: Any
    scheduler_class: Any
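For context, this frozen dataclass is just a bag of wiring values. A sketch of building one by hand with the defaults added in this PR (import paths assume the file layout of this diff; the path values are placeholders):

```python
from jupyter_server.services.scheduling.config import ExecutionConfig
from jupyter_server.services.scheduling.environments import CondaEnvironmentManager
from jupyter_server.services.scheduling.executors import DefaultExecutionManager

config = ExecutionConfig(
    db_url="sqlite:///scheduler.sqlite",   # placeholder database location
    root_dir="/home/jovyan/notebooks",     # placeholder server root
    execution_manager_class=DefaultExecutionManager,
    environments_manager_class=CondaEnvironmentManager,
    scheduler_class=None,  # supplied by SchedulerApp in the full wiring
)
# frozen=True means any later assignment raises dataclasses.FrozenInstanceError.
```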
75 changes: 75 additions & 0 deletions jupyter_server/services/scheduling/environments.py
@@ -0,0 +1,75 @@
import json
import os
import subprocess
from abc import ABC, abstractmethod
from typing import List

from jupyter_scheduling.models import OutputFormat, RuntimeEnvironment


class EnvironmentManager(ABC):
@abstractmethod
    def list_environments(self) -> List[RuntimeEnvironment]:
pass

@abstractmethod
def manage_environments_command(self) -> str:
pass


class CondaEnvironmentManager(EnvironmentManager):
"""Provides list of system installed conda environments"""

def list_environments(self) -> List[RuntimeEnvironment]:
environments = []

try:
envs = subprocess.check_output(["conda", "env", "list", "--json"])
envs = json.loads(envs).get("envs", [])
except subprocess.CalledProcessError as e:
raise EnvironmentRetrievalError(e) from e

for env in envs:
name = os.path.basename(env)
environments.append(
RuntimeEnvironment(
name=name,
label=name,
description=f"Conda environment: {name}",
file_extensions=["ipynb", "py"],
output_formats=[
OutputFormat(name="ipynb", label="Notebook"),
OutputFormat(name="html", label="HTML"),
],
metadata={"path": env},
)
)

return environments

def manage_environments_command(self) -> str:
return ""


class StaticEnvironmentManager(EnvironmentManager):
"""Provides a static list of environments, for demo purpose only"""

def list_environments(self) -> List[RuntimeEnvironment]:
name = "jupyterlab-env"
path = os.path.join(os.environ["HOME"], name)
return [
RuntimeEnvironment(
name=name,
label=name,
description=f"Virtual environment: {name}",
file_extensions=["ipynb", "py"],
metadata={"path": path},
)
]

def manage_environments_command(self) -> str:
return ""


class EnvironmentRetrievalError(Exception):
pass
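As a usage note, `EnvironmentManager` is the extension point for deployments that do not use conda. A minimal sketch of a custom manager that treats every subdirectory of a fixed directory as an environment (the directory path is a placeholder, and the import path assumes this PR's module naming):

```python
import os
from typing import List

from jupyter_scheduling.models import RuntimeEnvironment  # as imported elsewhere in this PR


class DirectoryEnvironmentManager(EnvironmentManager):
    """Sketch: expose each subdirectory of VENVS_DIR as a runtime environment."""

    VENVS_DIR = "/opt/venvs"  # placeholder location

    def list_environments(self) -> List[RuntimeEnvironment]:
        environments = []
        for name in sorted(os.listdir(self.VENVS_DIR)):
            path = os.path.join(self.VENVS_DIR, name)
            if not os.path.isdir(path):
                continue
            environments.append(
                RuntimeEnvironment(
                    name=name,
                    label=name,
                    description=f"Virtual environment: {name}",
                    file_extensions=["ipynb", "py"],
                    metadata={"path": path},
                )
            )
        return environments

    def manage_environments_command(self) -> str:
        return ""  # no UI command for managing these environments
```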
156 changes: 156 additions & 0 deletions jupyter_server/services/scheduling/executors.py
@@ -0,0 +1,156 @@
import os
import traceback
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List

import nbconvert
import nbformat
from jupyter_scheduling.config import ExecutionConfig
from jupyter_scheduling.models import DescribeJob, JobFeature, OutputFormat, Status
from jupyter_scheduling.orm import Job, create_session
from jupyter_scheduling.parameterize import add_parameters
from jupyter_scheduling.utils import get_utc_timestamp, resolve_path
from nbconvert.preprocessors import ExecutePreprocessor


class ExecutionManager(ABC):
"""Base execution manager.
Clients are expected to override this class
to provide concrete implementations of the
execution manager. At the minimum, subclasses
should provide concrete implementation of the
execute method.
"""

_model = None
_db_session = None

    def __init__(self, job_id: str, config: ExecutionConfig):
self.job_id = job_id
self.root_dir = config.root_dir
self.config = config

@property
def model(self):
if self._model is None:
with self.db_session() as session:
job = session.query(Job).filter(Job.job_id == self.job_id).first()
self._model = DescribeJob.from_orm(job)
return self._model

@property
def db_session(self):
if self._db_session is None:
self._db_session = create_session(self.config.db_url)

return self._db_session

# Don't override this method
def process(self):
self.before_start()
try:
self.execute()
except Exception as e:
self.on_failure(e)
else:
self.on_complete()

@abstractmethod
def execute(self):
pass

@classmethod
@abstractmethod
def supported_features(cls) -> Dict[JobFeature, bool]:
"""Returns a configuration of supported features
by the execution engine. Implementors are expected
to override this to return a dictionary of supported
job creation features.
"""
pass

def before_start(self):
"""Called before start of execute"""
job = self.model
with self.db_session() as session:
session.query(Job).filter(Job.job_id == job.job_id).update(
{"start_time": get_utc_timestamp(), "status": Status.IN_PROGRESS}
)
session.commit()

def on_failure(self, e: Exception):
"""Called after failure of execute"""
job = self.model
with self.db_session() as session:
session.query(Job).filter(Job.job_id == job.job_id).update(
{"status": Status.FAILED, "status_message": str(e)}
)
session.commit()

traceback.print_exc()

def on_complete(self):
"""Called after job is completed"""
job = self.model
with self.db_session() as session:
session.query(Job).filter(Job.job_id == job.job_id).update(
{"status": Status.COMPLETED, "end_time": get_utc_timestamp()}
)
session.commit()


class DefaultExecutionManager(ExecutionManager):
"""Default execution manager that executes notebooks"""

def execute(self):
job = self.model

output_dir = os.path.dirname(resolve_path(job.output_uri, self.root_dir))
if not os.path.exists(output_dir):
os.makedirs(output_dir)

with open(resolve_path(job.input_uri, self.root_dir)) as f:
nb = nbformat.read(f, as_version=4)

if job.parameters:
nb = add_parameters(nb, job.parameters)

ep = ExecutePreprocessor(
timeout=job.timeout_seconds,
kernel_name=nb.metadata.kernelspec["name"],
store_widget_state=True,
)

ep.preprocess(
nb, {"metadata": {"path": os.path.dirname(resolve_path(job.output_uri, self.root_dir))}}
)

if job.output_formats:
filepath = resolve_path(job.output_uri, self.root_dir)
            base_filepath = os.path.splitext(filepath)[0]
for output_format in job.output_formats:
cls = nbconvert.get_exporter(output_format)
output, resources = cls().from_notebook_node(nb)
with open(f"{base_filepath}.{output_format}", "w", encoding="utf-8") as f:
f.write(output)
else:
with open(resolve_path(job.output_uri, self.root_dir), "w", encoding="utf-8") as f:
nbformat.write(nb, f)

    @classmethod
    def supported_features(cls) -> Dict[JobFeature, bool]:
return {
JobFeature.job_name: True,
JobFeature.output_formats: True,
JobFeature.job_definition: False,
JobFeature.idempotency_token: False,
JobFeature.tags: False,
JobFeature.email_notifications: False,
JobFeature.timeout_seconds: False,
JobFeature.retry_on_timeout: False,
JobFeature.max_retries: False,
JobFeature.min_retry_interval_millis: False,
JobFeature.output_filename_template: False,
JobFeature.stop_job: True,
JobFeature.delete_job: True,
}
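To illustrate the subclassing contract described in the `ExecutionManager` docstring, here is a sketch of an alternative manager that shells out to papermill instead of driving `ExecutePreprocessor` directly (papermill is an assumed external dependency, not part of this PR; imports assume this PR's module naming):

```python
import subprocess
from typing import Dict

from jupyter_scheduling.executors import ExecutionManager
from jupyter_scheduling.models import JobFeature
from jupyter_scheduling.utils import resolve_path


class PapermillExecutionManager(ExecutionManager):
    """Sketch: execute the notebook with the papermill CLI."""

    def execute(self):
        job = self.model
        input_path = resolve_path(job.input_uri, self.root_dir)
        output_path = resolve_path(job.output_uri, self.root_dir)
        # A non-zero exit raises CalledProcessError, which process() routes to on_failure().
        subprocess.run(["papermill", input_path, output_path], check=True)

    @classmethod
    def supported_features(cls) -> Dict[JobFeature, bool]:
        # Bare minimum for the sketch; everything else is reported as unsupported.
        return {
            JobFeature.job_name: True,
            JobFeature.output_formats: False,
        }
```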
85 changes: 85 additions & 0 deletions jupyter_server/services/scheduling/extension.py
@@ -0,0 +1,85 @@
from jupyter_core.paths import jupyter_data_dir
from jupyter_scheduling.orm import create_tables
from jupyter_scheduling.scheduler import Scheduler
from traitlets import Bool, Unicode, default

from jupyter_server.extension.application import ExtensionApp
from jupyter_server.traittypes import TypeFromClasses
from jupyter_server.transutils import _i18n

from .config import ExecutionConfig
from .environments import CondaEnvironmentManager
from .executors import DefaultExecutionManager
from .handlers import (
BatchJobHandler,
ConfigHandler,
CreateJobWithDefinitionHandler,
FeaturesHandler,
JobDefinitionHandler,
JobHandler,
JobsCountHandler,
RuntimeEnvironmentsHandler,
)

JOB_DEFINITION_ID_REGEX = r"(?P<job_definition_id>\w+-\w+-\w+-\w+-\w+)"
JOB_ID_REGEX = r"(?P<job_id>\w+-\w+-\w+-\w+-\w+)"


class SchedulerApp(ExtensionApp):
name = "jupyter_scheduling"
handlers = [
(r"/job_definitions", JobDefinitionHandler),
(r"/job_definitions/%s" % JOB_DEFINITION_ID_REGEX, JobDefinitionHandler),
(r"/job_definitions/%s/jobs" % JOB_DEFINITION_ID_REGEX, CreateJobWithDefinitionHandler),
(r"/jobs", JobHandler),
(r"/jobs/count", JobsCountHandler),
(r"/jobs/%s" % JOB_ID_REGEX, JobHandler),
(r"/runtime_environments", RuntimeEnvironmentsHandler),
(r"/scheduler/features", FeaturesHandler),
(r"/scheduler/config", ConfigHandler),
(r"/batch/jobs", BatchJobHandler),
]

drop_tables = Bool(False, config=True, help="Drop the database tables before starting.")

db_url = Unicode(config=True, help="URI for the scheduler database")

@default("db_url")
def get_demo_db_url_default(self):
return f"sqlite:///{jupyter_data_dir()}/scheduler.sqlite"

environment_manager_class = TypeFromClasses(
default_value=CondaEnvironmentManager,
klasses=["jupyter_scheduling.environments.EnvironmentManager"],
config=True,
help=_i18n("The runtime environment manager class to use."),
)

execution_manager_class = TypeFromClasses(
default_value=DefaultExecutionManager,
klasses=["jupyter_scheduling.executors.ExecutionManager"],
config=True,
help=_i18n("The execution manager class to use."),
)

scheduler_class = TypeFromClasses(
default_value=Scheduler,
klasses=["jupyter_scheduling.scheduler.BaseScheduler"],
config=True,
help=_i18n("The scheduler class to use."),
)

def initialize_settings(self):
super().initialize_settings()

create_tables(self.db_url, self.drop_tables)

self.settings.update(
execution_config=ExecutionConfig(
db_url=self.db_url,
execution_manager_class=self.execution_manager_class,
environments_manager_class=self.environment_manager_class,
scheduler_class=self.scheduler_class,
root_dir=self.settings.get("server_root_dir", None),
)
)
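For anyone trying this branch, the traits above can be set from a standard `jupyter_server_config.py`; a sketch with placeholder values:

```python
# jupyter_server_config.py -- sketch only, placeholder paths
c = get_config()  # noqa: F821

c.SchedulerApp.db_url = "sqlite:////srv/jupyter/scheduler.sqlite"
c.SchedulerApp.drop_tables = False

# Swapping in a custom environment manager (hypothetical class, shown earlier in this review):
# c.SchedulerApp.environment_manager_class = "mypackage.environments.DirectoryEnvironmentManager"
```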