diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 12ddc2b20911..116c818a2e09 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.13.1 +Low-code: Add `SessionTokenAuthenticator` + ## 0.13.0 Add `Stream.check_availability` and `Stream.AvailabilityStrategy`. Make `HttpAvailabilityStrategy` the default `HttpStream.AvailabilityStrategy`. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py index c37e46e904ef..a4f105dbfe60 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py @@ -3,13 +3,16 @@ # import base64 +import logging from dataclasses import InitVar, dataclass from typing import Any, Mapping, Union +import requests from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.types import Config from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator +from cachetools import TTLCache, cached from dataclasses_jsonschema import JsonSchemaMixin @@ -115,3 +118,125 @@ def token(self) -> str: auth_string = f"{self._username.eval(self.config)}:{self._password.eval(self.config)}".encode("utf8") b64_encoded = base64.b64encode(auth_string).decode("utf8") return f"Basic {b64_encoded}" + + +""" + maxsize - The maximum size of the cache + ttl - time-to-live value in seconds + docs https://cachetools.readthedocs.io/en/latest/ + maxsize=1000 - when the cache is full, in this case more than 1000, + i.e. by adding another item the cache would exceed its maximum size, the cache must choose which item(s) to discard + ttl=86400 means that cached token will live for 86400 seconds (one day) +""" +cacheSessionTokenAuthenticator = TTLCache(maxsize=1000, ttl=86400) + + +@cached(cacheSessionTokenAuthenticator) +def get_new_session_token(api_url: str, username: str, password: str, response_key: str) -> str: + """ + This method retrieves session token from api by username and password for SessionTokenAuthenticator. + It's cashed to avoid a multiple calling by sync and updating session token every stream sync. + Args: + api_url: api url for getting new session token + username: username for auth + password: password for auth + response_key: field name in response to retrieve a session token + + Returns: + session token + """ + response = requests.post( + f"{api_url}", + headers={"Content-Type": "application/json"}, + json={"username": username, "password": password}, + ) + response.raise_for_status() + if not response.ok: + raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}") + return response.json()[response_key] + + +@dataclass +class SessionTokenAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin): + """ + Builds auth based on session tokens. + A session token is a random value generated by a server to identify + a specific user for the duration of one interaction session. + + The header is of the form + `"Specific Header": "Session Token Value"` + + Attributes: + api_url (Union[InterpolatedString, str]): Base api url of source + username (Union[InterpolatedString, str]): The username + config (Config): The user-provided configuration as specified by the source's spec + password (Union[InterpolatedString, str]): The password + header (Union[InterpolatedString, str]): Specific header of source for providing session token + options (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation + session_token (Union[InterpolatedString, str]): Session token generated by user + session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response + login_url (Union[InterpolatedString, str]): Url fot getting a specific session token + validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token + """ + + api_url: Union[InterpolatedString, str] + header: Union[InterpolatedString, str] + session_token: Union[InterpolatedString, str] + session_token_response_key: Union[InterpolatedString, str] + username: Union[InterpolatedString, str] + config: Config + options: InitVar[Mapping[str, Any]] + login_url: Union[InterpolatedString, str] + validate_session_url: Union[InterpolatedString, str] + password: Union[InterpolatedString, str] = "" + + def __post_init__(self, options): + self._username = InterpolatedString.create(self.username, options=options) + self._password = InterpolatedString.create(self.password, options=options) + self._api_url = InterpolatedString.create(self.api_url, options=options) + self._header = InterpolatedString.create(self.header, options=options) + self._session_token = InterpolatedString.create(self.session_token, options=options) + self._session_token_response_key = InterpolatedString.create(self.session_token_response_key, options=options) + self._login_url = InterpolatedString.create(self.login_url, options=options) + self._validate_session_url = InterpolatedString.create(self.validate_session_url, options=options) + + self.logger = logging.getLogger("airbyte") + + @property + def auth_header(self) -> str: + return self._header.eval(self.config) + + @property + def token(self) -> str: + if self._session_token.eval(self.config): + if self.is_valid_session_token(): + return self._session_token.eval(self.config) + if self._password.eval(self.config) and self._username.eval(self.config): + username = self._username.eval(self.config) + password = self._password.eval(self.config) + session_token_response_key = self._session_token_response_key.eval(self.config) + api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}" + + self.logger.info("Using generated session token by username and password") + return get_new_session_token(api_url, username, password, session_token_response_key) + + raise ConnectionError("Invalid credentials: session token is not valid or provide username and password") + + def is_valid_session_token(self) -> bool: + try: + response = requests.get( + f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}", + headers={self.auth_header: self._session_token.eval(self.config)}, + ) + response.raise_for_status() + except requests.exceptions.HTTPError as e: + if e.response.status_code == requests.codes["unauthorized"]: + self.logger.info(f"Unable to connect by session token from config due to {str(e)}") + return False + else: + raise ConnectionError(f"Error while validating session token: {e}") + if response.ok: + self.logger.info("Connection check for source is successful.") + return True + else: + raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}") diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/config_component_schema.json b/airbyte-cdk/python/airbyte_cdk/sources/declarative/config_component_schema.json index 8f61c0716113..607cfacd930b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/config_component_schema.json +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/config_component_schema.json @@ -289,6 +289,9 @@ }, { "$ref": "#/definitions/BasicHttpAuthenticator" + }, + { + "$ref": "#/definitions/SessionTokenAuthenticator" } ] }, @@ -629,6 +632,75 @@ ], "description": "\n Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64\n https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme\n\n The header is of the form\n `\"Authorization\": \"Basic \"`\n\n Attributes:\n username (Union[InterpolatedString, str]): The username\n config (Config): The user-provided configuration as specified by the source's spec\n password (Union[InterpolatedString, str]): The password\n options (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation\n " }, + "SessionTokenAuthenticator": { + "allOf": [ + { + "$ref": "#/definitions/DeclarativeAuthenticator" + }, + { + "type": "object", + "required": ["api_url", "header"], + "properties": { + "api_url": { + "anyOf": [ + { + "$ref": "#/definitions/InterpolatedString" + }, + { + "type": "string" + } + ] + }, + "session_token": { + "anyOf": [ + { + "$ref": "#/definitions/InterpolatedString" + }, + { + "type": "string" + } + ], + "default": "" + }, + "username": { + "anyOf": [ + { + "$ref": "#/definitions/InterpolatedString" + }, + { + "type": "string" + } + ], + "default": "" + }, + "password": { + "anyOf": [ + { + "$ref": "#/definitions/InterpolatedString" + }, + { + "type": "string" + } + ], + "default": "" + }, + "header": { + "anyOf": [ + { + "$ref": "#/definitions/InterpolatedString" + }, + { + "type": "string" + } + ] + }, + "config": { + "type": "object" + } + } + } + ] + }, "CompositeErrorHandler": { "allOf": [ { diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py index 66d4d6b0bc90..f26287fa671f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -6,7 +6,12 @@ from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator -from airbyte_cdk.sources.declarative.auth.token import ApiKeyAuthenticator, BasicHttpAuthenticator, BearerAuthenticator +from airbyte_cdk.sources.declarative.auth.token import ( + ApiKeyAuthenticator, + BasicHttpAuthenticator, + BearerAuthenticator, + SessionTokenAuthenticator, +) from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor @@ -78,6 +83,7 @@ "SingleSlice": SingleSlice, "Spec": Spec, "SubstreamSlicer": SubstreamSlicer, + "SessionTokenAuthenticator": SessionTokenAuthenticator, "WaitUntilTimeFromHeader": WaitUntilTimeFromHeaderBackoffStrategy, "WaitTimeFromHeader": WaitTimeFromHeaderBackoffStrategy, } diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 0ac6109245a0..c4065d6fa8e1 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.13.0", + version="0.13.1", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -58,6 +58,7 @@ "requests_cache", "Deprecated~=1.2", "Jinja2~=3.1.2", + "cachetools", ], python_requires=">=3.9", extras_require={ diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/auth/test_session_token_auth.py b/airbyte-cdk/python/unit_tests/sources/declarative/auth/test_session_token_auth.py new file mode 100644 index 000000000000..47334cbdfafd --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/auth/test_session_token_auth.py @@ -0,0 +1,184 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest +from airbyte_cdk.sources.declarative.auth.token import SessionTokenAuthenticator, get_new_session_token +from requests.exceptions import HTTPError + +options = {"hello": "world"} +instance_api_url = "https://airbyte.metabaseapp.com/api/" +username = "username" +password = "password" +session_token = "session_token" +header = "X-App-Session" +session_token_response_key = "id" +login_url = "session" +validate_session_url = "user/current" + +input_instance_api_url = "{{ config['instance_api_url'] }}" +input_username = "{{ config['username'] }}" +input_password = "{{ config['password'] }}" +input_session_token = "{{ config['session_token'] }}" + +config = { + "instance_api_url": instance_api_url, + "username": username, + "password": password, + "session_token": session_token, + "header": header, + "session_token_response_key": session_token_response_key, + "login_url": login_url, + "validate_session_url": validate_session_url +} + +config_session_token = { + "instance_api_url": instance_api_url, + "username": "", + "password": "", + "session_token": session_token, + "header": header, + "session_token_response_key": session_token_response_key, + "login_url": login_url, + "validate_session_url": validate_session_url +} + +config_username_password = { + "instance_api_url": instance_api_url, + "username": username, + "password": password, + "session_token": "", + "header": header, + "session_token_response_key": session_token_response_key, + "login_url": login_url, + "validate_session_url": validate_session_url +} + + +def test_auth_header(): + auth_header = SessionTokenAuthenticator( + config=config, + options=options, + api_url=input_instance_api_url, + username=input_username, + password=input_password, + session_token=input_session_token, + header=header, + session_token_response_key=session_token_response_key, + login_url=login_url, + validate_session_url=validate_session_url + ).auth_header + assert auth_header == "X-App-Session" + + +def test_get_token_valid_session(requests_mock): + requests_mock.get( + f"{config_session_token['instance_api_url']}user/current", + json={"common_name": "common_name", "last_login": "last_login"} + ) + + token = SessionTokenAuthenticator( + config=config_session_token, + options=options, + api_url=input_instance_api_url, + username=input_username, + password=input_password, + session_token=input_session_token, + header=header, + session_token_response_key=session_token_response_key, + login_url=login_url, + validate_session_url=validate_session_url + ).token + assert token == "session_token" + + +def test_get_token_invalid_session_unauthorized(): + with pytest.raises(ConnectionError): + _ = SessionTokenAuthenticator( + config=config_session_token, + options=options, + api_url=input_instance_api_url, + username=input_username, + password=input_password, + session_token=input_session_token, + header=header, + session_token_response_key=session_token_response_key, + login_url=login_url, + validate_session_url=validate_session_url + ).token + + +def test_get_token_invalid_username_password_unauthorized(): + with pytest.raises(HTTPError): + _ = SessionTokenAuthenticator( + config=config_username_password, + options=options, + api_url=input_instance_api_url, + username=input_username, + password=input_password, + session_token=input_session_token, + header=header, + session_token_response_key=session_token_response_key, + validate_session_url=validate_session_url, + login_url=login_url + ).token + + +def test_get_token_username_password(requests_mock): + requests_mock.post(f"{config['instance_api_url']}session", json={"id": "some session id"}) + + token = SessionTokenAuthenticator( + config=config_username_password, + options=options, + api_url=input_instance_api_url, + username=input_username, + password=input_password, + session_token=input_session_token, + header=header, + session_token_response_key=session_token_response_key, + login_url=login_url, + validate_session_url=validate_session_url + ).token + assert token == "some session id" + + +def test_check_is_valid_session_token(requests_mock): + requests_mock.get(f"{config['instance_api_url']}user/current", + json={"common_name": "common_name", "last_login": "last_login"}) + + assert SessionTokenAuthenticator( + config=config, + options=options, + api_url=input_instance_api_url, + username=input_username, + password=input_password, + session_token=input_session_token, + header=header, + session_token_response_key=session_token_response_key, + validate_session_url=validate_session_url, + login_url=login_url + ).is_valid_session_token() + + +def test_check_is_valid_session_token_unauthorized(): + assert not SessionTokenAuthenticator( + config=config, + options=options, + api_url=input_instance_api_url, + username=input_username, + password=input_password, + session_token=input_session_token, + header=header, + session_token_response_key=session_token_response_key, + login_url=login_url, + validate_session_url=validate_session_url + ).is_valid_session_token() + + +def test_get_new_session_token(requests_mock): + requests_mock.post(f"{config['instance_api_url']}session", headers={"Content-Type": "application/json"}, + json={"id": "some session id"}) + + session_token = get_new_session_token(f'{config["instance_api_url"]}session', config["username"], + config["password"], config["session_token_response_key"]) + assert session_token == "some session id"