Skip to content

Commit

Permalink
Merge pull request #599 from lo-simon/check-missing-public-key
Browse files Browse the repository at this point in the history
IS-10 Auth Tests: check missing public key and replaced Auth server with mock
  • Loading branch information
garethsb authored Feb 24, 2023
2 parents 63df1e0 + f5cfc0b commit bf723e8
Show file tree
Hide file tree
Showing 22 changed files with 940 additions and 120 deletions.
32 changes: 22 additions & 10 deletions docs/2.3. Usage - Testing IS-10 Authorization.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
# Testing IS-10 Authorization

When testing IS-10 / BCP-003-02 implementations, there are a number of configuration parameters which need to be set
correctly to match your environment. `ENABLE_AUTH` should be set to `True`, then each of the parameters which begins
`AUTH_` should be considered to ensure it is set correctly.
Most of the test suites can be run in a mode where authorization is required according to IS-10 / BCP-003-02.
This is enabled by setting `ENABLE_AUTH` to `True`.

Each of the specification test suites can be run in authorized mode where applicable. When this is enabled, every request which is made
by the testing tool will include a JSON Web Token which should grant is access to the API. If you receive authorization
errors then this may indicate an issue with the implementation or a configuration error in the testing tool.
In this mode, every request which is made by the testing tool will include a JSON Web Token issued by the testing tool's
mock authorization server, which should grant it access to the API under test.

Test suites which include 'mocks', most notably the IS-04 Node tests (which include a mock registry) also require valid
authorization tokens to be presented to them when `ENABLE_AUTH` is used.
Test suites which include 'mocks', most notably the IS-04 Node API suite (which include a mock registry), also require
valid Access Tokens to be presented to the mock APIs by the OAuth Client (Node or Controller) under test.

Note that whilst the testing tool does not prevent authorization testing from being carried out with `ENABLE_HTTPS`
turned off, this is for debugging purposes only. Production environments must never use authorization without TLS, and
must never share their token generation keys with this testing tool.
set to `False`, this is for debugging purposes only. Production environments must never use authorization without TLS.

There are a number of additional configuration parameters required depending on the OAuth 2.0 options used by the
OAuth Client under test.

* If the OAuth Client uses [`private_key_jwt` client authentication](https://oauth.net/private-key-jwt/),
set `JWKS_URI` to its JSON Web Key Set endpoint.

* If the OAuth Client uses the [Authorization Code Grant](https://oauth.net/2/grant-types/authorization-code/),
the mock Auth server redirects the user-agent back to the client with the authorization code. If the `redirect_uri` is
not provided by the client during the authorization code flow, set `REDIRECT_URI` to the appropriate endpoint.

* If the OAuth Client does not specify its required `scope` when requesting an Access Token, set `SCOPE` to configure
the default scopes issued by the mock authorization server as a space-separated list of scope names, e.g.
"connection node events".
Supported scopes include "connection", "node", "query", "registration", "events", "channelmapping".
35 changes: 19 additions & 16 deletions nmostesting/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,12 @@
# Defaults to the CA contained within this testing tool
CERT_TRUST_ROOT_CA = "test_data/BCP00301/ca/certs/ca.cert.pem"

# certificate authority private key
# Used by the testing tool's mock Auth to generate certificate
KEY_TRUST_ROOT_CA = "test_data/BCP00301/ca/private/ca.key.pem"

# Certificate chains and the corresponding private keys
# Used by the testing tool's mock Node, Registry and System API
# Used by the testing tool's mock Node, Registry, System and Authorization API
CERTS_MOCKS = [
"test_data/BCP00301/ca/intermediate/certs/ecdsa.mocks.testsuite.nmos.tv.cert.chain.pem",
"test_data/BCP00301/ca/intermediate/certs/rsa.mocks.testsuite.nmos.tv.cert.chain.pem"
Expand All @@ -97,24 +101,23 @@
# Test using authorization as per AMWA IS-10 and BCP-003-02
ENABLE_AUTH = False

# Where the Authorization Server is located on the network. Required when 'ENABLE_AUTH' is True
# The hostname must match the CN or SAN in the TLS certificate used by the Authorization Server when combined
# with the DNS_DOMAIN setting
AUTH_SERVER_HOSTNAME = "auth"
AUTH_SERVER_IP = "127.0.0.1"
AUTH_SERVER_PORT = 443
# The following token is set by the application at runtime and should be left as 'None'
AUTH_TOKEN = None

# Which private and public key to use to generate authorization tokens. These must match the keys used by an
# authorization server on the network to ensure that Nodes trust tokens generated from them. DO NOT use production
# keys for testing purposes.
AUTH_TOKEN_PUBKEY = "test_data/BCP00301/ca/intermediate/certs/intermediate.pubkey.pem"
AUTH_TOKEN_PRIVKEY = "test_data/BCP00301/ca/intermediate/private/intermediate.key.pem"
# When testing private_key_jwt OAuth client, mock Auth server uses the jwks_uri to locate the client
# JSON Web Key Set (JWKS) endpoint for the client JWKS to validate the client JWT (client_assertion)
# when fetching the bearer token
# This is used by the /token endpoint and must be set up before test
JWKS_URI = None

# Set the contents of the 'iss' key within generated JSON Web Tokens to match what the network authorization server uses
AUTH_TOKEN_ISSUER = "https://testsuite.nmos.tv"
# When testing Authorization Code Grant OAuth client, mock Auth server redirects the user-agent back to the client
# with the authorization code. This is used by the /authorize endpoint, if no redirect_uri provided by the client
REDIRECT_URI = None

# The following token is set by the application at runtime and should be left as 'None'
AUTH_TOKEN = None
# The scope of the access request, this is used by the /token endpoint, if no scope provided by the client
# Supported scopes are "connection", "node", "query", "registration", "events", "channelmapping"
# Scope is space-separated list of scope names, e.g. "connection node events"
SCOPE = None

# Domain name to use for the local DNS server and mock Node
# This must match the domain name used for certificates in HTTPS mode
Expand Down
7 changes: 4 additions & 3 deletions nmostesting/DNS.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from .TestHelper import get_default_ip
from . import Config as CONFIG
from .mocks.Auth import PRIMARY_AUTH


class ForwardingZoneResolver(ZoneResolver):
Expand Down Expand Up @@ -130,9 +131,9 @@ def reset(self):
if CONFIG.ENABLE_AUTH:
auth_proto = "https" if CONFIG.ENABLE_HTTPS else "http"
extra_services["auth"] = {
"host": CONFIG.AUTH_SERVER_HOSTNAME,
"ip": CONFIG.AUTH_SERVER_IP,
"port": CONFIG.AUTH_SERVER_PORT,
"host": "mocks",
"ip": self.default_ip,
"port": PRIMARY_AUTH.port,
"txt": ["api_ver=v1.0", "api_proto={}".format(auth_proto), "pri=0"]
}
if CONFIG.ENABLE_MQTT_BROKER:
Expand Down
169 changes: 114 additions & 55 deletions nmostesting/GenericTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@
import inspect
import uuid
import time
from authlib.jose import jwt

from . import TestHelper
from .NMOSUtils import NMOSUtils
from .Specification import Specification
from .TestResult import Test
from . import Config as CONFIG
from .mocks.Auth import AuthServer


NMOS_WIKI_URL = "https://github.com/AMWA-TV/nmos/wiki"


def test_depends(func):
"""Decorator to prevent a test being executed in individual mode"""

def invalid(self, test):
if self.test_individual:
test.description = "Invalid"
Expand Down Expand Up @@ -61,7 +62,8 @@ class GenericTest(object):
Generic testing class.
Can be inherited from in order to perform detailed testing.
"""
def __init__(self, apis, omit_paths=None, disable_auto=False):

def __init__(self, apis, omit_paths=None, disable_auto=False, auths=None, **kwargs):
self.apis = apis
self.saved_entities = {}
self.auto_test_count = 0
Expand Down Expand Up @@ -114,6 +116,9 @@ def __init__(self, apis, omit_paths=None, disable_auto=False):

self.result.append(test.NA(""))

self.primary_auth = auths[0] if auths else None
self.secondary_auth = auths[1] if auths else None

def parse_RAML(self):
"""Create a Specification object for each API defined in this object"""
for api in self.apis:
Expand Down Expand Up @@ -196,7 +201,9 @@ def run_tests(self, test_name=["all"]):
# Add 'query' permission when mock registry is disabled and existing network registry is used
if not CONFIG.ENABLE_DNS_SD and "query" not in scopes:
scopes.append("query")
CONFIG.AUTH_TOKEN = self.generate_token(scopes, True)
CONFIG.AUTH_TOKEN = self.primary_auth.generate_token(scopes, True, overrides={
"client_id": str(uuid.uuid4()),
"exp": int(time.time() + 3600)})
if CONFIG.PREVALIDATE_API:
for api in self.apis:
if "raml" not in self.apis[api] or self.apis[api]["url"] is None:
Expand Down Expand Up @@ -261,25 +268,6 @@ def check_CORS(self, method, headers, expect_methods=None, expect_headers=None):
.format(cors_method, headers['Access-Control-Allow-Methods'])
return True, ""

def check_content_type(self, headers, expected_type="application/json"):
"""Check the Content-Type header of an API request or response"""
if "Content-Type" not in headers:
return False, "API failed to signal a Content-Type."
else:
ctype = headers["Content-Type"]
ctype_params = ctype.split(";")
if ctype_params[0] != expected_type:
return False, "API signalled a Content-Type of {} rather than {}." \
.format(ctype, expected_type)
elif ctype_params[0] in ["application/json", "application/sdp"]:
if len(ctype_params) == 2 and ctype_params[1].strip().lower() == "charset=utf-8":
return True, "API signalled an unnecessary 'charset' in its Content-Type: {}" \
.format(ctype)
elif len(ctype_params) >= 2:
return False, "API signalled unexpected additional parameters in its Content-Type: {}" \
.format(ctype)
return True, ""

def check_accept(self, headers):
"""Check the Accept header of an API request"""
if "Accept" in headers:
Expand Down Expand Up @@ -346,7 +334,7 @@ def do_test_base_path(self, api_name, base_url, path, expectation):

def check_response(self, schema, method, response):
"""Confirm that a given Requests response conforms to the expected schema and has any expected headers"""
ctype_valid, ctype_message = self.check_content_type(response.headers)
ctype_valid, ctype_message = TestHelper.check_content_type(response.headers)
if not ctype_valid:
return False, ctype_message

Expand Down Expand Up @@ -420,20 +408,31 @@ def basics(self):
# Test that the API responds with a 4xx when a missing or invalid token is used
results.append(self.do_test_authorization(api, "Missing Authorization Header", error_type=None))
results.append(self.do_test_authorization(api, "Invalid Authorization Token", token=str(uuid.uuid4())))
token = self.generate_token([api], True, overrides={"iat": int(time.time() - 7200),
"exp": int(time.time() - 3600)})
token = self.primary_auth.generate_token(
[api],
True,
overrides={"iat": int(time.time() - 7200),
"exp": int(time.time() - 3600)}) if self.primary_auth else None
results.append(self.do_test_authorization(api, "Expired Authorization Token", token=token))
token = self.generate_token([api], True, overrides={"aud": ["https://*.nmos.example.com"]})
token = self.primary_auth.generate_token(
[api],
True,
overrides={"aud": ["https://*.nmos.example.com"]}) if self.primary_auth else None
results.append(self.do_test_authorization(api, "Incorrect Authorization Audience", error_code=403,
error_type="insufficient_scope", token=token))
token = self.generate_token(["nonsense"], overrides={"x-nmos-nonsense": {"read": [str(uuid.uuid4())]}})
token = self.primary_auth.generate_token(
["nonsense"],
overrides={"x-nmos-nonsense": {"read": [str(uuid.uuid4())]}}) if self.primary_auth else None
results.append(self.do_test_authorization(api, "Incorrect Authorization Scope", error_code=403,
error_type="insufficient_scope", token=token))

# Test that the API responds with a 200 when only the scope is present
token = self.generate_token([api], False, add_claims=False)
token = self.primary_auth.generate_token([api], False, add_claims=False) if self.primary_auth else None
results.append(self.do_test_authorization(api, "Valid Authorization Scope", error_code=200, token=token))

# Test that the API responds with a 401 or 503 followed by 200 when no matching public keys for the token
results.append(self.do_test_no_matching_public_key_authorization(api))

return results

def do_test_404_path(self, api_name):
Expand Down Expand Up @@ -503,6 +502,93 @@ def do_test_authorization(self, api_name, test_name, error_code=401, error_type=
return test.DISABLED("This test is only performed when an API supports Authorization and 'ENABLE_AUTH' "
"is True")

def do_test_no_matching_public_key_authorization(self, api_name):
api = self.apis[api_name]
url = "{}".format(api["url"].rstrip("/"))
test = Test("GET /x-nmos/{}/{} (No Matching Public Keys)".format(api_name,
api["version"]), self.auto_test_name(api_name))

if self.authorization:
# start the mock secondary Authorization server
secondary_authorization_server = AuthServer(self.secondary_auth)
secondary_authorization_server.start()

# generate token
token = self.secondary_auth.generate_token([api_name])

fail = None
warning = None
headers = {"Authorization": "Bearer {}".format(token)}
valid, response = self.do_request("GET", url, headers=headers)

if not valid:
fail = response
elif response.status_code != 401 and response.status_code != 503:
fail = "Incorrect response code, expected 401 or 503. Received {}".format(response.status_code)
if "WWW-Authenticate" not in response.headers:
fail = "Authorization error responses must include a 'WWW-Authenticate' header"
elif not response.headers["WWW-Authenticate"].startswith("Bearer "):
fail = "'WWW-Authenticate' response header must begin 'Bearer'"
else:
error_code = response.status_code
valid, message = self.check_error_response("GET", response, error_code)
if not valid:
fail = message

# if node responds with 401, node should attempt to obtain the missing public key via the the token
# iss claim, as specified in RFC 8414 section 3.
# https://specs.amwa.tv/is-10/releases/v1.0.0/docs/4.5._Behaviour_-_Resource_Servers.html#public-keys
elif response.status_code == 401:
warning = "should attempt to obtain the missing public key via the the token iss claim"

error_type = "invalid_token"
# Remove 'Bearer ' and tokenise
# https://tools.ietf.org/html/rfc6750#section-3
error_header_ok = False
auth_params = response.headers["WWW-Authenticate"][7:].split(",")
for param in auth_params:
param_parts = param.split("=")
if param_parts[0] == "error":
if param_parts[1] == error_type:
error_header_ok = True
if not error_header_ok:
warning = "'WWW-Authenticate' response header should contain 'error={}'".format(error_type)

# if node responds with 503 retry GET the Retry-After value
# https://specs.amwa.tv/is-10/releases/v1.0.0/docs/4.5._Behaviour_-_Resource_Servers.html#public-keys
elif response.status_code == 503:
# get retry-after from response
retry_after = response.headers.get("Retry-After")
if retry_after is None:
warning = "503 'Service Unavailable' response should include a 'Retry-After' header"
else:
delay = int(retry_after)
print(" * Waiting {}s before retrying \"GET {}\"".format(delay, url))
time.sleep(delay)

# do retry GET
valid, response = self.do_request("GET", url, headers=headers)
if valid:
if response.status_code != 200:
fail = "Unexpected response code after retry, expected 200. Received {}".format(
response.status_code)
else:
fail = response

# shutdown the mock secondary Authorization server
secondary_authorization_server.shutdown()

if fail:
return test.FAIL(fail)
elif warning:
return test.WARNING(warning)
else:
return test.PASS()

else:
return test.DISABLED("This test is only performed when an API supports Authorization and 'ENABLE_AUTH' "
"is True")

def do_test_api_resource(self, resource, response_code, api):
test = Test("{} /x-nmos/{}/{}{}".format(resource[1]['method'].upper(),
api,
Expand Down Expand Up @@ -622,30 +708,3 @@ def get_schema(self, api_name, method, path, status_code):
else:
raise
return schema

def generate_token(self, scopes=None, write=False, azp=False, add_claims=True, overrides=None):
if scopes is None:
scopes = []
header = {"typ": "JWT", "alg": "RS512"}
payload = {"iss": "{}".format(CONFIG.AUTH_TOKEN_ISSUER),
"sub": "[email protected]",
"aud": ["https://*.{}".format(CONFIG.DNS_DOMAIN), "https://*.local"],
"exp": int(time.time() + 3600),
"iat": int(time.time()),
"scope": " ".join(scopes)}
if azp:
payload["azp"] = str(uuid.uuid4())
else:
payload["client_id"] = str(uuid.uuid4())
nmos_claims = {}
if add_claims:
for api in scopes:
nmos_claims["x-nmos-{}".format(api)] = {"read": ["*"]}
if write:
nmos_claims["x-nmos-{}".format(api)]["write"] = ["*"]
payload.update(nmos_claims)
if overrides:
payload.update(overrides)
key = open(CONFIG.AUTH_TOKEN_PRIVKEY).read()
token = jwt.encode(header, payload, key).decode()
return token
Loading

0 comments on commit bf723e8

Please sign in to comment.