From 9ef51f6b581d004db4f0697e57700875200767cc Mon Sep 17 00:00:00 2001 From: Monideep De Date: Tue, 1 Aug 2023 12:29:58 +0100 Subject: [PATCH 1/6] Support OAuth authentication for BigQuery --- cosmos/profiles/bigquery/__init__.py | 2 ++ .../bigquery/service_account_oauth.py | 36 +++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 cosmos/profiles/bigquery/service_account_oauth.py diff --git a/cosmos/profiles/bigquery/__init__.py b/cosmos/profiles/bigquery/__init__.py index e322c3af5..5b608c9f7 100644 --- a/cosmos/profiles/bigquery/__init__.py +++ b/cosmos/profiles/bigquery/__init__.py @@ -2,8 +2,10 @@ from .service_account_file import GoogleCloudServiceAccountFileProfileMapping from .service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping +from .service_account_oauth import GoogleCloudServiceAccountOauth __all__ = [ "GoogleCloudServiceAccountFileProfileMapping", "GoogleCloudServiceAccountDictProfileMapping", + "GoogleCloudServiceAccountOauth", ] diff --git a/cosmos/profiles/bigquery/service_account_oauth.py b/cosmos/profiles/bigquery/service_account_oauth.py new file mode 100644 index 000000000..ffc00f474 --- /dev/null +++ b/cosmos/profiles/bigquery/service_account_oauth.py @@ -0,0 +1,36 @@ +"Maps Airflow GCP connections to dbt BigQuery profiles that uses oauth via gcloud, if they don't use key file or JSON." +from typing import Any + +from cosmos.profiles.base import BaseProfileMapping + + +class GoogleCloudServiceAccountOauth(BaseProfileMapping): + """ + Maps Airflow GCP connections to dbt BigQuery profiles that uses oauth via gcloud, if they don't use key file or JSON.
+ + https://docs.getdbt.com/docs/core/connect-data-platform/bigquery-setup#oauth-via-gcloud + https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html + """ + + airflow_connection_type: str = "google_cloud_platform" + + required_fields = [ + "project", + "dataset", + ] + + airflow_param_mapping = { + "project": "extra.project", + "dataset": "extra.dataset", + } + + @property + def profile(self) -> dict[str, Any | None]: + "Generates profile. Defaults `threads` to 1." + return { + **self.mapped_params, + "type": "bigquery", + "method": "oauth", + "threads": 1, + **self.profile_args, + } \ No newline at end of file From 53fb72c9591cade1227211773499f0a3d52b2793 Mon Sep 17 00:00:00 2001 From: Monideep De Date: Tue, 1 Aug 2023 17:33:58 +0100 Subject: [PATCH 2/6] Added tests --- cosmos/profiles/__init__.py | 3 + cosmos/profiles/base.py | 2 +- cosmos/profiles/bigquery/__init__.py | 4 +- .../{service_account_oauth.py => oauth.py} | 2 +- tests/profiles/bigquery/test_bq_oauth.py | 66 +++++++++++++++++++ 5 files changed, 73 insertions(+), 4 deletions(-) rename cosmos/profiles/bigquery/{service_account_oauth.py => oauth.py} (94%) create mode 100644 tests/profiles/bigquery/test_bq_oauth.py diff --git a/cosmos/profiles/__init__.py b/cosmos/profiles/__init__.py index 9a5d5ffdf..752a84ff3 100644 --- a/cosmos/profiles/__init__.py +++ b/cosmos/profiles/__init__.py @@ -8,6 +8,7 @@ from .base import BaseProfileMapping from .bigquery.service_account_file import GoogleCloudServiceAccountFileProfileMapping from .bigquery.service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping +from .bigquery.oauth import GoogleCloudOauthProfileMapping from .databricks.token import DatabricksTokenProfileMapping from .exasol.user_pass import ExasolUserPasswordProfileMapping from .postgres.user_pass import PostgresUserPasswordProfileMapping @@ -22,6 +23,7 @@ profile_mappings: list[Type[BaseProfileMapping]] = [ 
GoogleCloudServiceAccountFileProfileMapping, GoogleCloudServiceAccountDictProfileMapping, + GoogleCloudOauthProfileMapping, DatabricksTokenProfileMapping, PostgresUserPasswordProfileMapping, RedshiftUserPasswordProfileMapping, @@ -57,6 +59,7 @@ def get_automatic_profile_mapping( "BaseProfileMapping", "GoogleCloudServiceAccountFileProfileMapping", "GoogleCloudServiceAccountDictProfileMapping", + "GoogleCloudOauthProfileMapping", "DatabricksTokenProfileMapping", "PostgresUserPasswordProfileMapping", "RedshiftUserPasswordProfileMapping", diff --git a/cosmos/profiles/base.py b/cosmos/profiles/base.py index 2db741132..004be0529 100644 --- a/cosmos/profiles/base.py +++ b/cosmos/profiles/base.py @@ -147,7 +147,7 @@ def get_dbt_value(self, name: str) -> Any: airflow_field = airflow_field.replace("extra.", "", 1) value = self.conn.extra_dejson.get(airflow_field) else: - value = getattr(self.conn, airflow_field) + value = getattr(self.conn, airflow_field, None) if not value: continue diff --git a/cosmos/profiles/bigquery/__init__.py b/cosmos/profiles/bigquery/__init__.py index 5b608c9f7..c03f9a0ab 100644 --- a/cosmos/profiles/bigquery/__init__.py +++ b/cosmos/profiles/bigquery/__init__.py @@ -2,10 +2,10 @@ from .service_account_file import GoogleCloudServiceAccountFileProfileMapping from .service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping -from .service_account_oauth import GoogleCloudServiceAccountOauth +from .oauth import GoogleCloudOauthProfileMapping __all__ = [ "GoogleCloudServiceAccountFileProfileMapping", "GoogleCloudServiceAccountDictProfileMapping", - "GoogleCloudServiceAccountOauth", + "GoogleCloudOauthProfileMapping", ] diff --git a/cosmos/profiles/bigquery/service_account_oauth.py b/cosmos/profiles/bigquery/oauth.py similarity index 94% rename from cosmos/profiles/bigquery/service_account_oauth.py rename to cosmos/profiles/bigquery/oauth.py index ffc00f474..6db6c754e 100644 --- a/cosmos/profiles/bigquery/service_account_oauth.py +++ 
b/cosmos/profiles/bigquery/oauth.py @@ -4,7 +4,7 @@ from cosmos.profiles.base import BaseProfileMapping -class GoogleCloudServiceAccountOauth(BaseProfileMapping): +class GoogleCloudOauthProfileMapping(BaseProfileMapping): """ Maps Airflow GCP connections to dbt BigQuery profiles that uses oauth via gcloud, if they don't use key file or JSON. diff --git a/tests/profiles/bigquery/test_bq_oauth.py b/tests/profiles/bigquery/test_bq_oauth.py new file mode 100644 index 000000000..f5d636670 --- /dev/null +++ b/tests/profiles/bigquery/test_bq_oauth.py @@ -0,0 +1,66 @@ +"Tests for the BigQuery profile." + +import json +from unittest.mock import patch + +import pytest +from airflow.models.connection import Connection + +from cosmos.profiles import get_automatic_profile_mapping +from cosmos.profiles.bigquery.oauth import ( + GoogleCloudOauthProfileMapping, +) +from typing import Dict + + +@pytest.fixture() +def mock_bigquery_conn(request): + """ + Mocks and returns an Airflow BigQuery connection. + """ + extra = {"project": "my_project", "dataset": "my_dataset"} if not hasattr(request, "param") else request.param + conn = Connection( + conn_id="my_bigquery_connection", + conn_type="google_cloud_platform", + extra=json.dumps(extra), + ) + + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + yield conn + + +def test_bigquery_mapping_selected(mock_bigquery_conn: Connection): + profile_mapping = get_automatic_profile_mapping( + mock_bigquery_conn.conn_id, {} + ) + assert isinstance(profile_mapping, GoogleCloudOauthProfileMapping) + + +@pytest.mark.parametrize("mock_bigquery_conn", [ + {"project": "my_project"}, + {"dataset": "my_dataset"}, + {} +], indirect=True) +def test_connection_claiming_fails(mock_bigquery_conn: Connection) -> None: + """ + Tests that the BigQuery profile mapping claims the correct connection type. 
+ """ + profile_mapping = GoogleCloudOauthProfileMapping(mock_bigquery_conn) + assert not profile_mapping.can_claim_connection() + + +def test_connection_claiming_succeeds(mock_bigquery_conn: Connection): + profile_mapping = GoogleCloudOauthProfileMapping(mock_bigquery_conn, {}) + assert profile_mapping.can_claim_connection() + + +def test_profile(mock_bigquery_conn: Connection): + profile_mapping = GoogleCloudOauthProfileMapping(mock_bigquery_conn, {}) + expected = { + "type": "bigquery", + "method": "oauth", + "project": "my_project", + "dataset": "my_dataset", + "threads": 1, + } + assert profile_mapping.profile == expected From 53ec3a7bda75ee4ec60f7d339f17410183fe39f3 Mon Sep 17 00:00:00 2001 From: Monideep De Date: Tue, 1 Aug 2023 20:41:43 +0100 Subject: [PATCH 3/6] Correcting comment --- cosmos/profiles/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/profiles/base.py b/cosmos/profiles/base.py index 004be0529..58780b5f7 100644 --- a/cosmos/profiles/base.py +++ b/cosmos/profiles/base.py @@ -134,7 +134,7 @@ def get_dbt_value(self, name: str) -> Any: if self.profile_args.get(name): return self.profile_args[name] - # if it's has an entry in airflow_param_mapping, we can get it from conn + # if it has an entry in airflow_param_mapping, we can get it from conn if name in self.airflow_param_mapping: airflow_fields = self.airflow_param_mapping[name] From 19f6e5a2620925f6c85b1ecdb6e3c00bb3491a06 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 1 Aug 2023 23:36:09 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/profiles/bigquery/oauth.py | 2 +- tests/profiles/bigquery/test_bq_oauth.py | 13 ++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git 
a/cosmos/profiles/bigquery/oauth.py b/cosmos/profiles/bigquery/oauth.py index 6db6c754e..d1c1c200d 100644 --- a/cosmos/profiles/bigquery/oauth.py +++ b/cosmos/profiles/bigquery/oauth.py @@ -33,4 +33,4 @@ def profile(self) -> dict[str, Any | None]: "method": "oauth", "threads": 1, **self.profile_args, - } \ No newline at end of file + } diff --git a/tests/profiles/bigquery/test_bq_oauth.py b/tests/profiles/bigquery/test_bq_oauth.py index f5d636670..f225f585f 100644 --- a/tests/profiles/bigquery/test_bq_oauth.py +++ b/tests/profiles/bigquery/test_bq_oauth.py @@ -10,7 +10,6 @@ from cosmos.profiles.bigquery.oauth import ( GoogleCloudOauthProfileMapping, ) -from typing import Dict @pytest.fixture() @@ -30,17 +29,13 @@ def mock_bigquery_conn(request): def test_bigquery_mapping_selected(mock_bigquery_conn: Connection): - profile_mapping = get_automatic_profile_mapping( - mock_bigquery_conn.conn_id, {} - ) + profile_mapping = get_automatic_profile_mapping(mock_bigquery_conn.conn_id, {}) assert isinstance(profile_mapping, GoogleCloudOauthProfileMapping) -@pytest.mark.parametrize("mock_bigquery_conn", [ - {"project": "my_project"}, - {"dataset": "my_dataset"}, - {} -], indirect=True) +@pytest.mark.parametrize( + "mock_bigquery_conn", [{"project": "my_project"}, {"dataset": "my_dataset"}, {}], indirect=True +) def test_connection_claiming_fails(mock_bigquery_conn: Connection) -> None: """ Tests that the BigQuery profile mapping claims the correct connection type. 
From b9c19077198613fb12288cbb4c9ccc4fc63d2aeb Mon Sep 17 00:00:00 2001 From: Monideep De Date: Wed, 2 Aug 2023 10:26:20 +0100 Subject: [PATCH 5/6] Import annotations --- cosmos/profiles/bigquery/oauth.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cosmos/profiles/bigquery/oauth.py b/cosmos/profiles/bigquery/oauth.py index d1c1c200d..1bd0e7557 100644 --- a/cosmos/profiles/bigquery/oauth.py +++ b/cosmos/profiles/bigquery/oauth.py @@ -1,4 +1,6 @@ "Maps Airflow GCP connections to dbt BigQuery profiles that uses oauth via gcloud, if they don't use key file or JSON." +from __future__ import annotations + from typing import Any from cosmos.profiles.base import BaseProfileMapping From 51c80b982abe39cacef5986e76210721a95759c8 Mon Sep 17 00:00:00 2001 From: Monideep De Date: Wed, 2 Aug 2023 16:33:27 +0100 Subject: [PATCH 6/6] Fix for pre-commit hook failure --- cosmos/profiles/bigquery/oauth.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/profiles/bigquery/oauth.py b/cosmos/profiles/bigquery/oauth.py index 1bd0e7557..b23b0e665 100644 --- a/cosmos/profiles/bigquery/oauth.py +++ b/cosmos/profiles/bigquery/oauth.py @@ -8,7 +8,8 @@ class GoogleCloudOauthProfileMapping(BaseProfileMapping): """ - Maps Airflow GCP connections to dbt BigQuery profiles that uses oauth via gcloud, if they don't use key file or JSON. + Maps Airflow GCP connections to dbt BigQuery profiles that uses oauth via gcloud, + if they don't use key file or JSON. https://docs.getdbt.com/docs/core/connect-data-platform/bigquery-setup#oauth-via-gcloud https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html