Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pytest_plugin with fixtures to test auth in core and extensions #956

Merged
merged 1 commit into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/simple/simple_ext1/handlers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from jupyter_server.auth import authorized
from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.extension.handler import (
ExtensionHandlerJinjaMixin,
Expand All @@ -7,6 +8,9 @@


class DefaultHandler(ExtensionHandlerMixin, JupyterHandler):
auth_resource = "simple_ext1:default"

@authorized
def get(self):
# The name of the extension to which this handler is linked.
self.log.info(f"Extension Name in {self.name} Default Handler: {self.name}")
Expand Down
24 changes: 21 additions & 3 deletions examples/simple/tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,31 @@


@pytest.fixture
def jp_server_config(jp_template_dir):
def jp_server_auth_resources(jp_server_auth_core_resources):
for url_regex in [
"/simple_ext1/default",
]:
jp_server_auth_core_resources[url_regex] = "simple_ext1:default"
return jp_server_auth_core_resources


@pytest.fixture
def jp_server_config(jp_template_dir, jp_server_authorizer):
return {
"ServerApp": {"jpserver_extensions": {"simple_ext1": True}},
"ServerApp": {
"jpserver_extensions": {"simple_ext1": True},
"authorizer_class": jp_server_authorizer,
},
}


async def test_handler_default(jp_fetch):
async def test_handler_default(jp_fetch, jp_serverapp):
jp_serverapp.authorizer.permissions = {
"actions": ["read"],
"resources": [
"simple_ext1:default",
],
}
r = await jp_fetch("simple_ext1/default", method="GET")
assert r.code == 200
assert r.body.decode().index("Hello Simple 1 - I am the default...") > -1
Expand Down
127 changes: 125 additions & 2 deletions jupyter_server/pytest_plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import importlib
import io
import json
import logging
Expand All @@ -14,10 +15,13 @@
import pytest
import tornado
from tornado.escape import url_escape
from traitlets.config import Config
from tornado.httpclient import HTTPClientError
from tornado.websocket import WebSocketHandler
from traitlets.config import Config, re

from jupyter_server.auth import Authorizer
from jupyter_server.extension import serverextension
from jupyter_server.serverapp import ServerApp
from jupyter_server.serverapp import JUPYTER_SERVICE_HANDLERS, ServerApp
from jupyter_server.services.contents.filemanager import FileContentsManager
from jupyter_server.services.contents.largefilemanager import LargeFileManager
from jupyter_server.utils import url_path_join
Expand Down Expand Up @@ -494,3 +498,122 @@ async def _():
pass

return _


@pytest.fixture
def send_request(jp_fetch, jp_ws_fetch):
"""Send to Jupyter Server and return response code."""

async def _(url, **fetch_kwargs):
if url.endswith("channels") or "/websocket/" in url:
fetch = jp_ws_fetch
else:
fetch = jp_fetch

try:
r = await fetch(url, **fetch_kwargs, allow_nonstandard_methods=True)
code = r.code
except HTTPClientError as err:
code = err.code
else:
if fetch is jp_ws_fetch:
r.close()

return code

return _


@pytest.fixture
def jp_server_auth_core_resources():
modules = []
for mod_name in JUPYTER_SERVICE_HANDLERS.values():
if mod_name:
modules.extend(mod_name)
resource_map = {}
for handler_module in modules:
mod = importlib.import_module(handler_module)
name = mod.AUTH_RESOURCE
for handler in mod.default_handlers:
url_regex = handler[0]
resource_map[url_regex] = name
return resource_map


@pytest.fixture
def jp_server_auth_resources(jp_server_auth_core_resources):
return jp_server_auth_core_resources


@pytest.fixture
def jp_server_authorizer(jp_server_auth_resources):
class _(Authorizer):

# Set these class attributes from within a test
# to verify that they match the arguments passed
# by the REST API.
permissions: dict = {}

HTTP_METHOD_TO_AUTH_ACTION = {
"GET": "read",
"HEAD": "read",
"OPTIONS": "read",
"POST": "write",
"PUT": "write",
"PATCH": "write",
"DELETE": "write",
"WEBSOCKET": "execute",
}

def match_url_to_resource(self, url, regex_mapping=None):
"""Finds the JupyterHandler regex pattern that would
match the given URL and returns the resource name (str)
of that handler.

e.g.
/api/contents/... returns "contents"
"""
if not regex_mapping:
regex_mapping = jp_server_auth_resources
for regex, auth_resource in regex_mapping.items():
pattern = re.compile(regex)
if pattern.fullmatch(url):
return auth_resource

def normalize_url(self, path):
"""Drop the base URL and make sure path leads with a /"""
base_url = self.parent.base_url
# Remove base_url
if path.startswith(base_url):
path = path[len(base_url) :]
# Make sure path starts with /
if not path.startswith("/"):
path = "/" + path
return path

def is_authorized(self, handler, user, action, resource):
# Parse Request
if isinstance(handler, WebSocketHandler):
method = "WEBSOCKET"
else:
method = handler.request.method
url = self.normalize_url(handler.request.path)

# Map request parts to expected action and resource.
expected_action = self.HTTP_METHOD_TO_AUTH_ACTION[method]
expected_resource = self.match_url_to_resource(url)

# Assert that authorization layer returns the
# correct action + resource.
assert action == expected_action
assert resource == expected_resource

# Now, actually apply the authorization layer.
return all(
[
action in self.permissions.get("actions", []),
resource in self.permissions.get("resources", []),
]
)

return _
87 changes: 15 additions & 72 deletions tests/auth/test_authorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,86 +5,29 @@
from jupyter_client.kernelspec import NATIVE_KERNEL_NAME
from nbformat import writes
from nbformat.v4 import new_notebook
from tornado.httpclient import HTTPClientError
from tornado.websocket import WebSocketHandler

from jupyter_server.auth.authorizer import Authorizer
from jupyter_server.auth.utils import HTTP_METHOD_TO_AUTH_ACTION, match_url_to_resource
from jupyter_server.services.security import csp_report_uri


class AuthorizerforTesting(Authorizer):

# Set these class attributes from within a test
# to verify that they match the arguments passed
# by the REST API.
permissions: dict = {}

def normalize_url(self, path):
"""Drop the base URL and make sure path leads with a /"""
base_url = self.parent.base_url
# Remove base_url
if path.startswith(base_url):
path = path[len(base_url) :]
# Make sure path starts with /
if not path.startswith("/"):
path = "/" + path
return path

def is_authorized(self, handler, user, action, resource):
# Parse Request
if isinstance(handler, WebSocketHandler):
method = "WEBSOCKET"
else:
method = handler.request.method
url = self.normalize_url(handler.request.path)

# Map request parts to expected action and resource.
expected_action = HTTP_METHOD_TO_AUTH_ACTION[method]
expected_resource = match_url_to_resource(url)

# Assert that authorization layer returns the
# correct action + resource.
assert action == expected_action
assert resource == expected_resource

# Now, actually apply the authorization layer.
return all(
[
action in self.permissions.get("actions", []),
resource in self.permissions.get("resources", []),
]
)


@pytest.fixture
def jp_server_config():
return {"ServerApp": {"authorizer_class": AuthorizerforTesting}}
def jp_server_config(jp_server_authorizer):
return {
"ServerApp": {"authorizer_class": jp_server_authorizer},
"jpserver_extensions": {"jupyter_server_terminals": True},
}


@pytest.fixture
def send_request(jp_fetch, jp_ws_fetch):
"""Send to Jupyter Server and return response code."""

async def _(url, **fetch_kwargs):
if url.endswith("channels") or "/websocket/" in url:
fetch = jp_ws_fetch
else:
fetch = jp_fetch

try:
r = await fetch(url, **fetch_kwargs, allow_nonstandard_methods=True)
code = r.code
except HTTPClientError as err:
code = err.code
else:
if fetch is jp_ws_fetch:
r.close()

print(code, url, fetch_kwargs)
return code

return _
def jp_server_auth_resources(jp_server_auth_core_resources):
# terminal plugin doesn't have importable url patterns
# get these from terminal/__init__.py
for url_regex in [
r"/terminals/websocket/(\w+)",
"/api/terminals",
r"/api/terminals/(\w+)",
]:
jp_server_auth_core_resources[url_regex] = "terminals"
return jp_server_auth_core_resources


HTTP_REQUESTS = [
Expand Down