diff --git a/ibm_cloud_sdk_core/authenticators/iam_authenticator.py b/ibm_cloud_sdk_core/authenticators/iam_authenticator.py index 3aa2f8c..ea781c7 100644 --- a/ibm_cloud_sdk_core/authenticators/iam_authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/iam_authenticator.py @@ -45,6 +45,8 @@ class IAMAuthenticator(Authenticator): proxies: Dictionary for mapping request protocol to proxy URL. Defaults to None. proxies.http (optional): The proxy endpoint to use for HTTP requests. proxies.https (optional): The proxy endpoint to use for HTTPS requests. + scope: The "scope" to use when fetching the bearer token from the IAM token server. + This can be used to obtain an access token with a specific scope. Attributes: token_manager (IAMTokenManager): Retrives and manages IAM tokens from the endpoint specified by the url. @@ -62,11 +64,12 @@ def __init__(self, client_secret: Optional[str] = None, disable_ssl_verification: bool = False, headers: Optional[Dict[str, str]] = None, - proxies: Optional[Dict[str, str]] = None) -> None: + proxies: Optional[Dict[str, str]] = None, + scope: Optional[str] = None) -> None: self.token_manager = IAMTokenManager( apikey, url=url, client_id=client_id, client_secret=client_secret, disable_ssl_verification=disable_ssl_verification, - headers=headers, proxies=proxies) + headers=headers, proxies=proxies, scope=scope) self.validate() def validate(self) -> None: @@ -147,3 +150,12 @@ def set_proxies(self, proxies: Dict[str, str]) -> None: proxies.https (optional): The proxy endpoint to use for HTTPS requests. """ self.token_manager.set_proxies(proxies) + + def set_scope(self, value: str) -> None: + """Sets the "scope" parameter to use when fetching the bearer token from the IAM token server. + This can be used to obtain an access token with a specific scope. + + Args: + value: A space seperated string that makes up the scope parameter. + """ + self.token_manager.set_scope(value) diff --git a/ibm_cloud_sdk_core/get_authenticator.py b/ibm_cloud_sdk_core/get_authenticator.py index 97f492f..fc93548 100644 --- a/ibm_cloud_sdk_core/get_authenticator.py +++ b/ibm_cloud_sdk_core/get_authenticator.py @@ -61,7 +61,8 @@ def __construct_authenticator(config: dict) -> Authenticator: url=config.get('AUTH_URL'), client_id=config.get('CLIENT_ID'), client_secret=config.get('CLIENT_SECRET'), - disable_ssl_verification=config.get('AUTH_DISABLE_SSL')) + disable_ssl_verification=config.get('AUTH_DISABLE_SSL'), + scope=config.get('SCOPE')) elif auth_type == 'noauth': authenticator = NoAuthAuthenticator() diff --git a/ibm_cloud_sdk_core/iam_token_manager.py b/ibm_cloud_sdk_core/iam_token_manager.py index c2bcf7c..b00acbd 100644 --- a/ibm_cloud_sdk_core/iam_token_manager.py +++ b/ibm_cloud_sdk_core/iam_token_manager.py @@ -36,6 +36,8 @@ class IAMTokenManager(JWTTokenManager): proxies.http (str): The proxy endpoint to use for HTTP requests. proxies.https (str): The proxy endpoint to use for HTTPS requests. http_config (dict): A dictionary containing values that control the timeout, proxies, and etc of HTTP requests. + scope (str): The "scope" to use when fetching the bearer token from the IAM token server. + This can be used to obtain an access token with a specific scope. Args: apikey: A generated APIKey from ibmcloud. @@ -54,12 +56,15 @@ class IAMTokenManager(JWTTokenManager): proxies: Proxies to use for communicating with IAM. Defaults to None. proxies.http: The proxy endpoint to use for HTTP requests. proxies.https: The proxy endpoint to use for HTTPS requests. + scope: The "scope" to use when fetching the bearer token from the IAM token server. + This can be used to obtain an access token with a specific scope. """ DEFAULT_IAM_URL = 'https://iam.cloud.ibm.com/identity/token' CONTENT_TYPE = 'application/x-www-form-urlencoded' REQUEST_TOKEN_GRANT_TYPE = 'urn:ibm:params:oauth:grant-type:apikey' REQUEST_TOKEN_RESPONSE_TYPE = 'cloud_iam' TOKEN_NAME = 'access_token' + SCOPE = 'scope' def __init__(self, apikey: str, @@ -69,13 +74,15 @@ def __init__(self, client_secret: Optional[str] = None, disable_ssl_verification: bool = False, headers: Optional[Dict[str, str]] = None, - proxies: Optional[Dict[str, str]] = None) -> None: + proxies: Optional[Dict[str, str]] = None, + scope: Optional[str] = None) -> None: self.apikey = apikey self.url = url if url else self.DEFAULT_IAM_URL self.client_id = client_id self.client_secret = client_secret self.headers = headers self.proxies = proxies + self.scope = scope super().__init__( self.url, disable_ssl_verification=disable_ssl_verification, token_name=self.TOKEN_NAME) @@ -101,6 +108,9 @@ def request_token(self) -> dict: 'response_type': self.REQUEST_TOKEN_RESPONSE_TYPE } + if self.scope is not None and self.scope: + data[self.SCOPE] = self.scope + auth_tuple = None # If both the client_id and secret were specified by the user, then use them if self.client_id and self.client_secret: @@ -148,3 +158,11 @@ def set_proxies(self, proxies: Dict[str, str]) -> None: self.proxies = proxies else: raise TypeError('proxies must be a dictionary') + + def set_scope(self, value: str) -> None: + """Sets the "scope" parameter to use when fetching the bearer token from the IAM token server. + + Args: + value: A space seperated string that makes up the scope parameter. + """ + self.scope = value diff --git a/resources/ibm-credentials.env b/resources/ibm-credentials.env index 901504f..d385003 100644 --- a/resources/ibm-credentials.env +++ b/resources/ibm-credentials.env @@ -9,4 +9,13 @@ SERVICE_1_APIKEY=V4HXmoUtMjohnsnow=KotN SERVICE_1_CLIENT_ID=somefake========id SERVICE_1_CLIENT_SECRET===my-client-secret== SERVICE_1_AUTH_URL=https://iamhost/iam/api= -SERVICE_1_URL=service1.com/api \ No newline at end of file +SERVICE_1_URL=service1.com/api + +# Service2 configured with IAM w/scope +SERVICE_2_AUTH_TYPE=iam +SERVICE_2_APIKEY=V4HXmoUtMjohnsnow=KotN +SERVICE_2_CLIENT_ID=somefake========id +SERVICE_2_CLIENT_SECRET===my-client-secret== +SERVICE_2_AUTH_URL=https://iamhost/iam/api= +SERVICE_2_URL=service1.com/api +SERVICE_2_SCOPE=A B C D \ No newline at end of file diff --git a/test/test_iam_authenticator.py b/test/test_iam_authenticator.py index 7f0e914..68ad0eb 100644 --- a/test/test_iam_authenticator.py +++ b/test/test_iam_authenticator.py @@ -9,7 +9,7 @@ def test_iam_authenticator(): - authenticator = IAMAuthenticator('my_apikey') + authenticator = IAMAuthenticator(apikey='my_apikey') assert authenticator is not None assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com/identity/token' assert authenticator.token_manager.client_id is None @@ -18,11 +18,15 @@ def test_iam_authenticator(): assert authenticator.token_manager.headers is None assert authenticator.token_manager.proxies is None assert authenticator.token_manager.apikey == 'my_apikey' + assert authenticator.token_manager.scope is None authenticator.set_client_id_and_secret('tom', 'jerry') assert authenticator.token_manager.client_id == 'tom' assert authenticator.token_manager.client_secret == 'jerry' + authenticator.set_scope('scope1 scope2 scope3') + assert authenticator.token_manager.scope == 'scope1 scope2 scope3' + with pytest.raises(TypeError) as err: authenticator.set_headers('dummy') assert str(err.value) == 'headers must be a dictionary' @@ -37,6 +41,11 @@ def test_iam_authenticator(): authenticator.set_proxies({'dummy': 'proxies'}) assert authenticator.token_manager.proxies == {'dummy': 'proxies'} +def test_iam_authenticator_with_scope(): + authenticator = IAMAuthenticator(apikey='my_apikey', scope='scope1 scope2') + assert authenticator is not None + assert authenticator.token_manager.scope == 'scope1 scope2' + def test_iam_authenticator_validate_failed(): with pytest.raises(ValueError) as err: diff --git a/test/test_iam_token_manager.py b/test/test_iam_token_manager.py index 77791cf..d87cb1c 100644 --- a/test/test_iam_token_manager.py +++ b/test/test_iam_token_manager.py @@ -67,6 +67,29 @@ def test_request_token_auth_in_ctor(): assert responses.calls[0].request.url == iam_url assert responses.calls[0].request.headers['Authorization'] != default_auth_header assert responses.calls[0].response.text == response + assert 'scope' not in responses.calls[0].response.request.body + +@responses.activate +def test_request_token_auth_in_ctor_with_scope(): + iam_url = "https://iam.cloud.ibm.com/identity/token" + response = """{ + "access_token": "oAeisG8yqPY7sFR_x66Z15", + "token_type": "Bearer", + "expires_in": 3600, + "expiration": 1524167011, + "refresh_token": "jy4gl91BQ" + }""" + default_auth_header = 'Basic Yng6Yng=' + responses.add(responses.POST, url=iam_url, body=response, status=200) + + token_manager = IAMTokenManager("apikey", url=iam_url, client_id='foo', client_secret='bar', scope='john snow') + token_manager.request_token() + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == iam_url + assert responses.calls[0].request.headers['Authorization'] != default_auth_header + assert responses.calls[0].response.text == response + assert 'scope=john+snow' in responses.calls[0].response.request.body @responses.activate def test_request_token_unsuccessful(): @@ -119,6 +142,7 @@ def test_request_token_auth_in_ctor_client_id_only(): assert responses.calls[0].request.url == iam_url assert responses.calls[0].request.headers.get('Authorization') is None assert responses.calls[0].response.text == response + assert 'scope' not in responses.calls[0].response.request.body @responses.activate def test_request_token_auth_in_ctor_secret_only(): @@ -139,6 +163,7 @@ def test_request_token_auth_in_ctor_secret_only(): assert responses.calls[0].request.url == iam_url assert responses.calls[0].request.headers.get('Authorization') is None assert responses.calls[0].response.text == response + assert 'scope' not in responses.calls[0].response.request.body @responses.activate def test_request_token_auth_in_setter(): @@ -161,6 +186,7 @@ def test_request_token_auth_in_setter(): assert responses.calls[0].request.url == iam_url assert responses.calls[0].request.headers['Authorization'] != default_auth_header assert responses.calls[0].response.text == response + assert 'scope' not in responses.calls[0].response.request.body @responses.activate def test_request_token_auth_in_setter_client_id_only(): @@ -182,6 +208,7 @@ def test_request_token_auth_in_setter_client_id_only(): assert responses.calls[0].request.url == iam_url assert responses.calls[0].request.headers.get('Authorization') is None assert responses.calls[0].response.text == response + assert 'scope' not in responses.calls[0].response.request.body @responses.activate def test_request_token_auth_in_setter_secret_only(): @@ -204,3 +231,28 @@ def test_request_token_auth_in_setter_secret_only(): assert responses.calls[0].request.url == iam_url assert responses.calls[0].request.headers.get('Authorization') is None assert responses.calls[0].response.text == response + assert 'scope' not in responses.calls[0].response.request.body + +@responses.activate +def test_request_token_auth_in_setter_scope(): + iam_url = "https://iam.cloud.ibm.com/identity/token" + response = """{ + "access_token": "oAeisG8yqPY7sFR_x66Z15", + "token_type": "Bearer", + "expires_in": 3600, + "expiration": 1524167011, + "refresh_token": "jy4gl91BQ" + }""" + responses.add(responses.POST, url=iam_url, body=response, status=200) + + token_manager = IAMTokenManager("iam_apikey") + token_manager.set_client_id_and_secret(None, 'bar') + token_manager.set_headers({'user':'header'}) + token_manager.set_scope('john snow') + token_manager.request_token() + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == iam_url + assert responses.calls[0].request.headers.get('Authorization') is None + assert responses.calls[0].response.text == response + assert 'scope=john+snow' in responses.calls[0].response.request.body diff --git a/test/test_utils.py b/test/test_utils.py index acb8b51..5e968b0 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -145,6 +145,20 @@ def test_get_authenticator_from_credential_file(): assert authenticator.token_manager.client_id == 'somefake========id' assert authenticator.token_manager.client_secret == '==my-client-secret==' assert authenticator.token_manager.url == 'https://iamhost/iam/api=' + assert authenticator.token_manager.scope is None + del os.environ['IBM_CREDENTIALS_FILE'] + +def test_get_authenticator_from_credential_file_scope(): + file_path = os.path.join( + os.path.dirname(__file__), '../resources/ibm-credentials.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service_2') + assert authenticator is not None + assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' + assert authenticator.token_manager.client_id == 'somefake========id' + assert authenticator.token_manager.client_secret == '==my-client-secret==' + assert authenticator.token_manager.url == 'https://iamhost/iam/api=' + assert authenticator.token_manager.scope == 'A B C D' del os.environ['IBM_CREDENTIALS_FILE'] def test_get_authenticator_from_env_variables(): @@ -160,6 +174,15 @@ def test_get_authenticator_from_env_variables(): assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' del os.environ['SERVICE_1_APIKEY'] + os.environ['SERVICE_2_APIKEY'] = 'johnsnow' + os.environ['SERVICE_2_SCOPE'] = 'A B C D' + authenticator = get_authenticator_from_environment('service_2') + assert authenticator is not None + assert authenticator.token_manager.apikey == 'johnsnow' + assert authenticator.token_manager.scope == 'A B C D' + del os.environ['SERVICE_2_APIKEY'] + del os.environ['SERVICE_2_SCOPE'] + def test_vcap_credentials(): vcap_services = '{"test":[{"credentials":{ \ "url":"https://gateway.watsonplatform.net/compare-comply/api",\