Skip to content

Commit

Permalink
Update pytest_plugin with fixtures to test auth in core and extensions (
Browse files Browse the repository at this point in the history
#956)

Co-authored-by: Akshay Chitneni <[email protected]>
  • Loading branch information
akshaychitneni and Akshay Chitneni authored Aug 31, 2022
1 parent 6dc7d53 commit 644540b
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 77 deletions.
4 changes: 4 additions & 0 deletions examples/simple/simple_ext1/handlers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from jupyter_server.auth import authorized
from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.extension.handler import (
ExtensionHandlerJinjaMixin,
Expand All @@ -7,6 +8,9 @@


class DefaultHandler(ExtensionHandlerMixin, JupyterHandler):
auth_resource = "simple_ext1:default"

@authorized
def get(self):
# The name of the extension to which this handler is linked.
self.log.info(f"Extension Name in {self.name} Default Handler: {self.name}")
Expand Down
24 changes: 21 additions & 3 deletions examples/simple/tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,31 @@


@pytest.fixture
def jp_server_config(jp_template_dir):
def jp_server_auth_resources(jp_server_auth_core_resources):
for url_regex in [
"/simple_ext1/default",
]:
jp_server_auth_core_resources[url_regex] = "simple_ext1:default"
return jp_server_auth_core_resources


@pytest.fixture
def jp_server_config(jp_template_dir, jp_server_authorizer):
return {
"ServerApp": {"jpserver_extensions": {"simple_ext1": True}},
"ServerApp": {
"jpserver_extensions": {"simple_ext1": True},
"authorizer_class": jp_server_authorizer,
},
}


async def test_handler_default(jp_fetch):
async def test_handler_default(jp_fetch, jp_serverapp):
jp_serverapp.authorizer.permissions = {
"actions": ["read"],
"resources": [
"simple_ext1:default",
],
}
r = await jp_fetch("simple_ext1/default", method="GET")
assert r.code == 200
assert r.body.decode().index("Hello Simple 1 - I am the default...") > -1
Expand Down
127 changes: 125 additions & 2 deletions jupyter_server/pytest_plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import importlib
import io
import json
import logging
Expand All @@ -14,10 +15,13 @@
import pytest
import tornado
from tornado.escape import url_escape
from traitlets.config import Config
from tornado.httpclient import HTTPClientError
from tornado.websocket import WebSocketHandler
from traitlets.config import Config, re

from jupyter_server.auth import Authorizer
from jupyter_server.extension import serverextension
from jupyter_server.serverapp import ServerApp
from jupyter_server.serverapp import JUPYTER_SERVICE_HANDLERS, ServerApp
from jupyter_server.services.contents.filemanager import FileContentsManager
from jupyter_server.services.contents.largefilemanager import LargeFileManager
from jupyter_server.utils import url_path_join
Expand Down Expand Up @@ -494,3 +498,122 @@ async def _():
pass

return _


@pytest.fixture
def send_request(jp_fetch, jp_ws_fetch):
"""Send to Jupyter Server and return response code."""

async def _(url, **fetch_kwargs):
if url.endswith("channels") or "/websocket/" in url:
fetch = jp_ws_fetch
else:
fetch = jp_fetch

try:
r = await fetch(url, **fetch_kwargs, allow_nonstandard_methods=True)
code = r.code
except HTTPClientError as err:
code = err.code
else:
if fetch is jp_ws_fetch:
r.close()

return code

return _


@pytest.fixture
def jp_server_auth_core_resources():
modules = []
for mod_name in JUPYTER_SERVICE_HANDLERS.values():
if mod_name:
modules.extend(mod_name)
resource_map = {}
for handler_module in modules:
mod = importlib.import_module(handler_module)
name = mod.AUTH_RESOURCE
for handler in mod.default_handlers:
url_regex = handler[0]
resource_map[url_regex] = name
return resource_map


@pytest.fixture
def jp_server_auth_resources(jp_server_auth_core_resources):
return jp_server_auth_core_resources


@pytest.fixture
def jp_server_authorizer(jp_server_auth_resources):
class _(Authorizer):

# Set these class attributes from within a test
# to verify that they match the arguments passed
# by the REST API.
permissions: dict = {}

HTTP_METHOD_TO_AUTH_ACTION = {
"GET": "read",
"HEAD": "read",
"OPTIONS": "read",
"POST": "write",
"PUT": "write",
"PATCH": "write",
"DELETE": "write",
"WEBSOCKET": "execute",
}

def match_url_to_resource(self, url, regex_mapping=None):
"""Finds the JupyterHandler regex pattern that would
match the given URL and returns the resource name (str)
of that handler.
e.g.
/api/contents/... returns "contents"
"""
if not regex_mapping:
regex_mapping = jp_server_auth_resources
for regex, auth_resource in regex_mapping.items():
pattern = re.compile(regex)
if pattern.fullmatch(url):
return auth_resource

def normalize_url(self, path):
"""Drop the base URL and make sure path leads with a /"""
base_url = self.parent.base_url
# Remove base_url
if path.startswith(base_url):
path = path[len(base_url) :]
# Make sure path starts with /
if not path.startswith("/"):
path = "/" + path
return path

def is_authorized(self, handler, user, action, resource):
# Parse Request
if isinstance(handler, WebSocketHandler):
method = "WEBSOCKET"
else:
method = handler.request.method
url = self.normalize_url(handler.request.path)

# Map request parts to expected action and resource.
expected_action = self.HTTP_METHOD_TO_AUTH_ACTION[method]
expected_resource = self.match_url_to_resource(url)

# Assert that authorization layer returns the
# correct action + resource.
assert action == expected_action
assert resource == expected_resource

# Now, actually apply the authorization layer.
return all(
[
action in self.permissions.get("actions", []),
resource in self.permissions.get("resources", []),
]
)

return _
87 changes: 15 additions & 72 deletions tests/auth/test_authorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,86 +5,29 @@
from jupyter_client.kernelspec import NATIVE_KERNEL_NAME
from nbformat import writes
from nbformat.v4 import new_notebook
from tornado.httpclient import HTTPClientError
from tornado.websocket import WebSocketHandler

from jupyter_server.auth.authorizer import Authorizer
from jupyter_server.auth.utils import HTTP_METHOD_TO_AUTH_ACTION, match_url_to_resource
from jupyter_server.services.security import csp_report_uri


class AuthorizerforTesting(Authorizer):

# Set these class attributes from within a test
# to verify that they match the arguments passed
# by the REST API.
permissions: dict = {}

def normalize_url(self, path):
"""Drop the base URL and make sure path leads with a /"""
base_url = self.parent.base_url
# Remove base_url
if path.startswith(base_url):
path = path[len(base_url) :]
# Make sure path starts with /
if not path.startswith("/"):
path = "/" + path
return path

def is_authorized(self, handler, user, action, resource):
# Parse Request
if isinstance(handler, WebSocketHandler):
method = "WEBSOCKET"
else:
method = handler.request.method
url = self.normalize_url(handler.request.path)

# Map request parts to expected action and resource.
expected_action = HTTP_METHOD_TO_AUTH_ACTION[method]
expected_resource = match_url_to_resource(url)

# Assert that authorization layer returns the
# correct action + resource.
assert action == expected_action
assert resource == expected_resource

# Now, actually apply the authorization layer.
return all(
[
action in self.permissions.get("actions", []),
resource in self.permissions.get("resources", []),
]
)


@pytest.fixture
def jp_server_config():
return {"ServerApp": {"authorizer_class": AuthorizerforTesting}}
def jp_server_config(jp_server_authorizer):
return {
"ServerApp": {"authorizer_class": jp_server_authorizer},
"jpserver_extensions": {"jupyter_server_terminals": True},
}


@pytest.fixture
def send_request(jp_fetch, jp_ws_fetch):
"""Send to Jupyter Server and return response code."""

async def _(url, **fetch_kwargs):
if url.endswith("channels") or "/websocket/" in url:
fetch = jp_ws_fetch
else:
fetch = jp_fetch

try:
r = await fetch(url, **fetch_kwargs, allow_nonstandard_methods=True)
code = r.code
except HTTPClientError as err:
code = err.code
else:
if fetch is jp_ws_fetch:
r.close()

print(code, url, fetch_kwargs)
return code

return _
def jp_server_auth_resources(jp_server_auth_core_resources):
# terminal plugin doesn't have importable url patterns
# get these from terminal/__init__.py
for url_regex in [
r"/terminals/websocket/(\w+)",
"/api/terminals",
r"/api/terminals/(\w+)",
]:
jp_server_auth_core_resources[url_regex] = "terminals"
return jp_server_auth_core_resources


HTTP_REQUESTS = [
Expand Down

0 comments on commit 644540b

Please sign in to comment.