From cbde77628d11549e083320dd4d2f07cd1848cce8 Mon Sep 17 00:00:00 2001 From: Shahar Epstein Date: Fri, 17 May 2024 22:28:48 +0300 Subject: [PATCH] Introduce anonymous credentials in GCP base hook --- .../cloud/utils/credentials_provider.py | 73 +++++++++++-------- .../google/common/hooks/base_google.py | 16 ++-- .../cloud/utils/test_credentials_provider.py | 12 +-- .../google/common/hooks/test_base_google.py | 32 ++++++-- 4 files changed, 84 insertions(+), 49 deletions(-) diff --git a/airflow/providers/google/cloud/utils/credentials_provider.py b/airflow/providers/google/cloud/utils/credentials_provider.py index 06f2b5c4d83ff..b3140c2da03f2 100644 --- a/airflow/providers/google/cloud/utils/credentials_provider.py +++ b/airflow/providers/google/cloud/utils/credentials_provider.py @@ -28,9 +28,9 @@ from urllib.parse import urlencode import google.auth -import google.auth.credentials import google.oauth2.service_account from google.auth import impersonated_credentials # type: ignore[attr-defined] +from google.auth.credentials import AnonymousCredentials, Credentials from google.auth.environment_vars import CREDENTIALS, LEGACY_PROJECT, PROJECT from airflow.exceptions import AirflowException @@ -178,6 +178,7 @@ class _CredentialProvider(LoggingMixin): :param key_secret_name: Keyfile Secret Name in GCP Secret Manager. :param key_secret_project_id: Project ID to read the secrets from. If not passed, the project ID from default credentials will be used. + :param credential_config_file: File path to or content of a GCP credential configuration file. :param scopes: OAuth scopes for the connection :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have @@ -192,6 +193,8 @@ class to configure Logger. Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account and target_principal granting the role to the last account from the list. + :param is_anonymous: Provides an anonymous set of credentials, + which is useful for APIs which do not require authentication. """ def __init__( @@ -206,13 +209,14 @@ def __init__( disable_logging: bool = False, target_principal: str | None = None, delegates: Sequence[str] | None = None, + is_anonymous: bool | None = None, ) -> None: super().__init__() - key_options = [key_path, key_secret_name, keyfile_dict] + key_options = [key_path, keyfile_dict, credential_config_file, key_secret_name, is_anonymous] if len([x for x in key_options if x]) > 1: raise AirflowException( - "The `keyfile_dict`, `key_path`, and `key_secret_name` fields " - "are all mutually exclusive. Please provide only one value." + "The `keyfile_dict`, `key_path`, `credential_config_file`, `is_anonymous` and" + " `key_secret_name` fields are all mutually exclusive. Please provide only one value." ) self.key_path = key_path self.keyfile_dict = keyfile_dict @@ -224,43 +228,48 @@ def __init__( self.disable_logging = disable_logging self.target_principal = target_principal self.delegates = delegates + self.is_anonymous = is_anonymous - def get_credentials_and_project(self) -> tuple[google.auth.credentials.Credentials, str]: + def get_credentials_and_project(self) -> tuple[Credentials, str]: """ Get current credentials and project ID. + Project ID is an empty string when using anonymous credentials. + :return: Google Auth Credentials """ - if self.key_path: - credentials, project_id = self._get_credentials_using_key_path() - elif self.key_secret_name: - credentials, project_id = self._get_credentials_using_key_secret_name() - elif self.keyfile_dict: - credentials, project_id = self._get_credentials_using_keyfile_dict() - elif self.credential_config_file: - credentials, project_id = self._get_credentials_using_credential_config_file() + if self.is_anonymous: + credentials, project_id = AnonymousCredentials(), "" else: - credentials, project_id = self._get_credentials_using_adc() - - if self.delegate_to: - if hasattr(credentials, "with_subject"): - credentials = credentials.with_subject(self.delegate_to) + if self.key_path: + credentials, project_id = self._get_credentials_using_key_path() + elif self.key_secret_name: + credentials, project_id = self._get_credentials_using_key_secret_name() + elif self.keyfile_dict: + credentials, project_id = self._get_credentials_using_keyfile_dict() + elif self.credential_config_file: + credentials, project_id = self._get_credentials_using_credential_config_file() else: - raise AirflowException( - "The `delegate_to` parameter cannot be used here as the current " - "authentication method does not support account impersonate. " - "Please use service-account for authorization." + credentials, project_id = self._get_credentials_using_adc() + if self.delegate_to: + if hasattr(credentials, "with_subject"): + credentials = credentials.with_subject(self.delegate_to) + else: + raise AirflowException( + "The `delegate_to` parameter cannot be used here as the current " + "authentication method does not support account impersonate. " + "Please use service-account for authorization." + ) + + if self.target_principal: + credentials = impersonated_credentials.Credentials( + source_credentials=credentials, + target_principal=self.target_principal, + delegates=self.delegates, + target_scopes=self.scopes, ) - if self.target_principal: - credentials = impersonated_credentials.Credentials( - source_credentials=credentials, - target_principal=self.target_principal, - delegates=self.delegates, - target_scopes=self.scopes, - ) - - project_id = _get_project_id_from_service_account_email(self.target_principal) + project_id = _get_project_id_from_service_account_email(self.target_principal) return credentials, project_id @@ -357,7 +366,7 @@ def _log_debug(self, *args, **kwargs) -> None: self.log.debug(*args, **kwargs) -def get_credentials_and_project_id(*args, **kwargs) -> tuple[google.auth.credentials.Credentials, str]: +def get_credentials_and_project_id(*args, **kwargs) -> tuple[Credentials, str]: """Return the Credentials object for Google API and the associated project_id.""" return _CredentialProvider(*args, **kwargs).get_credentials_and_project() diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py index 5800f8e44ccb6..0de39330a95a4 100644 --- a/airflow/providers/google/common/hooks/base_google.py +++ b/airflow/providers/google/common/hooks/base_google.py @@ -30,7 +30,6 @@ from typing import TYPE_CHECKING, Any, Callable, Generator, Sequence, TypeVar, cast import google.auth -import google.auth.credentials import google.oauth2.service_account import google_auth_httplib2 import requests @@ -223,7 +222,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget from flask_babel import lazy_gettext - from wtforms import IntegerField, PasswordField, StringField + from wtforms import BooleanField, IntegerField, PasswordField, StringField from wtforms.validators import NumberRange return { @@ -249,6 +248,9 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: "impersonation_chain": StringField( lazy_gettext("Impersonation Chain"), widget=BS3TextFieldWidget() ), + "is_anonymous": BooleanField( + lazy_gettext("Anonymous credentials (ignores all other settings)"), default=False + ), } @classmethod @@ -270,10 +272,10 @@ def __init__( self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson - self._cached_credentials: google.auth.credentials.Credentials | None = None + self._cached_credentials: Credentials | None = None self._cached_project_id: str | None = None - def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Credentials, str | None]: + def get_credentials_and_project_id(self) -> tuple[Credentials, str | None]: """Return the Credentials object for Google API and the associated project_id.""" if self._cached_credentials is not None: return self._cached_credentials, self._cached_project_id @@ -301,6 +303,7 @@ def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Creden self.impersonation_chain = [s.strip() for s in self.impersonation_chain.split(",")] target_principal, delegates = _get_target_principal_and_delegates(self.impersonation_chain) + is_anonymous = self._get_field("is_anonymous") credentials, project_id = get_credentials_and_project_id( key_path=key_path, @@ -312,6 +315,7 @@ def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Creden delegate_to=self.delegate_to, target_principal=target_principal, delegates=delegates, + is_anonymous=is_anonymous, ) overridden_project_id = self._get_field("project") @@ -323,7 +327,7 @@ def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Creden return credentials, project_id - def get_credentials(self) -> google.auth.credentials.Credentials: + def get_credentials(self) -> Credentials: """Return the Credentials object for Google API.""" credentials, _ = self.get_credentials_and_project_id() return credentials @@ -655,6 +659,8 @@ def download_content_from_request(file_handle, request: dict, chunk_size: int) - def test_connection(self): """Test the Google cloud connectivity from UI.""" status, message = False, "" + if self._get_field("is_anonymous"): + return True, "Credentials are anonymous" try: token = self._get_access_token() url = f"https://www.googleapis.com/oauth2/v3/tokeninfo?access_token={token}" diff --git a/tests/providers/google/cloud/utils/test_credentials_provider.py b/tests/providers/google/cloud/utils/test_credentials_provider.py index 812d9c5cd56f9..2fdf756e2803d 100644 --- a/tests/providers/google/cloud/utils/test_credentials_provider.py +++ b/tests/providers/google/cloud/utils/test_credentials_provider.py @@ -375,14 +375,14 @@ def test_get_credentials_and_project_id_with_key_secret_name_when_key_is_invalid get_credentials_and_project_id(key_secret_name="secret name") def test_get_credentials_and_project_id_with_mutually_exclusive_configuration(self): - with pytest.raises( - AirflowException, - match=re.escape( - "The `keyfile_dict`, `key_path`, and `key_secret_name` fields are all mutually exclusive." - ), - ): + with pytest.raises(AirflowException, match="mutually exclusive."): get_credentials_and_project_id(key_path="KEY.json", keyfile_dict={"private_key": "PRIVATE_KEY"}) + @mock.patch("airflow.providers.google.cloud.utils.credentials_provider.AnonymousCredentials") + def test_get_credentials_using_anonymous_credentials(self, mock_anonymous_credentials): + result = get_credentials_and_project_id(is_anonymous=True) + assert result == (mock_anonymous_credentials.return_value, "") + @mock.patch("google.auth.default", return_value=("CREDENTIALS", "PROJECT_ID")) @mock.patch( "google.oauth2.service_account.Credentials.from_service_account_info", diff --git a/tests/providers/google/common/hooks/test_base_google.py b/tests/providers/google/common/hooks/test_base_google.py index 1a91f0742d510..50874c1c3638d 100644 --- a/tests/providers/google/common/hooks/test_base_google.py +++ b/tests/providers/google/common/hooks/test_base_google.py @@ -412,6 +412,7 @@ def test_get_credentials_and_project_id_with_default_auth(self, mock_get_creds_a delegate_to=None, target_principal=None, delegates=None, + is_anonymous=None, ) assert ("CREDENTIALS", "PROJECT_ID") == result @@ -449,6 +450,7 @@ def test_get_credentials_and_project_id_with_service_account_file(self, mock_get delegate_to=None, target_principal=None, delegates=None, + is_anonymous=None, ) assert (mock_credentials, "PROJECT_ID") == result @@ -479,6 +481,7 @@ def test_get_credentials_and_project_id_with_service_account_info(self, mock_get delegate_to=None, target_principal=None, delegates=None, + is_anonymous=None, ) assert (mock_credentials, "PROJECT_ID") == result @@ -499,6 +502,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_delegate(self, moc delegate_to="USER", target_principal=None, delegates=None, + is_anonymous=None, ) assert (mock_credentials, "PROJECT_ID") == result @@ -535,6 +539,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_overridden_project delegate_to=None, target_principal=None, delegates=None, + is_anonymous=None, ) assert ("CREDENTIALS", "SECOND_PROJECT_ID") == result @@ -544,12 +549,7 @@ def test_get_credentials_and_project_id_with_mutually_exclusive_configuration(se "key_path": "KEY_PATH", "keyfile_dict": '{"KEY": "VALUE"}', } - with pytest.raises( - AirflowException, - match=re.escape( - "The `keyfile_dict`, `key_path`, and `key_secret_name` fields are all mutually exclusive. " - ), - ): + with pytest.raises(AirflowException, match="mutually exclusive"): self.instance.get_credentials_and_project_id() def test_get_credentials_and_project_id_with_invalid_keyfile_dict(self): @@ -559,6 +559,25 @@ def test_get_credentials_and_project_id_with_invalid_keyfile_dict(self): with pytest.raises(AirflowException, match=re.escape("Invalid key JSON.")): self.instance.get_credentials_and_project_id() + @mock.patch(MODULE_NAME + ".get_credentials_and_project_id", return_value=("CREDENTIALS", "")) + def test_get_credentials_and_project_id_with_is_anonymous(self, mock_get_creds_and_proj_id): + self.instance.extras = { + "is_anonymous": True, + } + self.instance.get_credentials_and_project_id() + mock_get_creds_and_proj_id.assert_called_once_with( + key_path=None, + keyfile_dict=None, + credential_config_file=None, + key_secret_name=None, + key_secret_project_id=None, + scopes=self.instance.scopes, + delegate_to=None, + target_principal=None, + delegates=None, + is_anonymous=True, + ) + @pytest.mark.skipif( not default_creds_available, reason="Default Google Cloud credentials not available to run tests" ) @@ -764,6 +783,7 @@ def test_get_credentials_and_project_id_with_impersonation_chain( delegate_to=None, target_principal=target_principal, delegates=delegates, + is_anonymous=None, ) assert (mock_credentials, PROJECT_ID) == result