From a86cdab3986aee607cba5a91565dd48c3f40d372 Mon Sep 17 00:00:00 2001 From: Norbert Biczo Date: Thu, 17 Nov 2022 15:56:40 +0100 Subject: [PATCH] chore: introduce Black code formatter (#150) This commit adds the [Black](https://github.com/psf/black) code formatter to the project and updates the `Makefile` to use it. There are 2 targets that run the `black` tool: - `make lint`: this will check the code with both `pylint` and `black` - `make lint-fix`: this will reformat the code using `black`. Initial code formatting is also included. Signed-off-by: Norbert Biczo --- CONTRIBUTING.md | 1 + Makefile | 5 +- ibm_cloud_sdk_core/api_exception.py | 2 +- .../authenticators/basic_authenticator.py | 9 +- .../authenticators/container_authenticator.py | 41 ++- .../authenticators/cp4d_authenticator.py | 52 +-- .../authenticators/iam_authenticator.py | 35 +- .../iam_request_based_authenticator.py | 10 +- .../vpc_instance_authenticator.py | 14 +- ibm_cloud_sdk_core/base_service.py | 115 +++--- ibm_cloud_sdk_core/detailed_response.py | 13 +- ibm_cloud_sdk_core/get_authenticator.py | 38 +- .../token_managers/container_token_manager.py | 41 ++- .../token_managers/cp4d_token_manager.py | 38 +- .../iam_request_based_token_manager.py | 34 +- .../token_managers/iam_token_manager.py | 32 +- .../token_managers/jwt_token_manager.py | 19 +- .../token_managers/token_manager.py | 26 +- .../vpc_instance_token_manager.py | 21 +- ibm_cloud_sdk_core/utils.py | 63 ++-- pyproject.toml | 3 + requirements-dev.txt | 1 + setup.py | 66 ++-- test/test_api_exception.py | 97 ++--- test/test_base_service.py | 331 ++++++------------ test/test_basic_authenticator.py | 12 +- test/test_container_authenticator.py | 27 +- test/test_container_token_manager.py | 49 +-- test/test_cp4d_authenticator.py | 76 ++-- test/test_cp4d_token_manager.py | 13 +- test/test_detailed_response.py | 46 +-- test/test_iam_authenticator.py | 38 +- test/test_iam_token_manager.py | 26 +- test/test_jwt_token_manager.py | 45 ++- test/test_token_manager.py | 16 +- test/test_utils.py | 91 ++--- test/test_vpc_instance_authenticator.py | 16 +- test/test_vpc_instance_token_manager.py | 87 +++-- .../test_cp4d_authenticator_integration.py | 6 +- 39 files changed, 779 insertions(+), 876 deletions(-) create mode 100644 pyproject.toml diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7eb2a2d..3342022 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,6 +36,7 @@ If you want to contribute to the repository, here's a quick guide: * Check for unnecessary whitespace with `git diff --check` before committing. * Make sure your code supports Python 3.7, 3.8, 3.9 and 3.10. You can use `pyenv` and `tox` for this 1. Make the test pass + * Linting errors can be fixed by running `make lint-fix` in most cases 1. Check code coverage. Add tests for all new functionality and ensure overall coverage does not decrease. 1. Commit your changes * Commits should follow the [Angular commit message guidelines](https://github.com/angular/angular/blob/master/CONTRIBUTING.md#-commit-message-guidelines). This is because our release tool uses this format for determining release versions and generating changelogs. To make this easier, we recommend using the [Commitizen CLI](https://github.com/commitizen/cz-cli) with the `cz-conventional-changelog` adapter. diff --git a/Makefile b/Makefile index ff802a3..ba9bee5 100644 --- a/Makefile +++ b/Makefile @@ -24,4 +24,7 @@ test-unit: python -m pytest --cov=ibm_cloud_sdk_core test lint: - ./pylint.sh + ./pylint.sh && black --check . + +lint-fix: + black . diff --git a/ibm_cloud_sdk_core/api_exception.py b/ibm_cloud_sdk_core/api_exception.py index 3cbe618..0c38a76 100644 --- a/ibm_cloud_sdk_core/api_exception.py +++ b/ibm_cloud_sdk_core/api_exception.py @@ -50,7 +50,7 @@ def __str__(self) -> str: msg = 'Error: ' + str(self.message) + ', Code: ' + str(self.code) if self.global_transaction_id is not None: msg += ' , X-global-transaction-id: ' + str(self.global_transaction_id) - return msg + return msg @staticmethod def _get_error_message(response: Response) -> str: diff --git a/ibm_cloud_sdk_core/authenticators/basic_authenticator.py b/ibm_cloud_sdk_core/authenticators/basic_authenticator.py index 335ed10..a14141c 100644 --- a/ibm_cloud_sdk_core/authenticators/basic_authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/basic_authenticator.py @@ -57,16 +57,15 @@ def validate(self) -> None: if self.username is None or self.password is None: raise ValueError('The username and password shouldn\'t be None.') - if has_bad_first_or_last_char( - self.username) or has_bad_first_or_last_char(self.password): + if has_bad_first_or_last_char(self.username) or has_bad_first_or_last_char(self.password): raise ValueError( 'The username and password shouldn\'t start or end with curly brackets or quotes. ' - 'Please remove any surrounding {, }, or \" characters.') + 'Please remove any surrounding {, }, or \" characters.' + ) def __construct_basic_auth_header(self) -> str: authstring = "{0}:{1}".format(self.username, self.password) - base64_authorization = base64.b64encode( - authstring.encode('utf-8')).decode('utf-8') + base64_authorization = base64.b64encode(authstring.encode('utf-8')).decode('utf-8') return 'Basic {0}'.format(base64_authorization) def authenticate(self, req: Request) -> None: diff --git a/ibm_cloud_sdk_core/authenticators/container_authenticator.py b/ibm_cloud_sdk_core/authenticators/container_authenticator.py index 2f60b41..534dbd9 100644 --- a/ibm_cloud_sdk_core/authenticators/container_authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/container_authenticator.py @@ -64,25 +64,35 @@ class ContainerAuthenticator(IAMRequestBasedAuthenticator): or client_id, and/or client_secret are not valid for IAM token requests. """ - def __init__(self, - cr_token_filename: Optional[str] = None, - iam_profile_name: Optional[str] = None, - iam_profile_id: Optional[str] = None, - url: Optional[str] = None, - client_id: Optional[str] = None, - client_secret: Optional[str] = None, - disable_ssl_verification: bool = False, - scope: Optional[str] = None, - proxies: Optional[Dict[str, str]] = None, - headers: Optional[Dict[str, str]] = None) -> None: + def __init__( + self, + cr_token_filename: Optional[str] = None, + iam_profile_name: Optional[str] = None, + iam_profile_id: Optional[str] = None, + url: Optional[str] = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + disable_ssl_verification: bool = False, + scope: Optional[str] = None, + proxies: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> None: # Check the type of `disable_ssl_verification`. Must be a bool. if not isinstance(disable_ssl_verification, bool): raise TypeError('disable_ssl_verification must be a bool') self.token_manager = ContainerTokenManager( - cr_token_filename=cr_token_filename, iam_profile_name=iam_profile_name, iam_profile_id=iam_profile_id, - url=url, client_id=client_id, client_secret=client_secret, - disable_ssl_verification=disable_ssl_verification, scope=scope, proxies=proxies, headers=headers) + cr_token_filename=cr_token_filename, + iam_profile_name=iam_profile_name, + iam_profile_id=iam_profile_id, + url=url, + client_id=client_id, + client_secret=client_secret, + disable_ssl_verification=disable_ssl_verification, + scope=scope, + proxies=proxies, + headers=headers, + ) self.validate() @@ -103,8 +113,7 @@ def validate(self) -> None: super().validate() if not self.token_manager.iam_profile_name and not self.token_manager.iam_profile_id: - raise ValueError( - 'At least one of iam_profile_name or iam_profile_id must be specified.') + raise ValueError('At least one of iam_profile_name or iam_profile_id must be specified.') def set_cr_token_filename(self, cr_token_filename: str) -> None: """Set the location of the compute resource token on the local filesystem. diff --git a/ibm_cloud_sdk_core/authenticators/cp4d_authenticator.py b/ibm_cloud_sdk_core/authenticators/cp4d_authenticator.py index 30dd5fb..785b1c0 100644 --- a/ibm_cloud_sdk_core/authenticators/cp4d_authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/cp4d_authenticator.py @@ -52,23 +52,32 @@ class CloudPakForDataAuthenticator(Authenticator): ValueError: The username, password/apikey, and/or url are not valid for CP4D token requests. """ - def __init__(self, - username: str = None, - password: str = None, - url: str = None, - *, - apikey: str = None, - disable_ssl_verification: bool = False, - headers: Optional[Dict[str, str]] = None, - proxies: Optional[Dict[str, str]] = None, - verify: Optional[str] = None) -> None: + def __init__( + self, + username: str = None, + password: str = None, + url: str = None, + *, + apikey: str = None, + disable_ssl_verification: bool = False, + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + verify: Optional[str] = None + ) -> None: # Check the type of `disable_ssl_verification`. Must be a bool. if not isinstance(disable_ssl_verification, bool): raise TypeError('disable_ssl_verification must be a bool') self.token_manager = CP4DTokenManager( - username=username, password=password, apikey=apikey, url=url, - disable_ssl_verification=disable_ssl_verification, headers=headers, proxies=proxies, verify=verify) + username=username, + password=password, + apikey=apikey, + url=url, + disable_ssl_verification=disable_ssl_verification, + headers=headers, + proxies=proxies, + verify=verify, + ) self.validate() @@ -88,24 +97,27 @@ def validate(self) -> None: if self.token_manager.username is None: raise ValueError('The username shouldn\'t be None.') - if ((self.token_manager.password is None and self.token_manager.apikey is None) - or (self.token_manager.password is not None and self.token_manager.apikey is not None)): - raise ValueError( - 'Exactly one of `apikey` or `password` must be specified.') + if (self.token_manager.password is None and self.token_manager.apikey is None) or ( + self.token_manager.password is not None and self.token_manager.apikey is not None + ): + raise ValueError('Exactly one of `apikey` or `password` must be specified.') if self.token_manager.url is None: raise ValueError('The url shouldn\'t be None.') - if has_bad_first_or_last_char( - self.token_manager.username) or has_bad_first_or_last_char(self.token_manager.password): + if has_bad_first_or_last_char(self.token_manager.username) or has_bad_first_or_last_char( + self.token_manager.password + ): raise ValueError( 'The username and password shouldn\'t start or end with curly brackets or quotes. ' - 'Please remove any surrounding {, }, or \" characters.') + 'Please remove any surrounding {, }, or \" characters.' + ) if has_bad_first_or_last_char(self.token_manager.url): raise ValueError( 'The url shouldn\'t start or end with curly brackets or quotes. ' - 'Please remove any surrounding {, }, or \" characters.') + 'Please remove any surrounding {, }, or \" characters.' + ) def authenticate(self, req: Request) -> None: """Adds CP4D authentication information to the request. diff --git a/ibm_cloud_sdk_core/authenticators/iam_authenticator.py b/ibm_cloud_sdk_core/authenticators/iam_authenticator.py index 4286020..dac36fb 100644 --- a/ibm_cloud_sdk_core/authenticators/iam_authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/iam_authenticator.py @@ -56,24 +56,32 @@ class IAMAuthenticator(IAMRequestBasedAuthenticator): ValueError: The apikey, client_id, and/or client_secret are not valid for IAM token requests. """ - def __init__(self, - apikey: str, - *, - url: Optional[str] = None, - client_id: Optional[str] = None, - client_secret: Optional[str] = None, - disable_ssl_verification: bool = False, - headers: Optional[Dict[str, str]] = None, - proxies: Optional[Dict[str, str]] = None, - scope: Optional[str] = None) -> None: + def __init__( + self, + apikey: str, + *, + url: Optional[str] = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + disable_ssl_verification: bool = False, + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + scope: Optional[str] = None + ) -> None: # Check the type of `disable_ssl_verification`. Must be a bool. if not isinstance(disable_ssl_verification, bool): raise TypeError('disable_ssl_verification must be a bool') self.token_manager = IAMTokenManager( - apikey, url=url, client_id=client_id, client_secret=client_secret, + apikey, + url=url, + client_id=client_id, + client_secret=client_secret, disable_ssl_verification=disable_ssl_verification, - headers=headers, proxies=proxies, scope=scope) + headers=headers, + proxies=proxies, + scope=scope, + ) self.validate() @@ -98,4 +106,5 @@ def validate(self) -> None: if has_bad_first_or_last_char(self.token_manager.apikey): raise ValueError( 'The apikey shouldn\'t start or end with curly brackets or quotes. ' - 'Please remove any surrounding {, }, or \" characters.') + 'Please remove any surrounding {, }, or \" characters.' + ) diff --git a/ibm_cloud_sdk_core/authenticators/iam_request_based_authenticator.py b/ibm_cloud_sdk_core/authenticators/iam_request_based_authenticator.py index 67e3f2a..1a56b54 100644 --- a/ibm_cloud_sdk_core/authenticators/iam_request_based_authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/iam_request_based_authenticator.py @@ -41,12 +41,10 @@ def validate(self) -> None: Raises: ValueError: The client_id, and/or client_secret are not valid for IAM token requests. """ - if (self.token_manager.client_id and - not self.token_manager.client_secret) or ( - not self.token_manager.client_id and - self.token_manager.client_secret): - raise ValueError( - 'Both client_id and client_secret should be initialized.') + if (self.token_manager.client_id and not self.token_manager.client_secret) or ( + not self.token_manager.client_id and self.token_manager.client_secret + ): + raise ValueError('Both client_id and client_secret should be initialized.') def authenticate(self, req: Request) -> None: """Adds IAM authentication information to the request. diff --git a/ibm_cloud_sdk_core/authenticators/vpc_instance_authenticator.py b/ibm_cloud_sdk_core/authenticators/vpc_instance_authenticator.py index 82ca50f..16042ba 100644 --- a/ibm_cloud_sdk_core/authenticators/vpc_instance_authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/vpc_instance_authenticator.py @@ -53,16 +53,16 @@ class VPCInstanceAuthenticator(Authenticator): DEFAULT_IMS_ENDPOINT = 'http://169.254.169.254' - def __init__(self, - iam_profile_crn: Optional[str] = None, - iam_profile_id: Optional[str] = None, - url: Optional[str] = None) -> None: + def __init__( + self, iam_profile_crn: Optional[str] = None, iam_profile_id: Optional[str] = None, url: Optional[str] = None + ) -> None: if not url: url = self.DEFAULT_IMS_ENDPOINT self.token_manager = VPCInstanceTokenManager( - url=url, iam_profile_crn=iam_profile_crn, iam_profile_id=iam_profile_id) + url=url, iam_profile_crn=iam_profile_crn, iam_profile_id=iam_profile_id + ) self.validate() @@ -74,8 +74,7 @@ def validate(self) -> None: super().validate() if self.token_manager.iam_profile_crn and self.token_manager.iam_profile_id: - raise ValueError( - 'At most one of "iam_profile_id" or "iam_profile_crn" may be specified.') + raise ValueError('At most one of "iam_profile_id" or "iam_profile_crn" may be specified.') def authenticate(self, req: Request) -> None: """Adds IAM authentication information to the request. @@ -92,7 +91,6 @@ def authenticate(self, req: Request) -> None: bearer_token = self.token_manager.get_token() headers['Authorization'] = 'Bearer {0}'.format(bearer_token) - def set_iam_profile_crn(self, iam_profile_crn: str) -> None: """Sets CRN of the IAM profile. diff --git a/ibm_cloud_sdk_core/base_service.py b/ibm_cloud_sdk_core/base_service.py index a9ace31..f279a0b 100644 --- a/ibm_cloud_sdk_core/base_service.py +++ b/ibm_cloud_sdk_core/base_service.py @@ -30,9 +30,14 @@ from .api_exception import ApiException from .detailed_response import DetailedResponse from .token_managers.token_manager import TokenManager -from .utils import (has_bad_first_or_last_char, remove_null_values, - cleanup_values, read_external_sources, strip_extra_slashes, - SSLHTTPAdapter) +from .utils import ( + has_bad_first_or_last_char, + remove_null_values, + cleanup_values, + read_external_sources, + strip_extra_slashes, + SSLHTTPAdapter, +) from .version import __version__ # Uncomment this to enable http debugging @@ -43,8 +48,8 @@ logger = logging.getLogger(__name__) -#pylint: disable=too-many-instance-attributes -#pylint: disable=too-many-locals +# pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-locals class BaseService: """Common functionality shared by generated service classes. @@ -72,18 +77,23 @@ class BaseService: Raises: ValueError: If Authenticator is not provided or invalid type. """ + SDK_NAME = 'ibm-python-sdk-core' - ERROR_MSG_DISABLE_SSL = 'The connection failed because the SSL certificate is not valid. To use a self-signed '\ - 'certificate, disable verification of the server\'s SSL certificate by invoking the '\ - 'set_disable_ssl_verification(True) on your service instance and/ or use the '\ - 'disable_ssl_verification option of the authenticator.' - - def __init__(self, - *, - service_url: str = None, - authenticator: Authenticator = None, - disable_ssl_verification: bool = False, - enable_gzip_compression: bool = False) -> None: + ERROR_MSG_DISABLE_SSL = ( + 'The connection failed because the SSL certificate is not valid. To use a self-signed ' + 'certificate, disable verification of the server\'s SSL certificate by invoking the ' + 'set_disable_ssl_verification(True) on your service instance and/ or use the ' + 'disable_ssl_verification option of the authenticator.' + ) + + def __init__( + self, + *, + service_url: str = None, + authenticator: Authenticator = None, + disable_ssl_verification: bool = False, + enable_gzip_compression: bool = False + ) -> None: self.set_service_url(service_url) self.http_client = requests.Session() self.http_config = {} @@ -121,8 +131,7 @@ def enable_retries(self, max_retries: int = 4, retry_interval: float = 1.0) -> N status_forcelist=[429, 500, 502, 503, 504], # List of HTTP methods to retry on # Omitting this will default to all methods except POST - allowed_methods=['HEAD', 'GET', 'PUT', - 'DELETE', 'OPTIONS', 'TRACE', 'POST'] + allowed_methods=['HEAD', 'GET', 'PUT', 'DELETE', 'OPTIONS', 'TRACE', 'POST'], ) self.http_adapter = SSLHTTPAdapter(max_retries=self.retry_config) self.http_client.mount('http://', self.http_adapter) @@ -138,14 +147,11 @@ def disable_retries(self): @staticmethod def _get_system_info() -> str: return '{0} {1} {2}'.format( - platform.system(), # OS - platform.release(), # OS version - platform.python_version() # Python version + platform.system(), platform.release(), platform.python_version() # OS # OS version # Python version ) def _build_user_agent(self) -> str: - return '{0}-{1} {2}'.format(self.SDK_NAME, __version__, - self._get_system_info()) + return '{0}-{1} {2}'.format(self.SDK_NAME, __version__, self._get_system_info()) def configure_service(self, service_name: str) -> None: """Look for external configuration of a service. Set service properties. @@ -168,19 +174,16 @@ def configure_service(self, service_name: str) -> None: if config.get('URL'): self.set_service_url(config.get('URL')) if config.get('DISABLE_SSL'): - self.set_disable_ssl_verification( - config.get('DISABLE_SSL').lower() == 'true') + self.set_disable_ssl_verification(config.get('DISABLE_SSL').lower() == 'true') if config.get('ENABLE_GZIP'): - self.set_enable_gzip_compression( - config.get('ENABLE_GZIP').lower() == 'true') + self.set_enable_gzip_compression(config.get('ENABLE_GZIP').lower() == 'true') if config.get('ENABLE_RETRIES'): if config.get('ENABLE_RETRIES').lower() == 'true': kwargs = {} if config.get('MAX_RETRIES'): kwargs["max_retries"] = int(config.get('MAX_RETRIES')) if config.get('RETRY_INTERVAL'): - kwargs["retry_interval"] = float( - config.get('RETRY_INTERVAL')) + kwargs["retry_interval"] = float(config.get('RETRY_INTERVAL')) self.enable_retries(**kwargs) def _set_user_agent_header(self, user_agent_string: str) -> None: @@ -199,10 +202,11 @@ def set_http_config(self, http_config: dict) -> None: """ if isinstance(http_config, dict): self.http_config = http_config - if (self.authenticator - and hasattr(self.authenticator, 'token_manager') - and isinstance(self.authenticator.token_manager, - TokenManager)): + if ( + self.authenticator + and hasattr(self.authenticator, 'token_manager') + and isinstance(self.authenticator.token_manager, TokenManager) + ): self.authenticator.token_manager.http_config = http_config else: raise TypeError("http_config parameter must be a dictionary") @@ -251,8 +255,7 @@ def set_http_client(self, http_client: requests.sessions.Session) -> None: if isinstance(http_client, requests.sessions.Session): self.http_client = http_client else: - raise TypeError( - "http_client parameter must be a requests.sessions.Session") + raise TypeError("http_client parameter must be a requests.sessions.Session") def get_authenticator(self) -> Authenticator: """Get the authenticator currently used by the service. @@ -305,9 +308,7 @@ def send(self, request: requests.Request, **kwargs) -> DetailedResponse: if key not in silent_keys: logger.warning('"%s" has been removed from the request', key) try: - response = self.http_client.request(**request, - cookies=self.jar, - **kwargs) + response = self.http_client.request(**request, cookies=self.jar, **kwargs) if 200 <= response.status_code <= 299: if response.status_code == 204 or request['method'] == 'HEAD': @@ -322,18 +323,14 @@ def send(self, request: requests.Request, **kwargs) -> DetailedResponse: result = response.json() except: result = response - return DetailedResponse(response=result, - headers=response.headers, - status_code=response.status_code) + return DetailedResponse(response=result, headers=response.headers, status_code=response.status_code) raise ApiException(response.status_code, http_response=response) except requests.exceptions.SSLError: logger.exception(self.ERROR_MSG_DISABLE_SSL) raise - def set_enable_gzip_compression(self, - should_enable_compression: bool = False - ) -> None: + def set_enable_gzip_compression(self, should_enable_compression: bool = False) -> None: """Set value to enable gzip compression on request bodies""" self.enable_gzip_compression = should_enable_compression @@ -341,18 +338,17 @@ def get_enable_gzip_compression(self) -> bool: """Get value for enabling gzip compression on request bodies""" return self.enable_gzip_compression - def prepare_request(self, - method: str, - url: str, - *, - headers: Optional[dict] = None, - params: Optional[dict] = None, - data: Optional[Union[str, dict]] = None, - files: Optional[Union[Dict[str, Tuple[str]], - List[Tuple[str, - Tuple[str, - ...]]]]] = None, - **kwargs) -> dict: + def prepare_request( + self, + method: str, + url: str, + *, + headers: Optional[dict] = None, + params: Optional[dict] = None, + data: Optional[Union[str, dict]] = None, + files: Optional[Union[Dict[str, Tuple[str]], List[Tuple[str, Tuple[str, ...]]]]] = None, + **kwargs + ) -> dict: """Build a dict that represents an HTTP service request. Clean up headers, add default http configuration, convert data @@ -409,9 +405,7 @@ def prepare_request(self, self.authenticator.authenticate(request) # Compress the request body if applicable - if (self.get_enable_gzip_compression() - and 'content-encoding' not in headers - and request['data'] is not None): + if self.get_enable_gzip_compression() and 'content-encoding' not in headers and request['data'] is not None: headers['content-encoding'] = 'gzip' uncompressed_data = request['data'] request_body = gzip.compress(uncompressed_data) @@ -431,8 +425,7 @@ def prepare_request(self, files = files.items() # Next, fill in any missing filenames from file tuples. for part_name, file_tuple in files: - if file_tuple and len( - file_tuple) == 3 and file_tuple[0] is None: + if file_tuple and len(file_tuple) == 3 and file_tuple[0] is None: file = file_tuple[1] if file and hasattr(file, 'name'): filename = basename(file.name) diff --git a/ibm_cloud_sdk_core/detailed_response.py b/ibm_cloud_sdk_core/detailed_response.py index 5bf76be..1eb94e8 100644 --- a/ibm_cloud_sdk_core/detailed_response.py +++ b/ibm_cloud_sdk_core/detailed_response.py @@ -34,11 +34,14 @@ class DetailedResponse: status_code (int): The status code of the response. """ - def __init__(self, - *, - response: Optional[Union[dict, requests.Response]] = None, - headers: Optional[Dict[str, str]] = None, - status_code: Optional[int] = None) -> None: + + def __init__( + self, + *, + response: Optional[Union[dict, requests.Response]] = None, + headers: Optional[Dict[str, str]] = None, + status_code: Optional[int] = None + ) -> None: self.result = response self.headers = headers self.status_code = status_code diff --git a/ibm_cloud_sdk_core/get_authenticator.py b/ibm_cloud_sdk_core/get_authenticator.py index 8b8bcb1..6f7c992 100644 --- a/ibm_cloud_sdk_core/get_authenticator.py +++ b/ibm_cloud_sdk_core/get_authenticator.py @@ -14,9 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .authenticators import (Authenticator, BasicAuthenticator, BearerTokenAuthenticator, ContainerAuthenticator, - CloudPakForDataAuthenticator, IAMAuthenticator, NoAuthAuthenticator, - VPCInstanceAuthenticator) +from .authenticators import ( + Authenticator, + BasicAuthenticator, + BearerTokenAuthenticator, + ContainerAuthenticator, + CloudPakForDataAuthenticator, + IAMAuthenticator, + NoAuthAuthenticator, + VPCInstanceAuthenticator, +) from .utils import read_external_sources @@ -59,12 +66,9 @@ def __construct_authenticator(config: dict) -> Authenticator: authenticator = None if auth_type == Authenticator.AUTHTYPE_BASIC.lower(): - authenticator = BasicAuthenticator( - username=config.get('USERNAME'), - password=config.get('PASSWORD')) + authenticator = BasicAuthenticator(username=config.get('USERNAME'), password=config.get('PASSWORD')) elif auth_type == Authenticator.AUTHTYPE_BEARERTOKEN.lower(): - authenticator = BearerTokenAuthenticator( - bearer_token=config.get('BEARER_TOKEN')) + authenticator = BearerTokenAuthenticator(bearer_token=config.get('BEARER_TOKEN')) elif auth_type == Authenticator.AUTHTYPE_CONTAINER.lower(): authenticator = ContainerAuthenticator( cr_token_filename=config.get('CR_TOKEN_FILENAME'), @@ -73,30 +77,32 @@ def __construct_authenticator(config: dict) -> Authenticator: url=config.get('AUTH_URL'), client_id=config.get('CLIENT_ID'), client_secret=config.get('CLIENT_SECRET'), - disable_ssl_verification=config.get( - 'AUTH_DISABLE_SSL', 'false').lower() == 'true', - scope=config.get('SCOPE')) + disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true', + scope=config.get('SCOPE'), + ) elif auth_type == Authenticator.AUTHTYPE_CP4D.lower(): authenticator = CloudPakForDataAuthenticator( username=config.get('USERNAME'), password=config.get('PASSWORD'), url=config.get('AUTH_URL'), apikey=config.get('APIKEY'), - disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true') + disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true', + ) elif auth_type == Authenticator.AUTHTYPE_IAM.lower() and config.get('APIKEY'): authenticator = IAMAuthenticator( apikey=config.get('APIKEY'), url=config.get('AUTH_URL'), client_id=config.get('CLIENT_ID'), client_secret=config.get('CLIENT_SECRET'), - disable_ssl_verification=config.get( - 'AUTH_DISABLE_SSL', 'false').lower() == 'true', - scope=config.get('SCOPE')) + disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true', + scope=config.get('SCOPE'), + ) elif auth_type == Authenticator.AUTHTYPE_VPC.lower(): authenticator = VPCInstanceAuthenticator( iam_profile_crn=config.get('IAM_PROFILE_CRN'), iam_profile_id=config.get('IAM_PROFILE_ID'), - url=config.get('AUTH_URL')) + url=config.get('AUTH_URL'), + ) elif auth_type == Authenticator.AUTHTYPE_NOAUTH.lower(): authenticator = NoAuthAuthenticator() diff --git a/ibm_cloud_sdk_core/token_managers/container_token_manager.py b/ibm_cloud_sdk_core/token_managers/container_token_manager.py index ec81bbe..29e9813 100644 --- a/ibm_cloud_sdk_core/token_managers/container_token_manager.py +++ b/ibm_cloud_sdk_core/token_managers/container_token_manager.py @@ -79,22 +79,31 @@ class ContainerTokenManager(IAMRequestBasedTokenManager): scope: The "scope" to use when fetching the bearer token from the IAM token server. This can be used to obtain an access token with a specific scope. """ + DEFAULT_CR_TOKEN_FILENAME = '/var/run/secrets/tokens/vault-token' - def __init__(self, - cr_token_filename: Optional[str] = None, - iam_profile_name: Optional[str] = None, - iam_profile_id: Optional[str] = None, - url: Optional[str] = None, - client_id: Optional[str] = None, - client_secret: Optional[str] = None, - disable_ssl_verification: bool = False, - scope: Optional[str] = None, - proxies: Optional[Dict[str, str]] = None, - headers: Optional[Dict[str, str]] = None) -> None: + def __init__( + self, + cr_token_filename: Optional[str] = None, + iam_profile_name: Optional[str] = None, + iam_profile_id: Optional[str] = None, + url: Optional[str] = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + disable_ssl_verification: bool = False, + scope: Optional[str] = None, + proxies: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> None: super().__init__( - url=url, client_id=client_id, client_secret=client_secret, - disable_ssl_verification=disable_ssl_verification, headers=headers, proxies=proxies, scope=scope) + url=url, + client_id=client_id, + client_secret=client_secret, + disable_ssl_verification=disable_ssl_verification, + headers=headers, + proxies=proxies, + scope=scope, + ) self.cr_token_filename = cr_token_filename self.iam_profile_name = iam_profile_name @@ -113,8 +122,7 @@ def retrieve_cr_token(self) -> str: """ cr_token_filename = self.cr_token_filename if self.cr_token_filename else self.DEFAULT_CR_TOKEN_FILENAME - logger.debug('Attempting to read CR token from file: %s', - cr_token_filename) + logger.debug('Attempting to read CR token from file: %s', cr_token_filename) try: with open(cr_token_filename, 'r', encoding='utf-8') as file: @@ -123,7 +131,8 @@ def retrieve_cr_token(self) -> str: # pylint: disable=broad-except except Exception as ex: raise Exception( - 'Unable to retrieve the CR token value from file {}: {}'.format(cr_token_filename, ex)) from None + 'Unable to retrieve the CR token value from file {}: {}'.format(cr_token_filename, ex) + ) from None def request_token(self) -> dict: """Retrieves a CR token value from the current compute resource, diff --git a/ibm_cloud_sdk_core/token_managers/cp4d_token_manager.py b/ibm_cloud_sdk_core/token_managers/cp4d_token_manager.py index 8b2b082..88ebece 100644 --- a/ibm_cloud_sdk_core/token_managers/cp4d_token_manager.py +++ b/ibm_cloud_sdk_core/token_managers/cp4d_token_manager.py @@ -48,19 +48,22 @@ class CP4DTokenManager(JWTTokenManager): proxies.https (str): The proxy endpoint to use for HTTPS requests. verify (str): The path to the certificate to use for HTTPS requests. """ + TOKEN_NAME = 'token' VALIDATE_AUTH_PATH = '/v1/authorize' - def __init__(self, - username: str = None, - password: str = None, - url: str = None, - *, - apikey: str = None, - disable_ssl_verification: bool = False, - headers: Optional[Dict[str, str]] = None, - proxies: Optional[Dict[str, str]] = None, - verify: Optional[str] = None) -> None: + def __init__( + self, + username: str = None, + password: str = None, + url: str = None, + *, + apikey: str = None, + disable_ssl_verification: bool = False, + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + verify: Optional[str] = None + ) -> None: self.username = username self.password = password self.verify = verify @@ -72,23 +75,18 @@ def __init__(self, self.headers = {} self.headers['Content-Type'] = 'application/json' self.proxies = proxies - super().__init__(url, disable_ssl_verification=disable_ssl_verification, - token_name=self.TOKEN_NAME) + super().__init__(url, disable_ssl_verification=disable_ssl_verification, token_name=self.TOKEN_NAME) def request_token(self) -> dict: - """Makes a request for a token. - """ + """Makes a request for a token.""" response = self._request( method='POST', headers=self.headers, url=self.url, - data=json.dumps({ - "username": self.username, - "password": self.password, - "api_key": self.apikey - }), + data=json.dumps({"username": self.username, "password": self.password, "api_key": self.apikey}), proxies=self.proxies, - verify=self.verify) + verify=self.verify, + ) return response def set_headers(self, headers: Dict[str, str]) -> None: diff --git a/ibm_cloud_sdk_core/token_managers/iam_request_based_token_manager.py b/ibm_cloud_sdk_core/token_managers/iam_request_based_token_manager.py index 2d095b2..a763412 100644 --- a/ibm_cloud_sdk_core/token_managers/iam_request_based_token_manager.py +++ b/ibm_cloud_sdk_core/token_managers/iam_request_based_token_manager.py @@ -19,7 +19,7 @@ from .jwt_token_manager import JWTTokenManager -#pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-instance-attributes class IAMRequestBasedTokenManager(JWTTokenManager): """The IamRequestBasedTokenManager class contains code relevant to any token manager that interacts with the IAM service to manage a token. It stores information relevant to all @@ -60,21 +60,24 @@ class IAMRequestBasedTokenManager(JWTTokenManager): scope: The "scope" to use when fetching the bearer token from the IAM token server. This can be used to obtain an access token with a specific scope. """ + DEFAULT_IAM_URL = 'https://iam.cloud.ibm.com' OPERATION_PATH = "/identity/token" - def __init__(self, - url: Optional[str] = None, - client_id: Optional[str] = None, - client_secret: Optional[str] = None, - disable_ssl_verification: bool = False, - headers: Optional[Dict[str, str]] = None, - proxies: Optional[Dict[str, str]] = None, - scope: Optional[str] = None) -> None: + def __init__( + self, + url: Optional[str] = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + disable_ssl_verification: bool = False, + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + scope: Optional[str] = None, + ) -> None: if not url: url = self.DEFAULT_IAM_URL if url.endswith(self.OPERATION_PATH): - url = url[:-len(self.OPERATION_PATH)] + url = url[: -len(self.OPERATION_PATH)] self.url = url self.client_id = client_id self.client_secret = client_secret @@ -83,8 +86,7 @@ def __init__(self, self.proxies = proxies self.scope = scope self.request_payload = {} - super().__init__( - self.url, disable_ssl_verification=disable_ssl_verification, token_name='access_token') + super().__init__(self.url, disable_ssl_verification=disable_ssl_verification, token_name='access_token') def request_token(self) -> dict: """Request an IAM OAuth token given an API Key. @@ -95,10 +97,7 @@ def request_token(self) -> dict: Returns: A dictionary containing the bearer token to be subsequently used service requests. """ - headers = { - 'Content-type': 'application/x-www-form-urlencoded', - 'Accept': 'application/json' - } + headers = {'Content-type': 'application/x-www-form-urlencoded', 'Accept': 'application/json'} if self.headers is not None and isinstance(self.headers, dict): headers.update(self.headers) @@ -118,7 +117,8 @@ def request_token(self) -> dict: headers=headers, data=data, auth_tuple=auth_tuple, - proxies=self.proxies) + proxies=self.proxies, + ) return response def set_client_id_and_secret(self, client_id: str, client_secret: str) -> None: diff --git a/ibm_cloud_sdk_core/token_managers/iam_token_manager.py b/ibm_cloud_sdk_core/token_managers/iam_token_manager.py index e820c6a..8c532cb 100644 --- a/ibm_cloud_sdk_core/token_managers/iam_token_manager.py +++ b/ibm_cloud_sdk_core/token_managers/iam_token_manager.py @@ -60,19 +60,27 @@ class IAMTokenManager(IAMRequestBasedTokenManager): This can be used to obtain an access token with a specific scope. """ - def __init__(self, - apikey: str, - *, - url: Optional[str] = None, - client_id: Optional[str] = None, - client_secret: Optional[str] = None, - disable_ssl_verification: bool = False, - headers: Optional[Dict[str, str]] = None, - proxies: Optional[Dict[str, str]] = None, - scope: Optional[str] = None) -> None: + def __init__( + self, + apikey: str, + *, + url: Optional[str] = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + disable_ssl_verification: bool = False, + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + scope: Optional[str] = None + ) -> None: super().__init__( - url=url, client_id=client_id, client_secret=client_secret, - disable_ssl_verification=disable_ssl_verification, headers=headers, proxies=proxies, scope=scope) + url=url, + client_id=client_id, + client_secret=client_secret, + disable_ssl_verification=disable_ssl_verification, + headers=headers, + proxies=proxies, + scope=scope, + ) self.apikey = apikey diff --git a/ibm_cloud_sdk_core/token_managers/jwt_token_manager.py b/ibm_cloud_sdk_core/token_managers/jwt_token_manager.py index 63cd59a..497fa67 100644 --- a/ibm_cloud_sdk_core/token_managers/jwt_token_manager.py +++ b/ibm_cloud_sdk_core/token_managers/jwt_token_manager.py @@ -75,15 +75,7 @@ def _save_token_info(self, token_response: dict) -> None: buffer = (exp - iat) * 0.2 self.refresh_time = self.expire_time - buffer - def _request(self, - method, - url, - *, - headers=None, - params=None, - data=None, - auth_tuple=None, - **kwargs) -> dict: + def _request(self, method, url, *, headers=None, params=None, data=None, auth_tuple=None, **kwargs) -> dict: kwargs = dict({"timeout": 60}, **kwargs) kwargs = dict(kwargs, **self.http_config) @@ -91,13 +83,8 @@ def _request(self, kwargs['verify'] = False response = requests.request( - method=method, - url=url, - headers=headers, - params=params, - data=data, - auth=auth_tuple, - **kwargs) + method=method, url=url, headers=headers, params=params, data=data, auth=auth_tuple, **kwargs + ) if 200 <= response.status_code <= 299: return response.json() diff --git a/ibm_cloud_sdk_core/token_managers/token_manager.py b/ibm_cloud_sdk_core/token_managers/token_manager.py index 493c228..41d09f3 100644 --- a/ibm_cloud_sdk_core/token_managers/token_manager.py +++ b/ibm_cloud_sdk_core/token_managers/token_manager.py @@ -51,12 +51,7 @@ class TokenManager(ABC): access_token (str): The latest stored access token """ - def __init__( - self, - url: str, - *, - disable_ssl_verification: bool = False - ): + def __init__(self, url: str, *, disable_ssl_verification: bool = False): self.url = url self.disable_ssl_verification = disable_ssl_verification self.expire_time = 0 @@ -190,15 +185,7 @@ def _save_token_info(self, token_response: dict) -> None: # pragma: no cover """ pass - def _request(self, - method, - url, - *, - headers=None, - params=None, - data=None, - auth_tuple=None, - **kwargs): + def _request(self, method, url, *, headers=None, params=None, data=None, auth_tuple=None, **kwargs): kwargs = dict({"timeout": 60}, **kwargs) kwargs = dict(kwargs, **self.http_config) @@ -206,13 +193,8 @@ def _request(self, kwargs['verify'] = False response = requests.request( - method=method, - url=url, - headers=headers, - params=params, - data=data, - auth=auth_tuple, - **kwargs) + method=method, url=url, headers=headers, params=params, data=data, auth=auth_tuple, **kwargs + ) if 200 <= response.status_code <= 299: return response diff --git a/ibm_cloud_sdk_core/token_managers/vpc_instance_token_manager.py b/ibm_cloud_sdk_core/token_managers/vpc_instance_token_manager.py index 865e500..44cdd51 100644 --- a/ibm_cloud_sdk_core/token_managers/vpc_instance_token_manager.py +++ b/ibm_cloud_sdk_core/token_managers/vpc_instance_token_manager.py @@ -56,10 +56,9 @@ class VPCInstanceTokenManager(JWTTokenManager): DEFAULT_IMS_ENDPOINT = 'http://169.254.169.254' TOKEN_NAME = 'access_token' - def __init__(self, - iam_profile_crn: Optional[str] = None, - iam_profile_id: Optional[str] = None, - url: Optional[str] = None) -> None: + def __init__( + self, iam_profile_crn: Optional[str] = None, iam_profile_id: Optional[str] = None, url: Optional[str] = None + ) -> None: if not url: url = self.DEFAULT_IMS_ENDPOINT @@ -91,17 +90,17 @@ def request_token(self) -> dict: headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', - 'Authorization': 'Bearer ' + instance_identity_token + 'Authorization': 'Bearer ' + instance_identity_token, } - logger.debug( - 'Invoking VPC \'create_iam_token\' operation: %s', url) + logger.debug('Invoking VPC \'create_iam_token\' operation: %s', url) response = self._request( method='POST', url=url, headers=headers, params={'version': self.METADATA_SERVICE_VERSION}, - data=json.dumps(request_payload) if request_payload else None) + data=json.dumps(request_payload) if request_payload else None, + ) logger.debug('Returned from VPC \'create_iam_token\' operation."') return response @@ -142,14 +141,14 @@ def retrieve_instance_identity_token(self) -> str: request_body = {'expires_in': 300} - logger.debug( - 'Invoking VPC \'create_access_token\' operation: %s', url) + logger.debug('Invoking VPC \'create_access_token\' operation: %s', url) response = self._request( method='PUT', url=url, headers=headers, params={'version': self.METADATA_SERVICE_VERSION}, - data=json.dumps(request_body)) + data=json.dumps(request_body), + ) logger.debug('Returned from VPC \'create_access_token\' operation."') return response['access_token'] diff --git a/ibm_cloud_sdk_core/utils.py b/ibm_cloud_sdk_core/utils.py index ef6243e..7dcbfeb 100644 --- a/ibm_cloud_sdk_core/utils.py +++ b/ibm_cloud_sdk_core/utils.py @@ -29,15 +29,14 @@ class SSLHTTPAdapter(HTTPAdapter): - """Wraps the original HTTP adapter and adds additional SSL context. - """ + """Wraps the original HTTP adapter and adds additional SSL context.""" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - #pylint: disable=arguments-differ + # pylint: disable=arguments-differ def init_poolmanager(self, connections, maxsize, block): - """Extends the parent's method by adding minimum SSL version to the args. - """ + """Extends the parent's method by adding minimum SSL version to the args.""" ssl_context = create_urllib3_context() ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2 super().init_poolmanager(connections, maxsize, block, ssl_context=ssl_context) @@ -52,8 +51,7 @@ def has_bad_first_or_last_char(val: str) -> bool: Returns: Whether or not the string starts or ends with bad characters. """ - return val is not None and (val.startswith('{') or val.startswith('"') - or val.endswith('}') or val.endswith('"')) + return val is not None and (val.startswith('{') or val.startswith('"') or val.endswith('}') or val.endswith('"')) def remove_null_values(dictionary: dict) -> dict: @@ -146,8 +144,9 @@ def string_to_datetime_list(string_list: List[str]) -> List[datetime.datetime]: the de-serialized list of strings as a list of datetime objects. """ if not isinstance(string_list, list): - raise ValueError("Invalid argument type: " + str(type(string_list)) - + ". Argument string_list must be of type List[str]") + raise ValueError( + "Invalid argument type: " + str(type(string_list)) + ". Argument string_list must be of type List[str]" + ) datetime_list = [] for string_val in string_list: datetime_list.append(string_to_datetime(string_val)) @@ -164,8 +163,11 @@ def datetime_to_string_list(datetime_list: List[datetime.datetime]) -> List[str] list of datetimes serialized as strings in iso8601 format. """ if not isinstance(datetime_list, list): - raise ValueError("Invalid argument type: " + str(type(datetime_list)) - + ". Argument datetime_list must be of type List[datetime.datetime]") + raise ValueError( + "Invalid argument type: " + + str(type(datetime_list)) + + ". Argument datetime_list must be of type List[datetime.datetime]" + ) string_list = [] for datetime_val in datetime_list: string_list.append(datetime_to_string(datetime_val)) @@ -299,9 +301,7 @@ def __read_from_env_variables(service_name: str) -> dict: return config -def __read_from_credential_file(service_name: str, - *, - separator: str = '=') -> dict: +def __read_from_credential_file(service_name: str, *, separator: str = '=') -> dict: """Return a config object based on credentials file for a service. Args: @@ -339,8 +339,7 @@ def __read_from_credential_file(service_name: str, if len(key_val) == 2: key = key_val[0] value = key_val[1] - _parse_key_and_update_config(config, service_name, key, - value) + _parse_key_and_update_config(config, service_name, key, value) except OSError: # just absorb the exception and make sure we return an empty response config = {} @@ -348,11 +347,10 @@ def __read_from_credential_file(service_name: str, return config -def _parse_key_and_update_config(config: dict, service_name: str, key: str, - value: str) -> None: +def _parse_key_and_update_config(config: dict, service_name: str, key: str, value: str) -> None: service_name = service_name.replace(' ', '_').replace('-', '_').upper() if key.startswith(service_name): - config[key[len(service_name) + 1:]] = value + config[key[len(service_name) + 1 :]] = value def __read_from_vcap_services(service_name: str) -> dict: @@ -370,39 +368,30 @@ def __read_from_vcap_services(service_name: str) -> dict: services = json_import.loads(vcap_services) for key in services.keys(): for i in range(len(services[key])): - if vcap_service_credentials and isinstance( - vcap_service_credentials, dict): + if vcap_service_credentials and isinstance(vcap_service_credentials, dict): break if services[key][i].get('name') == service_name: - vcap_service_credentials = services[key][i].get( - 'credentials', {}) + vcap_service_credentials = services[key][i].get('credentials', {}) if not vcap_service_credentials: if service_name in services.keys(): service = services.get(service_name) if service: - vcap_service_credentials = service[0].get( - 'credentials', {}) + vcap_service_credentials = service[0].get('credentials', {}) - if vcap_service_credentials and isinstance(vcap_service_credentials, - dict): + if vcap_service_credentials and isinstance(vcap_service_credentials, dict): new_vcap_creds = {} # cf - if vcap_service_credentials.get( - 'username') and vcap_service_credentials.get('password'): + if vcap_service_credentials.get('username') and vcap_service_credentials.get('password'): new_vcap_creds['AUTH_TYPE'] = 'basic' - new_vcap_creds['USERNAME'] = vcap_service_credentials.get( - 'username') - new_vcap_creds['PASSWORD'] = vcap_service_credentials.get( - 'password') + new_vcap_creds['USERNAME'] = vcap_service_credentials.get('username') + new_vcap_creds['PASSWORD'] = vcap_service_credentials.get('password') vcap_service_credentials = new_vcap_creds elif vcap_service_credentials.get('iam_apikey'): new_vcap_creds['AUTH_TYPE'] = 'iam' - new_vcap_creds['APIKEY'] = vcap_service_credentials.get( - 'iam_apikey') + new_vcap_creds['APIKEY'] = vcap_service_credentials.get('iam_apikey') vcap_service_credentials = new_vcap_creds elif vcap_service_credentials.get('apikey'): new_vcap_creds['AUTH_TYPE'] = 'iam' - new_vcap_creds['APIKEY'] = vcap_service_credentials.get( - 'apikey') + new_vcap_creds['APIKEY'] = vcap_service_credentials.get('apikey') vcap_service_credentials = new_vcap_creds return vcap_service_credentials diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6ae249d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.black] +line-length = 120 +skip-string-normalization = true diff --git a/requirements-dev.txt b/requirements-dev.txt index 210b76e..19c4c08 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,3 +4,4 @@ pytest>=7.0.1,<8.0.0 pytest-cov>=2.2.1,<3.0.0 responses>=0.12.1,<1.0.0 tox>=3.2.0,<4.0.0 +black>=22.10 diff --git a/setup.py b/setup.py index a8424cb..b0f6e0d 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ with open('requirements-dev.txt') as f: tests_require = [str(req) for req in pkg_resources.parse_requirements(f)] + class PyTest(TestCommand): def finalize_options(self): TestCommand.finalize_options(self) @@ -44,40 +45,43 @@ def finalize_options(self): def run_tests(self): import pytest + errcode = pytest.main(self.test_args) sys.exit(errcode) + with open("README.md", "r") as fh: readme = fh.read() -setup(name='ibm-cloud-sdk-core', - version=__version__, - description='Core library used by SDKs for IBM Cloud Services', - license='Apache 2.0', - install_requires=install_requires, - tests_require=tests_require, - cmdclass={'test': PyTest}, - author='IBM', - author_email='devxsdk@us.ibm.com', - long_description=readme, - long_description_content_type='text/markdown', - url='https://github.com/IBM/python-sdk-core', - packages=find_packages(), - include_package_data=True, - keywords='watson, ibm, cloud, ibm cloud services', - classifiers=[ - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: OS Independent', - 'Topic :: Software Development :: Libraries :: Python Modules', - 'Topic :: Software Development :: Libraries :: Application Frameworks', - ], - zip_safe=True - ) +setup( + name='ibm-cloud-sdk-core', + version=__version__, + description='Core library used by SDKs for IBM Cloud Services', + license='Apache 2.0', + install_requires=install_requires, + tests_require=tests_require, + cmdclass={'test': PyTest}, + author='IBM', + author_email='devxsdk@us.ibm.com', + long_description=readme, + long_description_content_type='text/markdown', + url='https://github.com/IBM/python-sdk-core', + packages=find_packages(), + include_package_data=True, + keywords='watson, ibm, cloud, ibm cloud services', + classifiers=[ + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Development Status :: 5 - Production/Stable', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: OS Independent', + 'Topic :: Software Development :: Libraries :: Python Modules', + 'Topic :: Software Development :: Libraries :: Application Frameworks', + ], + zip_safe=True, +) diff --git a/test/test_api_exception.py b/test/test_api_exception.py index 277346d..9b130fa 100644 --- a/test/test_api_exception.py +++ b/test/test_api_exception.py @@ -10,64 +10,79 @@ @responses.activate def test_api_exception(): """Test APIException class""" - responses.add(responses.GET, - 'https://test.com', - status=500, - body=json.dumps({'error': 'sorry', 'msg': 'serious error'}), - content_type='application/json') + responses.add( + responses.GET, + 'https://test.com', + status=500, + body=json.dumps({'error': 'sorry', 'msg': 'serious error'}), + content_type='application/json', + ) - mock_response = requests.get('https://test.com') + mock_response = requests.get('https://test.com', timeout=None) exception = ApiException(500, http_response=mock_response) assert exception is not None assert exception.message == 'sorry' - responses.add(responses.GET, - 'https://test-again.com', - status=500, - body=json.dumps({ - "errors": [ - { - "message": "sorry again", - }], - }), - content_type='application/json') - mock_response = requests.get('https://test-again.com') + responses.add( + responses.GET, + 'https://test-again.com', + status=500, + body=json.dumps( + { + "errors": [ + { + "message": "sorry again", + } + ], + } + ), + content_type='application/json', + ) + mock_response = requests.get('https://test-again.com', timeout=None) exception = ApiException(500, http_response=mock_response) assert exception.message == 'sorry again' - responses.add(responses.GET, - 'https://test-once-more.com', - status=500, - body=json.dumps({'message': 'sorry once more'}), - content_type='application/json') - mock_response = requests.get('https://test-once-more.com') + responses.add( + responses.GET, + 'https://test-once-more.com', + status=500, + body=json.dumps({'message': 'sorry once more'}), + content_type='application/json', + ) + mock_response = requests.get('https://test-once-more.com', timeout=None) exception = ApiException(500, http_response=mock_response) assert exception.message == 'sorry once more' - responses.add(responses.GET, - 'https://test-msg.com', - status=500, - body=json.dumps({'msg': 'serious error'}), - content_type='application/json') - mock_response = requests.get('https://test-msg.com') + responses.add( + responses.GET, + 'https://test-msg.com', + status=500, + body=json.dumps({'msg': 'serious error'}), + content_type='application/json', + ) + mock_response = requests.get('https://test-msg.com', timeout=None) exception = ApiException(500, http_response=mock_response) assert exception.message == 'Internal Server Error' - responses.add(responses.GET, - 'https://test-errormessage.com', - status=500, - body=json.dumps({'errorMessage': 'IAM error message'}), - content_type='application/json') - mock_response = requests.get('https://test-errormessage.com') + responses.add( + responses.GET, + 'https://test-errormessage.com', + status=500, + body=json.dumps({'errorMessage': 'IAM error message'}), + content_type='application/json', + ) + mock_response = requests.get('https://test-errormessage.com', timeout=None) exception = ApiException(500, http_response=mock_response) assert exception.message == 'IAM error message' - responses.add(responses.GET, - 'https://test-for-text.com', - status=500, - headers={'X-Global-Transaction-ID': 'xx'}, - body="plain text error") - mock_response = requests.get('https://test-for-text.com') + responses.add( + responses.GET, + 'https://test-for-text.com', + status=500, + headers={'X-Global-Transaction-ID': 'xx'}, + body="plain text error", + ) + mock_response = requests.get('https://test-for-text.com', timeout=None) exception = ApiException(500, http_response=mock_response) assert exception.message == 'plain text error' assert str(exception) == 'Error: plain text error, Code: 500 , X-global-transaction-id: xx' diff --git a/test/test_base_service.py b/test/test_base_service.py index e9bb7ba..bf456f0 100644 --- a/test/test_base_service.py +++ b/test/test_base_service.py @@ -19,24 +19,23 @@ from ibm_cloud_sdk_core import BaseService, DetailedResponse from ibm_cloud_sdk_core import CP4DTokenManager from ibm_cloud_sdk_core import get_authenticator_from_environment -from ibm_cloud_sdk_core.authenticators import (IAMAuthenticator, NoAuthAuthenticator, Authenticator, - BasicAuthenticator, CloudPakForDataAuthenticator) +from ibm_cloud_sdk_core.authenticators import ( + IAMAuthenticator, + NoAuthAuthenticator, + Authenticator, + BasicAuthenticator, + CloudPakForDataAuthenticator, +) class IncludeExternalConfigService(BaseService): default_service_url = 'https://servicesthatincludeexternalconfig.com/api' def __init__( - self, - api_version: str, - authenticator: Optional[Authenticator] = None, - trace_id: Optional[str] = None + self, api_version: str, authenticator: Optional[Authenticator] = None, trace_id: Optional[str] = None ) -> None: BaseService.__init__( - self, - service_url=self.default_service_url, - authenticator=authenticator, - disable_ssl_verification=False + self, service_url=self.default_service_url, authenticator=authenticator, disable_ssl_verification=False ) self.api_version = api_version self.trace_id = trace_id @@ -51,13 +50,14 @@ def __init__( version: str, service_url: str = default_url, authenticator: Optional[Authenticator] = None, - disable_ssl_verification: bool = False + disable_ssl_verification: bool = False, ) -> None: BaseService.__init__( self, service_url=service_url, authenticator=authenticator, - disable_ssl_verification=disable_ssl_verification) + disable_ssl_verification=disable_ssl_verification, + ) self.version = version def op_with_path_params(self, path0: str, path1: str) -> DetailedResponse: @@ -66,8 +66,7 @@ def op_with_path_params(self, path0: str, path1: str) -> DetailedResponse: if path1 is None: raise ValueError('path1 must be provided') params = {'version': self.version} - url = '/v1/foo/{0}/bar/{1}/baz'.format( - *self._encode_path_vars(path0, path1)) + url = '/v1/foo/{0}/bar/{1}/baz'.format(*self._encode_path_vars(path0, path1)) request = self.prepare_request(method='GET', url=url, params=params) response = self.send(request) return response @@ -106,16 +105,12 @@ def get_access_token() -> str: "aud": "sss", "uid": "sss", "iat": 3600, - "exp": int(time.time()) + "exp": int(time.time()), } access_token = jwt.encode( - access_token_layout, - 'secret', - algorithm='HS256', - headers={ - 'kid': '230498151c214b788dd97f22b85410a5' - }) + access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} + ) return access_token @@ -141,13 +136,8 @@ def test_url_encoding(): test_url = service.default_url + path_encoded responses.add( - responses.GET, - test_url, - status=200, - body=json.dumps({ - "foobar": "baz" - }), - content_type='application/json') + responses.GET, test_url, status=200, body=json.dumps({"foobar": "baz"}), content_type='application/json' + ) # Set Host as a default header on the service. service.set_default_headers({'Host': 'alternatehost.ibm.com:443'}) @@ -160,8 +150,7 @@ def test_url_encoding(): assert 'version=2017-07-07' in responses.calls[0].request.url # Verify that the Host header was set in the request. - assert responses.calls[0].request.headers.get( - 'Host') == 'alternatehost.ibm.com:443' + assert responses.calls[0].request.headers.get('Host') == 'alternatehost.ibm.com:443' @responses.activate @@ -171,18 +160,12 @@ def test_stream_json_response(): path = '/v1/streamjson' test_url = service.default_url + path - expected_response = json.dumps( - {"id": 1, "rev": "v1", "content": "this is a document"}) + expected_response = json.dumps({"id": 1, "rev": "v1", "content": "this is a document"}) # print("Expected response: ", expected_response) # Simulate a JSON response - responses.add( - responses.GET, - test_url, - status=200, - body=expected_response, - content_type='application/json') + responses.add(responses.GET, test_url, status=200, body=expected_response, content_type='application/json') # Invoke the operation and receive an "iterable" as the response response = service.get_document_as_stream() @@ -211,10 +194,9 @@ def test_http_config(): responses.GET, service.default_url, status=200, - body=json.dumps({ - "foobar": "baz" - }), - content_type='application/json') + body=json.dumps({"foobar": "baz"}), + content_type='application/json', + ) response = service.with_http_config({'timeout': 100}) assert response is not None @@ -229,8 +211,7 @@ def test_fail_http_config(): @responses.activate def test_cwd(): - file_path = os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') # Try changing working directories to test getting creds from cwd cwd = os.getcwd() os.chdir(os.path.dirname(file_path)) @@ -254,8 +235,7 @@ def test_cwd(): @responses.activate def test_iam(): - file_path = os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-iam.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path iam_authenticator = get_authenticator_from_environment('ibm-watson') service = AnyServiceV1('2017-07-07', authenticator=iam_authenticator) @@ -268,20 +248,15 @@ def test_iam(): "token_type": "Bearer", "expires_in": 3600, "expiration": int(time.time()), - "refresh_token": "jy4gl91BQ" + "refresh_token": "jy4gl91BQ", } - responses.add( - responses.POST, - url='https://iam.cloud.ibm.com/identity/token', - body=json.dumps(response), - status=200) + responses.add(responses.POST, url='https://iam.cloud.ibm.com/identity/token', body=json.dumps(response), status=200) responses.add( responses.GET, url='https://gateway.watsonplatform.net/test/api', - body=json.dumps({ - "foobar": "baz" - }), - content_type='application/json') + body=json.dumps({"foobar": "baz"}), + content_type='application/json', + ) service.any_service_call() assert "grant-type%3Aapikey" in responses.calls[0].request.body @@ -309,8 +284,7 @@ def __init__(self): def test_for_cp4d(): - cp4d_authenticator = CloudPakForDataAuthenticator('my_username', 'my_password', - 'my_url') + cp4d_authenticator = CloudPakForDataAuthenticator('my_username', 'my_password', 'my_url') service = AnyServiceV1('2017-07-07', authenticator=cp4d_authenticator) assert service.authenticator.token_manager is not None assert service.authenticator.token_manager.username == 'my_username' @@ -320,15 +294,13 @@ def test_for_cp4d(): def test_disable_ssl_verification(): - service1 = AnyServiceV1( - '2017-07-07', authenticator=NoAuthAuthenticator(), disable_ssl_verification=True) + service1 = AnyServiceV1('2017-07-07', authenticator=NoAuthAuthenticator(), disable_ssl_verification=True) assert service1.disable_ssl_verification is True service1.set_disable_ssl_verification(False) assert service1.disable_ssl_verification is False - cp4d_authenticator = CloudPakForDataAuthenticator('my_username', 'my_password', - 'my_url') + cp4d_authenticator = CloudPakForDataAuthenticator('my_username', 'my_password', 'my_url') service2 = AnyServiceV1('2017-07-07', authenticator=cp4d_authenticator) assert service2.disable_ssl_verification is False cp4d_authenticator.set_disable_ssl_verification(True) @@ -339,12 +311,7 @@ def test_disable_ssl_verification(): def test_http_head(): service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) expected_headers = {'Test-Header1': 'value1', 'Test-Header2': 'value2'} - responses.add( - responses.HEAD, - service.default_url, - status=200, - headers=expected_headers, - content_type=None) + responses.add(responses.HEAD, service.default_url, status=200, headers=expected_headers, content_type=None) response = service.head_request() assert response is not None @@ -366,16 +333,14 @@ def test_response_with_no_body(): def test_has_bad_first_or_last_char(): with pytest.raises(ValueError) as err: - basic_authenticator = BasicAuthenticator( - '{my_username}', 'my_password') + basic_authenticator = BasicAuthenticator('{my_username}', 'my_password') AnyServiceV1('2018-11-20', authenticator=basic_authenticator).prepare_request( - responses.GET, - 'https://gateway.watsonplatform.net/test/api' + responses.GET, 'https://gateway.watsonplatform.net/test/api' ) - assert str( - err.value - ) == 'The username and password shouldn\'t start or end with curly brackets or quotes. '\ - 'Please remove any surrounding {, }, or \" characters.' + assert ( + str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.' + ) @responses.activate @@ -384,10 +349,9 @@ def test_request_server_error(): responses.GET, 'https://gateway.watsonplatform.net/test/api', status=500, - body=json.dumps({ - 'error': 'internal server error' - }), - content_type='application/json') + body=json.dumps({'error': 'internal server error'}), + content_type='application/json', + ) service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) try: prepped = service.prepare_request('GET', url='') @@ -402,17 +366,15 @@ def test_request_success_json(): responses.GET, 'https://gateway.watsonplatform.net/test/api', status=200, - body=json.dumps({ - 'foo': 'bar' - }), - content_type='application/json') + body=json.dumps({'foo': 'bar'}), + content_type='application/json', + ) service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) prepped = service.prepare_request('GET', url='') detailed_response = service.send(prepped) assert detailed_response.get_result() == {'foo': 'bar'} - service = AnyServiceV1( - '2018-11-20', authenticator=BasicAuthenticator('my_username', 'my_password')) + service = AnyServiceV1('2018-11-20', authenticator=BasicAuthenticator('my_username', 'my_password')) service.set_default_headers({'test': 'header'}) service.set_disable_ssl_verification(True) prepped = service.prepare_request('GET', url='') @@ -426,10 +388,9 @@ def test_request_success_response(): responses.GET, 'https://gateway.watsonplatform.net/test/api', status=200, - body=json.dumps({ - 'foo': 'bar' - }), - content_type='application/json') + body=json.dumps({'foo': 'bar'}), + content_type='application/json', + ) service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) prepped = service.prepare_request('GET', url='') detailed_response = service.send(prepped) @@ -442,10 +403,9 @@ def test_request_fail_401(): responses.GET, 'https://gateway.watsonplatform.net/test/api', status=401, - body=json.dumps({ - 'foo': 'bar' - }), - content_type='application/json') + body=json.dumps({'foo': 'bar'}), + content_type='application/json', + ) service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) try: prepped = service.prepare_request('GET', url='') @@ -455,9 +415,7 @@ def test_request_fail_401(): def test_misc_methods(): - class MockModel: - def __init__(self, xyz=None): self.xyz = xyz @@ -504,8 +462,10 @@ def test_set_service_url(): service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) with pytest.raises(ValueError) as err: service.set_service_url('{url}') - assert str(err.value) == 'The service url shouldn\'t start or end with curly brackets or quotes. '\ - 'Be sure to remove any {} and \" characters surrounding your service url' + assert ( + str(err.value) == 'The service url shouldn\'t start or end with curly brackets or quotes. ' + 'Be sure to remove any {} and \" characters surrounding your service url' + ) service.set_service_url('my_url') @@ -514,8 +474,7 @@ def test_http_client(): auth = BasicAuthenticator('my_username', 'my_password') service = AnyServiceV1('2018-11-20', authenticator=auth) assert isinstance(service.get_http_client(), requests.sessions.Session) - assert service.get_http_client().headers.get( - 'Accept-Encoding') == 'gzip, deflate' + assert service.get_http_client().headers.get('Accept-Encoding') == 'gzip, deflate' new_http_client = requests.Session() new_http_client.headers.update({'Accept-Encoding': 'gzip'}) @@ -536,16 +495,14 @@ def test_gzip_compression(): # Should return uncompressed data when gzip is off service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) assert not service.get_enable_gzip_compression() - prepped = service.prepare_request( - 'GET', url='', data=json.dumps({"foo": "bar"})) + prepped = service.prepare_request('GET', url='', data=json.dumps({"foo": "bar"})) assert prepped['data'] == b'{"foo": "bar"}' assert prepped['headers'].get('content-encoding') != 'gzip' # Should return compressed data when gzip is on service.set_enable_gzip_compression(True) assert service.get_enable_gzip_compression() - prepped = service.prepare_request( - 'GET', url='', data=json.dumps({"foo": "bar"})) + prepped = service.prepare_request('GET', url='', data=json.dumps({"foo": "bar"})) assert prepped['data'] == gzip.compress(b'{"foo": "bar"}') assert prepped['headers'].get('content-encoding') == 'gzip' @@ -579,30 +536,27 @@ def test_gzip_compression(): # Should return uncompressed data when content-encoding is set assert service.get_enable_gzip_compression() - prepped = service.prepare_request('GET', url='', headers={"content-encoding": "gzip"}, - data=json.dumps({"foo": "bar"})) + prepped = service.prepare_request( + 'GET', url='', headers={"content-encoding": "gzip"}, data=json.dumps({"foo": "bar"}) + ) assert prepped['data'] == b'{"foo": "bar"}' assert prepped['headers'].get('content-encoding') == 'gzip' def test_gzip_compression_external(): # Should set gzip compression from external config - file_path = os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-gzip.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-gzip.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path - service = IncludeExternalConfigService( - 'v1', authenticator=NoAuthAuthenticator()) + service = IncludeExternalConfigService('v1', authenticator=NoAuthAuthenticator()) assert service.service_url == 'https://mockurl' assert service.get_enable_gzip_compression() is True - prepped = service.prepare_request( - 'GET', url='', data=json.dumps({"foo": "bar"})) + prepped = service.prepare_request('GET', url='', data=json.dumps({"foo": "bar"})) assert prepped['data'] == gzip.compress(b'{"foo": "bar"}') assert prepped['headers'].get('content-encoding') == 'gzip' def test_retry_config_default(): - service = BaseService(service_url='https://mockurl/', - authenticator=NoAuthAuthenticator()) + service = BaseService(service_url='https://mockurl/', authenticator=NoAuthAuthenticator()) service.enable_retries() assert service.retry_config.total == 4 assert service.retry_config.backoff_factor == 1.0 @@ -622,8 +576,7 @@ def test_retry_config_default(): def test_retry_config_disable(): # Test disabling retries - service = BaseService(service_url='https://mockurl/', - authenticator=NoAuthAuthenticator()) + service = BaseService(service_url='https://mockurl/', authenticator=NoAuthAuthenticator()) service.enable_retries() service.disable_retries() assert service.retry_config is None @@ -638,8 +591,7 @@ def test_retry_config_disable(): def test_retry_config_non_default(): - service = BaseService(service_url='https://mockurl/', - authenticator=NoAuthAuthenticator()) + service = BaseService(service_url='https://mockurl/', authenticator=NoAuthAuthenticator()) service.enable_retries(2, 0.3) assert service.retry_config.total == 2 assert service.retry_config.backoff_factor == 0.3 @@ -655,11 +607,9 @@ def test_retry_config_non_default(): def test_retry_config_external(): - file_path = os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-retry.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-retry.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path - service = IncludeExternalConfigService( - 'v1', authenticator=NoAuthAuthenticator()) + service = IncludeExternalConfigService('v1', authenticator=NoAuthAuthenticator()) assert service.retry_config.total == 3 assert service.retry_config.backoff_factor == 0.2 @@ -681,40 +631,24 @@ def test_user_agent_header(): assert user_agent_header is not None assert user_agent_header['User-Agent'] is not None - responses.add( - responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=200, - body='some text') - prepped = service.prepare_request('GET', url='', headers={ - 'user-agent': 'my_user_agent' - }) + responses.add(responses.GET, 'https://gateway.watsonplatform.net/test/api', status=200, body='some text') + prepped = service.prepare_request('GET', url='', headers={'user-agent': 'my_user_agent'}) response = service.send(prepped) - assert response.get_result().request.headers.get( - 'user-agent') == 'my_user_agent' + assert response.get_result().request.headers.get('user-agent') == 'my_user_agent' prepped = service.prepare_request('GET', url='', headers=None) response = service.send(prepped) - assert response.get_result().request.headers.get( - 'user-agent') == user_agent_header['User-Agent'] + assert response.get_result().request.headers.get('user-agent') == user_agent_header['User-Agent'] @responses.activate def test_reserved_keys(caplog): service = AnyServiceV1('2021-07-02', authenticator=NoAuthAuthenticator()) - responses.add( - responses.GET, - 'https://gateway.watsonplatform.net/test/api', - status=200, - body='some text') + responses.add(responses.GET, 'https://gateway.watsonplatform.net/test/api', status=200, body='some text') prepped = service.prepare_request('GET', url='', headers={'key': 'OK'}) response = service.send( - prepped, - headers={'key': 'bad'}, - method='POST', - url='localhost', - cookies=None, - hooks={'response': []}) + prepped, headers={'key': 'bad'}, method='POST', url='localhost', cookies=None, hooks={'response': []} + ) assert response.get_result().request.headers.get('key') == 'OK' assert response.get_result().request.url == 'https://gateway.watsonplatform.net/test/api' assert response.get_result().request.method == 'GET' @@ -726,10 +660,7 @@ def test_reserved_keys(caplog): @responses.activate def test_ssl_error(): - responses.add( - responses.GET, - 'https://gateway.watsonplatform.net/test/api', - body=requests.exceptions.SSLError()) + responses.add(responses.GET, 'https://gateway.watsonplatform.net/test/api', body=requests.exceptions.SSLError()) service = AnyServiceV1('2021-08-18', authenticator=NoAuthAuthenticator()) with pytest.raises(requests.exceptions.SSLError): prepped = service.prepare_request('GET', url='') @@ -741,12 +672,11 @@ def test_files_dict(): form_data = {} with open( - os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-iam.env'), 'r', encoding='utf-8') as file: + os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam.env'), 'r', encoding='utf-8' + ) as file: form_data['file1'] = (None, file, 'application/octet-stream') form_data['string1'] = (None, 'hello', 'text/plain') - request = service.prepare_request( - 'GET', url='', headers={'X-opt-out': True}, files=form_data) + request = service.prepare_request('GET', url='', headers={'X-opt-out': True}, files=form_data) files = request['files'] assert isinstance(files, list) assert len(files) == 2 @@ -762,12 +692,11 @@ def test_files_list(): form_data = [] with open( - os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-iam.env'), 'r', encoding='utf-8') as file: + os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam.env'), 'r', encoding='utf-8' + ) as file: form_data.append(('file1', (None, file, 'application/octet-stream'))) form_data.append(('string1', (None, 'hello', 'text/plain'))) - request = service.prepare_request( - 'GET', url='', headers={'X-opt-out': True}, files=form_data) + request = service.prepare_request('GET', url='', headers={'X-opt-out': True}, files=form_data) files = request['files'] assert isinstance(files, list) assert len(files) == 2 @@ -783,22 +712,18 @@ def test_files_duplicate_parts(): form_data = [] with open( - os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-iam.env'), 'r', encoding='utf-8') as file: - form_data.append( - ('creds_file', (None, file, 'application/octet-stream'))) + os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam.env'), 'r', encoding='utf-8' + ) as file: + form_data.append(('creds_file', (None, file, 'application/octet-stream'))) with open( - os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-basic.env'), 'r', encoding='utf-8') as file: - form_data.append( - ('creds_file', (None, file, 'application/octet-stream'))) + os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-basic.env'), 'r', encoding='utf-8' + ) as file: + form_data.append(('creds_file', (None, file, 'application/octet-stream'))) with open( - os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-bearer.env'), 'r', encoding='utf-8') as file: - form_data.append( - ('creds_file', (None, file, 'application/octet-stream'))) - request = service.prepare_request( - 'GET', url='', headers={'X-opt-out': True}, files=form_data) + os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-bearer.env'), 'r', encoding='utf-8' + ) as file: + form_data.append(('creds_file', (None, file, 'application/octet-stream'))) + request = service.prepare_request('GET', url='', headers={'X-opt-out': True}, files=form_data) files = request['files'] assert isinstance(files, list) assert len(files) == 3 @@ -809,72 +734,51 @@ def test_files_duplicate_parts(): def test_json(): service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator()) - req = service.prepare_request('POST', url='', headers={ - 'X-opt-out': True}, data={'hello': 'world', 'fóó': 'bår'}) - assert req.get( - 'data') == b'{"hello": "world", "f\\u00f3\\u00f3": "b\\u00e5r"}' + req = service.prepare_request('POST', url='', headers={'X-opt-out': True}, data={'hello': 'world', 'fóó': 'bår'}) + assert req.get('data') == b'{"hello": "world", "f\\u00f3\\u00f3": "b\\u00e5r"}' def test_service_url_handling(): - service = AnyServiceV1( - '2018-11-20', service_url='https://host///////', authenticator=NoAuthAuthenticator()) + service = AnyServiceV1('2018-11-20', service_url='https://host///////', authenticator=NoAuthAuthenticator()) assert service.service_url == 'https://host' service.set_service_url('https://host/') assert service.service_url == 'https://host' - req = service.prepare_request('POST', - url='/path/', - headers={'X-opt-out': True}, - data={'hello': 'world'}) + req = service.prepare_request('POST', url='/path/', headers={'X-opt-out': True}, data={'hello': 'world'}) assert req.get('url') == 'https://host/path/' - service = AnyServiceV1( - '2018-11-20', service_url='https://host/', authenticator=NoAuthAuthenticator()) + service = AnyServiceV1('2018-11-20', service_url='https://host/', authenticator=NoAuthAuthenticator()) assert service.service_url == 'https://host' service.set_service_url('https://host/') assert service.service_url == 'https://host' - req = service.prepare_request('POST', - url='/', - headers={'X-opt-out': True}, - data={'hello': 'world'}) + req = service.prepare_request('POST', url='/', headers={'X-opt-out': True}, data={'hello': 'world'}) assert req.get('url') == 'https://host/' - req = service.prepare_request('POST', - url='////', - headers={'X-opt-out': True}, - data={'hello': 'world'}) + req = service.prepare_request('POST', url='////', headers={'X-opt-out': True}, data={'hello': 'world'}) assert req.get('url') == 'https://host/' service.set_service_url(None) assert service.service_url is None - service = AnyServiceV1('2018-11-20', service_url='/', - authenticator=NoAuthAuthenticator()) + service = AnyServiceV1('2018-11-20', service_url='/', authenticator=NoAuthAuthenticator()) assert service.service_url == '' service.set_service_url('/') assert service.service_url == '' with pytest.raises(ValueError) as err: - service.prepare_request('POST', - url='/', - headers={'X-opt-out': True}, - data={'hello': 'world'}) + service.prepare_request('POST', url='/', headers={'X-opt-out': True}, data={'hello': 'world'}) assert str(err.value) == 'The service_url is required' def test_service_url_slash(): - service = AnyServiceV1('2018-11-20', service_url='/', - authenticator=NoAuthAuthenticator()) + service = AnyServiceV1('2018-11-20', service_url='/', authenticator=NoAuthAuthenticator()) assert service.service_url == '' with pytest.raises(ValueError) as err: - service.prepare_request('POST', - url='/', - headers={'X-opt-out': True}, - data={'hello': 'world'}) + service.prepare_request('POST', url='/', headers={'X-opt-out': True}, data={'hello': 'world'}) assert str(err.value) == 'The service_url is required' @@ -886,31 +790,23 @@ def test_service_url_not_set(): def test_setting_proxy(): - service = BaseService(service_url='test', - authenticator=IAMAuthenticator('wonder woman')) + service = BaseService(service_url='test', authenticator=IAMAuthenticator('wonder woman')) assert service.authenticator is not None assert service.authenticator.token_manager.http_config == {} - http_config = { - "proxies": { - "http": "user:password@host:port" - } - } + http_config = {"proxies": {"http": "user:password@host:port"}} service.set_http_config(http_config) assert service.authenticator.token_manager.http_config == http_config - service2 = BaseService(service_url='test', authenticator=BasicAuthenticator( - 'marvellous', 'mrs maisel')) + service2 = BaseService(service_url='test', authenticator=BasicAuthenticator('marvellous', 'mrs maisel')) service2.set_http_config(http_config) assert service2.authenticator is not None def test_configure_service(): - file_path = os.path.join( - os.path.dirname(__file__), '../resources/ibm-credentials-external.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-external.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path - service = IncludeExternalConfigService( - 'v1', authenticator=NoAuthAuthenticator()) + service = IncludeExternalConfigService('v1', authenticator=NoAuthAuthenticator()) assert service.service_url == 'https://externallyconfigured.com/api' assert service.disable_ssl_verification is True # The authenticator should not be changed as a result of configure_service() @@ -918,8 +814,7 @@ def test_configure_service(): def test_configure_service_error(): - service = BaseService( - service_url='v1', authenticator=NoAuthAuthenticator()) + service = BaseService(service_url='v1', authenticator=NoAuthAuthenticator()) with pytest.raises(ValueError) as err: service.configure_service(None) assert str(err.value) == 'Service_name must be of type string.' diff --git a/test/test_basic_authenticator.py b/test/test_basic_authenticator.py index f3ed131..601f020 100644 --- a/test/test_basic_authenticator.py +++ b/test/test_basic_authenticator.py @@ -27,10 +27,14 @@ def test_basic_authenticator_validate_failed(): with pytest.raises(ValueError) as err: BasicAuthenticator('{my_username}', 'my_password') - assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. '\ - 'Please remove any surrounding {, }, or \" characters.' + assert ( + str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.' + ) with pytest.raises(ValueError) as err: BasicAuthenticator('my_username', '{my_password}') - assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. '\ - 'Please remove any surrounding {, }, or \" characters.' + assert ( + str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.' + ) diff --git a/test/test_container_authenticator.py b/test/test_container_authenticator.py index d751fb6..0247783 100644 --- a/test/test_container_authenticator.py +++ b/test/test_container_authenticator.py @@ -25,8 +25,7 @@ def test_container_authenticator(): # because both of the profile and ID are None. with pytest.raises(ValueError) as err: authenticator.set_iam_profile_name(None) - assert str( - err.value) == 'At least one of iam_profile_name or iam_profile_id must be specified.' + assert str(err.value) == 'At least one of iam_profile_name or iam_profile_id must be specified.' authenticator.set_iam_profile_id('iam-id-123') assert authenticator.token_manager.iam_profile_id == 'iam-id-123' @@ -57,8 +56,7 @@ def test_container_authenticator(): def test_disable_ssl_verification(): - authenticator = ContainerAuthenticator( - iam_profile_name='iam-user-123', disable_ssl_verification=True) + authenticator = ContainerAuthenticator(iam_profile_name='iam-user-123', disable_ssl_verification=True) assert authenticator.token_manager.disable_ssl_verification is True authenticator.set_disable_ssl_verification(False) @@ -67,8 +65,7 @@ def test_disable_ssl_verification(): def test_invalid_disable_ssl_verification_type(): with pytest.raises(TypeError) as err: - authenticator = ContainerAuthenticator( - iam_profile_name='iam-user-123', disable_ssl_verification='True') + authenticator = ContainerAuthenticator(iam_profile_name='iam-user-123', disable_ssl_verification='True') assert str(err.value) == 'disable_ssl_verification must be a bool' authenticator = ContainerAuthenticator(iam_profile_name='iam-user-123') @@ -80,8 +77,7 @@ def test_invalid_disable_ssl_verification_type(): def test_container_authenticator_with_scope(): - authenticator = ContainerAuthenticator( - iam_profile_name='iam-user-123', scope='scope1 scope2') + authenticator = ContainerAuthenticator(iam_profile_name='iam-user-123', scope='scope1 scope2') assert authenticator is not None assert authenticator.token_manager.scope == 'scope1 scope2' @@ -89,17 +85,12 @@ def test_container_authenticator_with_scope(): def test_authenticator_validate_failed(): with pytest.raises(ValueError) as err: ContainerAuthenticator(None) - assert str( - err.value) == 'At least one of iam_profile_name or iam_profile_id must be specified.' + assert str(err.value) == 'At least one of iam_profile_name or iam_profile_id must be specified.' with pytest.raises(ValueError) as err: - ContainerAuthenticator( - iam_profile_name='iam-user-123', client_id='my_client_id') - assert str( - err.value) == 'Both client_id and client_secret should be initialized.' + ContainerAuthenticator(iam_profile_name='iam-user-123', client_id='my_client_id') + assert str(err.value) == 'Both client_id and client_secret should be initialized.' with pytest.raises(ValueError) as err: - ContainerAuthenticator( - iam_profile_name='iam-user-123', client_secret='my_client_secret') - assert str( - err.value) == 'Both client_id and client_secret should be initialized.' + ContainerAuthenticator(iam_profile_name='iam-user-123', client_secret='my_client_secret') + assert str(err.value) == 'Both client_id and client_secret should be initialized.' diff --git a/test/test_container_token_manager.py b/test/test_container_token_manager.py index 0cd381d..efb8034 100644 --- a/test/test_container_token_manager.py +++ b/test/test_container_token_manager.py @@ -29,6 +29,7 @@ def mock_iam_response(func): """This is decorator function which extends `responses.activate`. This sets up all the mock response stuffs. """ + def callback(request): assert request.headers['Accept'] == 'application/json' assert request.headers['Content-Type'] == 'application/x-www-form-urlencoded' @@ -53,13 +54,15 @@ def callback(request): else: access_token = TEST_ACCESS_TOKEN_1 - response = json.dumps({ - 'access_token': access_token, - 'token_type': 'Bearer', - 'expires_in': 3600, - 'expiration': _get_current_time()+3600, - 'refresh_token': TEST_REFRESH_TOKEN, - }) + response = json.dumps( + { + 'access_token': access_token, + 'token_type': 'Bearer', + 'expires_in': 3600, + 'expiration': _get_current_time() + 3600, + 'refresh_token': TEST_REFRESH_TOKEN, + } + ) return (status_code, {}, response) @@ -98,10 +101,8 @@ def test_request_token_auth_default(): def test_request_token_auth_in_ctor(): default_auth_header = 'Basic Yng6Yng=' token_manager = ContainerTokenManager( - cr_token_filename=cr_token_file, - iam_profile_name=MOCK_IAM_PROFILE_NAME, - client_id='foo', - client_secret='bar') + cr_token_filename=cr_token_file, iam_profile_name=MOCK_IAM_PROFILE_NAME, client_id='foo', client_secret='bar' + ) token_manager.request_token() @@ -119,7 +120,8 @@ def test_request_token_auth_in_ctor_with_scope(): iam_profile_name=MOCK_IAM_PROFILE_NAME, client_id='foo', client_secret='bar', - scope='john snow') + scope='john snow', + ) token_manager.request_token() @@ -147,7 +149,10 @@ def test_retrieve_cr_token_fail(): with pytest.raises(Exception) as err: token_manager.retrieve_cr_token() - assert str(err.value) == 'Unable to retrieve the CR token value from file bogus-cr-token-file: [Errno 2] No such file or directory: \'bogus-cr-token-file\'' + assert ( + str(err.value) + == 'Unable to retrieve the CR token value from file bogus-cr-token-file: [Errno 2] No such file or directory: \'bogus-cr-token-file\'' + ) @mock_iam_response @@ -194,9 +199,7 @@ def test_request_token_success(): @mock_iam_response def test_authenticate_success(): - authenticator = ContainerAuthenticator( - cr_token_filename=cr_token_file, - iam_profile_name=MOCK_IAM_PROFILE_NAME) + authenticator = ContainerAuthenticator(cr_token_filename=cr_token_file, iam_profile_name=MOCK_IAM_PROFILE_NAME) request = {'headers': {}} @@ -223,22 +226,25 @@ def test_authenticate_fail_no_cr_token(): authenticator = ContainerAuthenticator( cr_token_filename='bogus-cr-token-file', iam_profile_name=MOCK_IAM_PROFILE_NAME, - url='https://bogus.iam.endpoint') + url='https://bogus.iam.endpoint', + ) request = {'headers': {}} with pytest.raises(Exception) as err: authenticator.authenticate(request) - assert str(err.value) == 'Unable to retrieve the CR token value from file bogus-cr-token-file: [Errno 2] No such file or directory: \'bogus-cr-token-file\'' + assert ( + str(err.value) + == 'Unable to retrieve the CR token value from file bogus-cr-token-file: [Errno 2] No such file or directory: \'bogus-cr-token-file\'' + ) @mock_iam_response def test_authenticate_fail_iam(): authenticator = ContainerAuthenticator( - cr_token_filename=cr_token_file, - iam_profile_name=MOCK_IAM_PROFILE_NAME, - scope='status-bad-request') + cr_token_filename=cr_token_file, iam_profile_name=MOCK_IAM_PROFILE_NAME, scope='status-bad-request' + ) request = {'headers': {}} @@ -260,6 +266,7 @@ def test_client_id_and_secret(): access_token = token_manager.get_token() assert access_token == TEST_ACCESS_TOKEN_1 + @mock_iam_response def test_setter_methods(): token_manager = ContainerTokenManager( diff --git a/test/test_cp4d_authenticator.py b/test/test_cp4d_authenticator.py index 5ba71c6..757c0ab 100644 --- a/test/test_cp4d_authenticator.py +++ b/test/test_cp4d_authenticator.py @@ -9,16 +9,14 @@ def test_cp4d_authenticator(): - authenticator = CloudPakForDataAuthenticator( - 'my_username', 'my_password', 'http://my_url') + authenticator = CloudPakForDataAuthenticator('my_username', 'my_password', 'http://my_url') assert authenticator is not None assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CP4D assert authenticator.token_manager.url == 'http://my_url/v1/authorize' assert authenticator.token_manager.username == 'my_username' assert authenticator.token_manager.password == 'my_password' assert authenticator.token_manager.disable_ssl_verification is False - assert authenticator.token_manager.headers == { - 'Content-Type': 'application/json'} + assert authenticator.token_manager.headers == {'Content-Type': 'application/json'} assert authenticator.token_manager.proxies is None authenticator.set_disable_ssl_verification(True) @@ -41,7 +39,8 @@ def test_cp4d_authenticator(): def test_disable_ssl_verification(): authenticator = CloudPakForDataAuthenticator( - 'my_username', 'my_password', 'http://my_url', disable_ssl_verification=True) + 'my_username', 'my_password', 'http://my_url', disable_ssl_verification=True + ) assert authenticator.token_manager.disable_ssl_verification is True authenticator.set_disable_ssl_verification(False) @@ -51,11 +50,11 @@ def test_disable_ssl_verification(): def test_invalid_disable_ssl_verification_type(): with pytest.raises(TypeError) as err: authenticator = CloudPakForDataAuthenticator( - 'my_username', 'my_password', 'http://my_url', disable_ssl_verification='True') + 'my_username', 'my_password', 'http://my_url', disable_ssl_verification='True' + ) assert str(err.value) == 'disable_ssl_verification must be a bool' - authenticator = CloudPakForDataAuthenticator( - 'my_username', 'my_password', 'http://my_url') + authenticator = CloudPakForDataAuthenticator('my_username', 'my_password', 'http://my_url') assert authenticator.token_manager.disable_ssl_verification is False with pytest.raises(TypeError) as err: @@ -66,19 +65,15 @@ def test_invalid_disable_ssl_verification_type(): def test_cp4d_authenticator_validate_failed(): with pytest.raises(ValueError) as err: CloudPakForDataAuthenticator('my_username', None, 'my_url') - assert str( - err.value) == 'Exactly one of `apikey` or `password` must be specified.' + assert str(err.value) == 'Exactly one of `apikey` or `password` must be specified.' with pytest.raises(ValueError) as err: CloudPakForDataAuthenticator(username='my_username', url='my_url') - assert str( - err.value) == 'Exactly one of `apikey` or `password` must be specified.' + assert str(err.value) == 'Exactly one of `apikey` or `password` must be specified.' with pytest.raises(ValueError) as err: - CloudPakForDataAuthenticator( - 'my_username', None, 'my_url', apikey=None) - assert str( - err.value) == 'Exactly one of `apikey` or `password` must be specified.' + CloudPakForDataAuthenticator('my_username', None, 'my_url', apikey=None) + assert str(err.value) == 'Exactly one of `apikey` or `password` must be specified.' with pytest.raises(ValueError) as err: CloudPakForDataAuthenticator(None, 'my_password', 'my_url') @@ -93,24 +88,29 @@ def test_cp4d_authenticator_validate_failed(): assert str(err.value) == 'The url shouldn\'t be None.' with pytest.raises(ValueError) as err: - CloudPakForDataAuthenticator( - username='my_username', password='my_password') + CloudPakForDataAuthenticator(username='my_username', password='my_password') assert str(err.value) == 'The url shouldn\'t be None.' with pytest.raises(ValueError) as err: CloudPakForDataAuthenticator('{my_username}', 'my_password', 'my_url') - assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. '\ - 'Please remove any surrounding {, }, or \" characters.' + assert ( + str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.' + ) with pytest.raises(ValueError) as err: CloudPakForDataAuthenticator('my_username', '{my_password}', 'my_url') - assert str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. '\ - 'Please remove any surrounding {, }, or \" characters.' + assert ( + str(err.value) == 'The username and password shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.' + ) with pytest.raises(ValueError) as err: CloudPakForDataAuthenticator('my_username', 'my_password', '{my_url}') - assert str(err.value) == 'The url shouldn\'t start or end with curly brackets or quotes. '\ - 'Please remove any surrounding {, }, or \" characters.' + assert ( + str(err.value) == 'The url shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.' + ) @responses.activate @@ -119,35 +119,31 @@ def test_get_token(): access_token_layout = { "username": "dummy", "role": "Admin", - "permissions": [ - "administrator", - "manage_catalog" - ], + "permissions": ["administrator", "manage_catalog"], "sub": "admin", "iss": "sss", "aud": "sss", "uid": "sss", "iat": 1559324664, - "exp": 1559324664 + "exp": 1559324664, } - access_token = jwt.encode(access_token_layout, - 'secret', algorithm='HS256', - headers={'kid': '230498151c214b788dd97f22b85410a5'}) + access_token = jwt.encode( + access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} + ) response = { "token": access_token, "token_type": "Bearer", "expires_in": 3600, "expiration": 1524167011, - "refresh_token": "jy4gl91BQ" + "refresh_token": "jy4gl91BQ", } - responses.add(responses.POST, url + '/v1/authorize', - body=json.dumps(response), status=200) + responses.add(responses.POST, url + '/v1/authorize', body=json.dumps(response), status=200) auth_headers = {'Host': 'cp4d.cloud.ibm.com:443'} authenticator = CloudPakForDataAuthenticator( - 'my_username', 'my_password', url + '/v1/authorize', - headers=auth_headers) + 'my_username', 'my_password', url + '/v1/authorize', headers=auth_headers + ) # Simulate an SDK API request that needs to be authenticated. request = {'headers': {}} @@ -159,12 +155,10 @@ def test_get_token(): assert request['headers']['Authorization'] is not None # Verify that the "get token" call contained the Host header. - assert responses.calls[0].request.headers.get( - 'Host') == 'cp4d.cloud.ibm.com:443' + assert responses.calls[0].request.headers.get('Host') == 'cp4d.cloud.ibm.com:443' # Ensure '/v1/authorize' is added to the url if omitted - authenticator = CloudPakForDataAuthenticator( - 'my_username', 'my_password', url) + authenticator = CloudPakForDataAuthenticator('my_username', 'my_password', url) request = {'headers': {}} authenticator.authenticate(request) diff --git a/test/test_cp4d_token_manager.py b/test/test_cp4d_token_manager.py index 1635cda..cf95503 100644 --- a/test/test_cp4d_token_manager.py +++ b/test/test_cp4d_token_manager.py @@ -15,21 +15,18 @@ def test_request_token(): access_token_layout = { "username": "dummy", "role": "Admin", - "permissions": [ - "administrator", - "manage_catalog" - ], + "permissions": ["administrator", "manage_catalog"], "sub": "admin", "iss": "sss", "aud": "sss", "uid": "sss", "iat": now, - "exp": now + 3600 + "exp": now + 3600, } - access_token = jwt.encode(access_token_layout, - 'secret', algorithm='HS256', - headers={'kid': '230498151c214b788dd97f22b85410a5'}) + access_token = jwt.encode( + access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} + ) response = { "token": access_token, } diff --git a/test/test_detailed_response.py b/test/test_detailed_response.py index 506d63a..9139fb7 100644 --- a/test/test_detailed_response.py +++ b/test/test_detailed_response.py @@ -15,15 +15,18 @@ def clean(val): @responses.activate def test_detailed_response_dict(): - responses.add(responses.GET, - 'https://test.com', - status=200, - body=json.dumps({'foobar': 'baz'}), - content_type='application/json') - - mock_response = requests.get('https://test.com') - detailed_response = DetailedResponse(response=mock_response.json(), headers=mock_response.headers, - status_code=mock_response.status_code) + responses.add( + responses.GET, + 'https://test.com', + status=200, + body=json.dumps({'foobar': 'baz'}), + content_type='application/json', + ) + + mock_response = requests.get('https://test.com', timeout=None) + detailed_response = DetailedResponse( + response=mock_response.json(), headers=mock_response.headers, status_code=mock_response.status_code + ) assert detailed_response is not None assert detailed_response.get_result() == {'foobar': 'baz'} assert detailed_response.get_headers() == {'Content-Type': 'application/json'} @@ -31,21 +34,24 @@ def test_detailed_response_dict(): response_str = clean(str(detailed_response)) assert clean(str(detailed_response.get_result())) in response_str - #assert clean(str(detailed_response.get_headers())) in response_str + # assert clean(str(detailed_response.get_headers())) in response_str assert clean(str(detailed_response.get_status_code())) in response_str @responses.activate def test_detailed_response_list(): - responses.add(responses.GET, - 'https://test.com', - status=200, - body=json.dumps(['foobar', 'baz']), - content_type='application/json') - - mock_response = requests.get('https://test.com') - detailed_response = DetailedResponse(response=mock_response.json(), headers=mock_response.headers, - status_code=mock_response.status_code) + responses.add( + responses.GET, + 'https://test.com', + status=200, + body=json.dumps(['foobar', 'baz']), + content_type='application/json', + ) + + mock_response = requests.get('https://test.com', timeout=None) + detailed_response = DetailedResponse( + response=mock_response.json(), headers=mock_response.headers, status_code=mock_response.status_code + ) assert detailed_response is not None assert detailed_response.get_result() == ['foobar', 'baz'] assert detailed_response.get_headers() == {'Content-Type': 'application/json'} @@ -53,5 +59,5 @@ def test_detailed_response_list(): response_str = clean(str(detailed_response)) assert clean(str(detailed_response.get_result())) in response_str - #assert clean(str(detailed_response.get_headers())) in response_str + # assert clean(str(detailed_response.get_headers())) in response_str assert clean(str(detailed_response.get_status_code())) in response_str diff --git a/test/test_iam_authenticator.py b/test/test_iam_authenticator.py index 9855d5a..d0928ec 100644 --- a/test/test_iam_authenticator.py +++ b/test/test_iam_authenticator.py @@ -47,8 +47,7 @@ def test_iam_authenticator(): def test_disable_ssl_verification(): - authenticator = IAMAuthenticator( - apikey='my_apikey', disable_ssl_verification=True) + authenticator = IAMAuthenticator(apikey='my_apikey', disable_ssl_verification=True) assert authenticator.token_manager.disable_ssl_verification is True authenticator.set_disable_ssl_verification(False) @@ -57,8 +56,7 @@ def test_disable_ssl_verification(): def test_invalid_disable_ssl_verification_type(): with pytest.raises(TypeError) as err: - authenticator = IAMAuthenticator( - apikey='my_apikey', disable_ssl_verification='True') + authenticator = IAMAuthenticator(apikey='my_apikey', disable_ssl_verification='True') assert str(err.value) == 'disable_ssl_verification must be a bool' authenticator = IAMAuthenticator(apikey='my_apikey') @@ -82,20 +80,18 @@ def test_iam_authenticator_validate_failed(): with pytest.raises(ValueError) as err: IAMAuthenticator('{apikey}') - assert str( - err.value - ) == 'The apikey shouldn\'t start or end with curly brackets or quotes. '\ - 'Please remove any surrounding {, }, or \" characters.' + assert ( + str(err.value) == 'The apikey shouldn\'t start or end with curly brackets or quotes. ' + 'Please remove any surrounding {, }, or \" characters.' + ) with pytest.raises(ValueError) as err: IAMAuthenticator('my_apikey', client_id='my_client_id') - assert str( - err.value) == 'Both client_id and client_secret should be initialized.' + assert str(err.value) == 'Both client_id and client_secret should be initialized.' with pytest.raises(ValueError) as err: IAMAuthenticator('my_apikey', client_secret='my_client_secret') - assert str( - err.value) == 'Both client_id and client_secret should be initialized.' + assert str(err.value) == 'Both client_id and client_secret should be initialized.' @responses.activate @@ -110,25 +106,20 @@ def test_get_token(): "aud": "sss", "uid": "sss", "iat": 1559324664, - "exp": 1559324664 + "exp": 1559324664, } access_token = jwt.encode( - access_token_layout, - 'secret', - algorithm='HS256', - headers={ - 'kid': '230498151c214b788dd97f22b85410a5' - }) + access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} + ) response = { "access_token": access_token, "token_type": "Bearer", "expires_in": 3600, "expiration": 1524167011, - "refresh_token": "jy4gl91BQ" + "refresh_token": "jy4gl91BQ", } - responses.add( - responses.POST, url=url, body=json.dumps(response), status=200) + responses.add(responses.POST, url=url, body=json.dumps(response), status=200) auth_headers = {'Host': 'iam.cloud.ibm.com:443'} authenticator = IAMAuthenticator('my_apikey', headers=auth_headers) @@ -143,8 +134,7 @@ def test_get_token(): assert request['headers']['Authorization'] is not None # Verify that the "get token" call contained the Host header. - assert responses.calls[0].request.headers.get( - 'Host') == 'iam.cloud.ibm.com:443' + assert responses.calls[0].request.headers.get('Host') == 'iam.cloud.ibm.com:443' def test_multiple_iam_authenticators(): diff --git a/test/test_iam_token_manager.py b/test/test_iam_token_manager.py index 5fab988..555f909 100644 --- a/test/test_iam_token_manager.py +++ b/test/test_iam_token_manager.py @@ -13,20 +13,18 @@ def get_access_token() -> str: access_token_layout = { "username": "dummy", "role": "Admin", - "permissions": [ - "administrator", - "manage_catalog" - ], + "permissions": ["administrator", "manage_catalog"], "sub": "admin", "iss": "sss", "aud": "sss", "uid": "sss", "iat": 3600, - "exp": int(time.time()) + "exp": int(time.time()), } - access_token = jwt.encode(access_token_layout, 'secret', algorithm='HS256', - headers={'kid': '230498151c214b788dd97f22b85410a5'}) + access_token = jwt.encode( + access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} + ) return access_token @@ -64,8 +62,7 @@ def test_request_token_auth_in_ctor(): default_auth_header = 'Basic Yng6Yng=' responses.add(responses.POST, url=iam_url, body=response, status=200) - token_manager = IAMTokenManager( - "apikey", url=iam_url, client_id='foo', client_secret='bar') + token_manager = IAMTokenManager("apikey", url=iam_url, client_id='foo', client_secret='bar') token_manager.request_token() assert len(responses.calls) == 1 @@ -88,8 +85,7 @@ def test_request_token_auth_in_ctor_with_scope(): default_auth_header = 'Basic Yng6Yng=' responses.add(responses.POST, url=iam_url, body=response, status=200) - token_manager = IAMTokenManager( - "apikey", url=iam_url, client_id='foo', client_secret='bar', scope='john snow') + token_manager = IAMTokenManager("apikey", url=iam_url, client_id='foo', client_secret='bar', scope='john snow') token_manager.request_token() assert len(responses.calls) == 1 @@ -166,8 +162,7 @@ def test_request_token_auth_in_ctor_secret_only(): }""" responses.add(responses.POST, url=iam_url, body=response, status=200) - token_manager = IAMTokenManager( - "iam_apikey", url=iam_url, client_id=None, client_secret='bar') + token_manager = IAMTokenManager("iam_apikey", url=iam_url, client_id=None, client_secret='bar') token_manager.request_token() assert len(responses.calls) == 1 @@ -283,7 +278,9 @@ def test_get_refresh_token(): "expires_in": 3600, "expiration": 1524167011, "refresh_token": "jy4gl91BQ" - }""" % (access_token_str) + }""" % ( + access_token_str + ) responses.add(responses.POST, url=iam_url, body=response, status=200) token_manager = IAMTokenManager("iam_apikey") @@ -292,6 +289,7 @@ def test_get_refresh_token(): assert len(responses.calls) == 2 assert token_manager.refresh_token == "jy4gl91BQ" + # # In order to run the following integration test with a live IAM server: # diff --git a/test/test_jwt_token_manager.py b/test/test_jwt_token_manager.py index c611183..60232de 100644 --- a/test/test_jwt_token_manager.py +++ b/test/test_jwt_token_manager.py @@ -13,9 +13,8 @@ class JWTTokenManagerMockImpl(JWTTokenManager): def __init__(self, url: Optional[str] = None, access_token: Optional[str] = None) -> None: self.url = url self.access_token = access_token - self.request_count = 0 # just for tests to see how many times request was called - super().__init__(url, disable_ssl_verification=access_token, - token_name='access_token') + self.request_count = 0 # just for tests to see how many times request was called + super().__init__(url, disable_ssl_verification=access_token, token_name='access_token') def request_token(self) -> DetailedResponse: self.request_count += 1 @@ -23,27 +22,26 @@ def request_token(self) -> DetailedResponse: token_layout = { "username": "dummy", "role": "Admin", - "permissions": [ - "administrator", - "manage_catalog" - ], + "permissions": ["administrator", "manage_catalog"], "sub": "admin", "iss": "sss", "aud": "sss", "uid": "sss", "iat": current_time, - "exp": current_time + 3600 + "exp": current_time + 3600, } - access_token = jwt.encode(token_layout, 'secret', algorithm='HS256', - headers={'kid': '230498151c214b788dd97f22b85410a5'}) - response = {"access_token": access_token, - "token_type": "Bearer", - "expires_in": 3600, - "expiration": current_time + 3600, - "refresh_token": "jy4gl91BQ", - "from_token_manager": True - } + access_token = jwt.encode( + token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} + ) + response = { + "access_token": access_token, + "token_type": "Bearer", + "expires_in": 3600, + "expiration": current_time + 3600, + "refresh_token": "jy4gl91BQ", + "from_token_manager": True, + } time.sleep(0.5) return response @@ -59,12 +57,13 @@ def test_get_token(): assert token_manager.token_info.get('expires_in') == 3600 assert token_manager._is_token_expired() is False - token_manager.token_info = {"access_token": "old_dummy", - "token_type": "Bearer", - "expires_in": 3600, - "expiration": time.time(), - "refresh_token": "jy4gl91BQ" - } + token_manager.token_info = { + "access_token": "old_dummy", + "token_type": "Bearer", + "expires_in": 3600, + "expiration": time.time(), + "refresh_token": "jy4gl91BQ", + } token = token_manager.get_token() assert token == old_token diff --git a/test/test_token_manager.py b/test/test_token_manager.py index 6aa1a55..c649d72 100644 --- a/test/test_token_manager.py +++ b/test/test_token_manager.py @@ -25,12 +25,8 @@ class MockTokenManager(TokenManager): - def request_token(self) -> None: - response = self._request( - method='GET', - url=self.url - ) + response = self._request(method='GET', url=self.url) return response def _save_token_info(self, token_response: dict) -> None: @@ -40,10 +36,12 @@ def _save_token_info(self, token_response: dict) -> None: def test_abstract_class_instantiation(): with pytest.raises(TypeError) as err: TokenManager(None) - assert str(err.value) == "Can't instantiate abstract class " \ - "TokenManager with abstract methods " \ - "_save_token_info, " \ - "request_token" + assert ( + str(err.value) == "Can't instantiate abstract class " + "TokenManager with abstract methods " + "_save_token_info, " + "request_token" + ) def requests_request_spy(*args, **kwargs): diff --git a/test/test_utils.py b/test/test_utils.py index 0374595..ddd5d7e 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -41,17 +41,13 @@ def datetime_test(datestr: str, expected: str): def test_datetime(): # RFC 3339 with various flavors of tz-offset datetime_test('2016-06-20T04:25:16.218Z', '2016-06-20T04:25:16.218000Z') - datetime_test('2016-06-20T04:25:16.218+0000', - '2016-06-20T04:25:16.218000Z') + datetime_test('2016-06-20T04:25:16.218+0000', '2016-06-20T04:25:16.218000Z') datetime_test('2016-06-20T04:25:16.218+00', '2016-06-20T04:25:16.218000Z') - datetime_test('2016-06-20T04:25:16.218-0000', - '2016-06-20T04:25:16.218000Z') + datetime_test('2016-06-20T04:25:16.218-0000', '2016-06-20T04:25:16.218000Z') datetime_test('2016-06-20T04:25:16.218-00', '2016-06-20T04:25:16.218000Z') - datetime_test('2016-06-20T00:25:16.218-0400', - '2016-06-20T04:25:16.218000Z') + datetime_test('2016-06-20T00:25:16.218-0400', '2016-06-20T04:25:16.218000Z') datetime_test('2016-06-20T00:25:16.218-04', '2016-06-20T04:25:16.218000Z') - datetime_test('2016-06-20T07:25:16.218+0300', - '2016-06-20T04:25:16.218000Z') + datetime_test('2016-06-20T07:25:16.218+0300', '2016-06-20T04:25:16.218000Z') datetime_test('2016-06-20T07:25:16.218+03', '2016-06-20T04:25:16.218000Z') datetime_test('2016-06-20T04:25:16Z', '2016-06-20T04:25:16Z') datetime_test('2016-06-20T04:25:16+0000', '2016-06-20T04:25:16Z') @@ -62,12 +58,9 @@ def test_datetime(): datetime_test('2016-06-20T16:25:16+12:00', '2016-06-20T04:25:16Z') # RFC 3339 with nanoseconds for the Catalog-Managements of the world. - datetime_test('2020-03-12T10:52:12.866305005-04:00', - '2020-03-12T14:52:12.866305Z') - datetime_test('2020-03-12T10:52:12.866305005Z', - '2020-03-12T10:52:12.866305Z') - datetime_test('2020-03-12T10:52:12.866305005+02:30', - '2020-03-12T08:22:12.866305Z') + datetime_test('2020-03-12T10:52:12.866305005-04:00', '2020-03-12T14:52:12.866305Z') + datetime_test('2020-03-12T10:52:12.866305005Z', '2020-03-12T10:52:12.866305Z') + datetime_test('2020-03-12T10:52:12.866305005+02:30', '2020-03-12T08:22:12.866305Z') datetime_test('2020-03-12T10:52:12.866305Z', '2020-03-12T10:52:12.866305Z') # UTC datetime with no TZ. @@ -109,8 +102,7 @@ def test_datetime_to_string(): res = datetime_to_string(date) assert res == '2017-03-06T16:00:04.159338Z' # Test date with UTC timezone - date = datetime.datetime(2017, 3, 6, 16, 0, 4, 159338, - datetime.timezone.utc) + date = datetime.datetime(2017, 3, 6, 16, 0, 4, 159338, datetime.timezone.utc) res = datetime_to_string(date) assert res == '2017-03-06T16:00:04.159338Z' # Test date with non-UTC timezone @@ -128,8 +120,7 @@ def test_string_to_datetime_list(): date_list = string_to_datetime_list(['2017-03-06 16:00:04.159338']) assert date_list[0].day == 6 assert date_list[0].hour == 16 - assert date_list[0].tzinfo.utcoffset( - None) == datetime.timezone.utc.utcoffset(None) + assert date_list[0].tzinfo.utcoffset(None) == datetime.timezone.utc.utcoffset(None) # Test date string with TZ specified as '+xxxx' date_list = string_to_datetime_list(['2017-03-06 16:00:04.159338+0600']) assert date_list[0].day == 6 @@ -139,19 +130,15 @@ def test_string_to_datetime_list(): date_list = string_to_datetime_list(['2017-03-06 16:00:04.159338Z']) assert date_list[0].day == 6 assert date_list[0].hour == 16 - assert date_list[0].tzinfo.utcoffset( - None) == datetime.timezone.utc.utcoffset(None) + assert date_list[0].tzinfo.utcoffset(None) == datetime.timezone.utc.utcoffset(None) # Test multiple datetimes in a list - date_list = string_to_datetime_list( - ['2017-03-06 16:00:04.159338', '2017-03-07 17:00:04.159338']) + date_list = string_to_datetime_list(['2017-03-06 16:00:04.159338', '2017-03-07 17:00:04.159338']) assert date_list[0].day == 6 assert date_list[0].hour == 16 - assert date_list[0].tzinfo.utcoffset( - None) == datetime.timezone.utc.utcoffset(None) + assert date_list[0].tzinfo.utcoffset(None) == datetime.timezone.utc.utcoffset(None) assert date_list[1].day == 7 assert date_list[1].hour == 17 - assert date_list[1].tzinfo.utcoffset( - None) == datetime.timezone.utc.utcoffset(None) + assert date_list[1].tzinfo.utcoffset(None) == datetime.timezone.utc.utcoffset(None) def test_datetime_to_string_list(): @@ -167,8 +154,7 @@ def test_datetime_to_string_list(): res = datetime_to_string_list(date_list) assert res == ['2017-03-06T16:00:04.159338Z'] # Test date list item with UTC timezone - date_list = [datetime.datetime(2017, 3, 6, 16, 0, 4, 159338, - datetime.timezone.utc)] + date_list = [datetime.datetime(2017, 3, 6, 16, 0, 4, 159338, datetime.timezone.utc)] res = datetime_to_string_list(date_list) assert res == ['2017-03-06T16:00:04.159338Z'] # Test date list item with non-UTC timezone @@ -177,12 +163,12 @@ def test_datetime_to_string_list(): res = datetime_to_string_list(date_list) assert res == ['2017-03-06T16:00:04.159338Z'] # Test specified date list with multiple items - date_list = [datetime.datetime(2017, 3, 6, 16, 0, 4, 159338), - datetime.datetime(2017, 3, 6, 16, 0, 4, 159338, - datetime.timezone.utc)] + date_list = [ + datetime.datetime(2017, 3, 6, 16, 0, 4, 159338), + datetime.datetime(2017, 3, 6, 16, 0, 4, 159338, datetime.timezone.utc), + ] res = datetime_to_string_list(date_list) - assert res == ['2017-03-06T16:00:04.159338Z', - '2017-03-06T16:00:04.159338Z'] + assert res == ['2017-03-06T16:00:04.159338Z', '2017-03-06T16:00:04.159338Z'] def test_date_conversion(): @@ -283,8 +269,7 @@ def test_convert_list(): # pylint: disable=too-many-statements def test_get_authenticator_from_credential_file(): - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-iam.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('ibm watson') assert authenticator is not None @@ -297,8 +282,7 @@ def test_get_authenticator_from_credential_file(): assert authenticator.token_manager.scope is None del os.environ['IBM_CREDENTIALS_FILE'] - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-basic.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-basic.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('watson') assert authenticator is not None @@ -306,8 +290,7 @@ def test_get_authenticator_from_credential_file(): assert authenticator.username == 'my_username' del os.environ['IBM_CREDENTIALS_FILE'] - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-container.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-container.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('service 1') assert authenticator is not None @@ -334,8 +317,7 @@ def test_get_authenticator_from_credential_file(): assert authenticator.token_manager.disable_ssl_verification is False del os.environ['IBM_CREDENTIALS_FILE'] - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-cp4d.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-cp4d.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('watson') assert authenticator is not None @@ -347,16 +329,14 @@ def test_get_authenticator_from_credential_file(): assert authenticator.token_manager.disable_ssl_verification is False del os.environ['IBM_CREDENTIALS_FILE'] - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-no-auth.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-no-auth.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('watson') assert authenticator is not None assert authenticator.authentication_type() == Authenticator.AUTHTYPE_NOAUTH del os.environ['IBM_CREDENTIALS_FILE'] - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-bearer.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-bearer.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('watson') assert authenticator is not None @@ -364,8 +344,7 @@ def test_get_authenticator_from_credential_file(): assert authenticator.bearer_token is not None del os.environ['IBM_CREDENTIALS_FILE'] - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('service_1') assert authenticator is not None @@ -377,8 +356,7 @@ def test_get_authenticator_from_credential_file(): assert authenticator.token_manager.scope is None del os.environ['IBM_CREDENTIALS_FILE'] - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-vpc.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('service1') assert authenticator is not None @@ -387,8 +365,7 @@ def test_get_authenticator_from_credential_file(): assert authenticator.token_manager.iam_profile_id is None assert authenticator.token_manager.url == 'http://169.254.169.254' - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-vpc.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('service2') assert authenticator is not None @@ -397,8 +374,7 @@ def test_get_authenticator_from_credential_file(): assert authenticator.token_manager.iam_profile_id is None assert authenticator.token_manager.url == 'http://vpc.imds.com/api' - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials-vpc.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('service3') assert authenticator is not None @@ -409,8 +385,7 @@ def test_get_authenticator_from_credential_file(): def test_get_authenticator_from_credential_file_scope(): - file_path = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials.env') + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('service_2') assert authenticator is not None @@ -608,8 +583,7 @@ def test_read_external_sources_1(): def test_read_external_sources_2(): # The config file should take precedence over the env variable. - config_file = os.path.join(os.path.dirname(__file__), - '../resources/ibm-credentials.env') + config_file = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') os.environ['IBM_CREDENTIALS_FILE'] = config_file # This should be ignored since IBM_CREDENTIALS_FILE points to a valid file. @@ -630,5 +604,4 @@ def test_strip_extra_slashes(): assert strip_extra_slashes('https://host/path/') == 'https://host/path/' assert strip_extra_slashes('https://host/path//') == 'https://host/path/' assert strip_extra_slashes('https://host//path//') == 'https://host//path/' - assert strip_extra_slashes( - 'https://host//path//////////') == 'https://host//path/' + assert strip_extra_slashes('https://host//path//////////') == 'https://host//path/' diff --git a/test/test_vpc_instance_authenticator.py b/test/test_vpc_instance_authenticator.py index 36a4c3b..5cd21f4 100644 --- a/test/test_vpc_instance_authenticator.py +++ b/test/test_vpc_instance_authenticator.py @@ -9,7 +9,7 @@ def test_constructor(): - authenticator =VPCInstanceAuthenticator(iam_profile_id=TEST_IAM_PROFILE_ID, url='someurl.com') + authenticator = VPCInstanceAuthenticator(iam_profile_id=TEST_IAM_PROFILE_ID, url='someurl.com') assert authenticator is not None assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC assert authenticator.token_manager.iam_profile_crn is None @@ -18,7 +18,7 @@ def test_constructor(): def test_setters(): - authenticator =VPCInstanceAuthenticator(iam_profile_id=TEST_IAM_PROFILE_ID, url='someurl.com') + authenticator = VPCInstanceAuthenticator(iam_profile_id=TEST_IAM_PROFILE_ID, url='someurl.com') assert authenticator is not None assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC assert authenticator.token_manager.iam_profile_crn is None @@ -29,8 +29,7 @@ def test_setters(): # because at most one of iam_profile_crn or iam_profile_id may be specified. with pytest.raises(ValueError) as err: authenticator.set_iam_profile_crn(TEST_IAM_PROFILE_CRN) - assert str( - err.value) == 'At most one of "iam_profile_id" or "iam_profile_crn" may be specified.' + assert str(err.value) == 'At most one of "iam_profile_id" or "iam_profile_crn" may be specified.' authenticator.set_iam_profile_id(None) assert authenticator.token_manager.iam_profile_id is None @@ -42,11 +41,10 @@ def test_setters(): def test_constructor_validate_failed(): with pytest.raises(ValueError) as err: VPCInstanceAuthenticator( - iam_profile_crn=TEST_IAM_PROFILE_CRN, - iam_profile_id=TEST_IAM_PROFILE_ID, - ) - assert str( - err.value) == 'At most one of "iam_profile_id" or "iam_profile_crn" may be specified.' + iam_profile_crn=TEST_IAM_PROFILE_CRN, + iam_profile_id=TEST_IAM_PROFILE_ID, + ) + assert str(err.value) == 'At most one of "iam_profile_id" or "iam_profile_crn" may be specified.' def test_authenticate(): diff --git a/test/test_vpc_instance_token_manager.py b/test/test_vpc_instance_token_manager.py index be681f7..9549d77 100644 --- a/test/test_vpc_instance_token_manager.py +++ b/test/test_vpc_instance_token_manager.py @@ -24,7 +24,7 @@ from ibm_cloud_sdk_core import ApiException, VPCInstanceTokenManager -#pylint: disable=line-too-long +# pylint: disable=line-too-long TEST_ACCESS_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6ImhlbGxvIiwicm9sZSI6InVzZXIiLCJwZXJtaXNzaW9ucyI6WyJhZG1pbmlzdHJhdG9yIiwiZGVwbG95bWVudF9hZG1pbiJdLCJzdWIiOiJoZWxsbyIsImlzcyI6IkpvaG4iLCJhdWQiOiJEU1giLCJ1aWQiOiI5OTkiLCJpYXQiOjE1NjAyNzcwNTEsImV4cCI6MTU2MDI4MTgxOSwianRpIjoiMDRkMjBiMjUtZWUyZC00MDBmLTg2MjMtOGNkODA3MGI1NDY4In0.cIodB4I6CCcX8vfIImz7Cytux3GpWyObt9Gkur5g1QI' TEST_TOKEN = 'abc123' TEST_IAM_TOKEN = 'iam-abc123' @@ -71,8 +71,7 @@ def test_retrieve_instance_identity_token(caplog): 'access_token': TEST_TOKEN, } - responses.add(responses.PUT, 'http://someurl.com/instance_identity/v1/token', - body=json.dumps(response), status=200) + responses.add(responses.PUT, 'http://someurl.com/instance_identity/v1/token', body=json.dumps(response), status=200) ii_token = token_manager.retrieve_instance_identity_token() assert len(responses.calls) == 1 @@ -83,8 +82,11 @@ def test_retrieve_instance_identity_token(caplog): assert responses.calls[0].request.body == '{"expires_in": 300}' assert ii_token == TEST_TOKEN # Check the logs. - #pylint: disable=line-too-long - assert caplog.record_tuples[0][2] == 'Invoking VPC \'create_access_token\' operation: http://someurl.com/instance_identity/v1/token' + # pylint: disable=line-too-long + assert ( + caplog.record_tuples[0][2] + == 'Invoking VPC \'create_access_token\' operation: http://someurl.com/instance_identity/v1/token' + ) assert caplog.record_tuples[1][2] == 'Returned from VPC \'create_access_token\' operation."' @@ -101,16 +103,18 @@ def test_retrieve_instance_identity_token_failed(caplog): 'errors': ['Ooops'], } - responses.add(responses.PUT, 'http://someurl.com/instance_identity/v1/token', - body=json.dumps(response), status=400) + responses.add(responses.PUT, 'http://someurl.com/instance_identity/v1/token', body=json.dumps(response), status=400) with pytest.raises(ApiException): token_manager.retrieve_instance_identity_token() assert len(responses.calls) == 1 # Check the logs. - #pylint: disable=line-too-long - assert caplog.record_tuples[0][2] == 'Invoking VPC \'create_access_token\' operation: http://someurl.com/instance_identity/v1/token' + # pylint: disable=line-too-long + assert ( + caplog.record_tuples[0][2] + == 'Invoking VPC \'create_access_token\' operation: http://someurl.com/instance_identity/v1/token' + ) @responses.activate @@ -124,14 +128,16 @@ def test_request_token_with_crn(caplog): # Mock the retrieve instance identity token method. def mock_retrieve_instance_identity_token(): return TEST_TOKEN + token_manager.retrieve_instance_identity_token = mock_retrieve_instance_identity_token response = { 'access_token': TEST_IAM_TOKEN, } - responses.add(responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', - body=json.dumps(response), status=200) + responses.add( + responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', body=json.dumps(response), status=200 + ) response = token_manager.request_token() assert len(responses.calls) == 1 @@ -141,8 +147,11 @@ def mock_retrieve_instance_identity_token(): assert responses.calls[0].request.body == '{"trusted_profile": {"crn": "crn:iam-profile:123"}}' assert responses.calls[0].request.params['version'] == '2021-09-20' # Check the logs. - #pylint: disable=line-too-long - assert caplog.record_tuples[0][2] == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + # pylint: disable=line-too-long + assert ( + caplog.record_tuples[0][2] + == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + ) assert caplog.record_tuples[1][2] == 'Returned from VPC \'create_iam_token\' operation."' @@ -157,14 +166,16 @@ def test_request_token_with_id(caplog): # Mock the retrieve instance identity token method. def mock_retrieve_instance_identity_token(): return TEST_TOKEN + token_manager.retrieve_instance_identity_token = mock_retrieve_instance_identity_token response = { 'access_token': TEST_IAM_TOKEN, } - responses.add(responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', - body=json.dumps(response), status=200) + responses.add( + responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', body=json.dumps(response), status=200 + ) response = token_manager.request_token() assert len(responses.calls) == 1 @@ -174,8 +185,11 @@ def mock_retrieve_instance_identity_token(): assert responses.calls[0].request.body == '{"trusted_profile": {"id": "iam-id-123"}}' assert responses.calls[0].request.params['version'] == '2021-09-20' # Check the logs. - #pylint: disable=line-too-long - assert caplog.record_tuples[0][2] == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + # pylint: disable=line-too-long + assert ( + caplog.record_tuples[0][2] + == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + ) assert caplog.record_tuples[1][2] == 'Returned from VPC \'create_iam_token\' operation."' @@ -188,14 +202,16 @@ def test_request_token(caplog): # Mock the retrieve instance identity token method. def mock_retrieve_instance_identity_token(): return TEST_TOKEN + token_manager.retrieve_instance_identity_token = mock_retrieve_instance_identity_token response = { 'access_token': TEST_IAM_TOKEN, } - responses.add(responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', - body=json.dumps(response), status=200) + responses.add( + responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', body=json.dumps(response), status=200 + ) response = token_manager.request_token() assert len(responses.calls) == 1 @@ -205,8 +221,11 @@ def mock_retrieve_instance_identity_token(): assert responses.calls[0].request.body is None assert responses.calls[0].request.params['version'] == '2021-09-20' # Check the logs. - #pylint: disable=line-too-long - assert caplog.record_tuples[0][2] == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + # pylint: disable=line-too-long + assert ( + caplog.record_tuples[0][2] + == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + ) assert caplog.record_tuples[1][2] == 'Returned from VPC \'create_iam_token\' operation."' @@ -221,21 +240,26 @@ def test_request_token_failed(caplog): # Mock the retrieve instance identity token method. def mock_retrieve_instance_identity_token(): return TEST_TOKEN + token_manager.retrieve_instance_identity_token = mock_retrieve_instance_identity_token response = { 'errors': ['Ooops'], } - responses.add(responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', - body=json.dumps(response), status=400) + responses.add( + responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', body=json.dumps(response), status=400 + ) with pytest.raises(ApiException): token_manager.request_token() assert len(responses.calls) == 1 # Check the logs. - #pylint: disable=line-too-long - assert caplog.record_tuples[0][2] == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + # pylint: disable=line-too-long + assert ( + caplog.record_tuples[0][2] + == 'Invoking VPC \'create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token' + ) @responses.activate @@ -251,10 +275,15 @@ def test_access_token(): 'access_token': TEST_ACCESS_TOKEN, } - responses.add(responses.PUT, 'http://169.254.169.254/instance_identity/v1/token', - body=json.dumps(response_ii), status=200) - responses.add(responses.POST, 'http://169.254.169.254/instance_identity/v1/iam_token', - body=json.dumps(response_iam), status=200) + responses.add( + responses.PUT, 'http://169.254.169.254/instance_identity/v1/token', body=json.dumps(response_ii), status=200 + ) + responses.add( + responses.POST, + 'http://169.254.169.254/instance_identity/v1/iam_token', + body=json.dumps(response_iam), + status=200, + ) assert token_manager.access_token is None assert token_manager.expire_time == 0 diff --git a/test_integration/test_cp4d_authenticator_integration.py b/test_integration/test_cp4d_authenticator_integration.py index ab8862b..7e6b83c 100644 --- a/test_integration/test_cp4d_authenticator_integration.py +++ b/test_integration/test_cp4d_authenticator_integration.py @@ -14,8 +14,7 @@ def test_cp4d_authenticator_password(): - file_path = os.path.join( - os.path.dirname(__file__), IBM_CREDENTIALS_FILE) + file_path = os.path.join(os.path.dirname(__file__), IBM_CREDENTIALS_FILE) os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('cp4d_password_test') @@ -30,8 +29,7 @@ def test_cp4d_authenticator_password(): def test_cp4d_authenticator_apikey(): - file_path = os.path.join( - os.path.dirname(__file__), IBM_CREDENTIALS_FILE) + file_path = os.path.join(os.path.dirname(__file__), IBM_CREDENTIALS_FILE) os.environ['IBM_CREDENTIALS_FILE'] = file_path authenticator = get_authenticator_from_environment('cp4d_apikey_test')