From fa9041fa77ec48b716cb93d4f5ee151e15cd4ea2 Mon Sep 17 00:00:00 2001 From: Justin Bandoro Date: Tue, 15 Aug 2023 16:39:11 -0700 Subject: [PATCH 1/3] mask bigquery keyfile_dict private keys as en vars --- .../bigquery/service_account_keyfile_dict.py | 32 +++++++++++++++---- .../test_bq_service_account_keyfile_dict.py | 30 +++++++++++++++-- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/cosmos/profiles/bigquery/service_account_keyfile_dict.py b/cosmos/profiles/bigquery/service_account_keyfile_dict.py index d459373be..494bdf022 100644 --- a/cosmos/profiles/bigquery/service_account_keyfile_dict.py +++ b/cosmos/profiles/bigquery/service_account_keyfile_dict.py @@ -3,6 +3,7 @@ from typing import Any import json +from cosmos.exceptions import CosmosValueError from cosmos.profiles.base import BaseProfileMapping @@ -23,6 +24,8 @@ class GoogleCloudServiceAccountDictProfileMapping(BaseProfileMapping): "keyfile_json", ] + secret_fields = ["private_key_id", "private_key"] + airflow_param_mapping = { "project": "extra.project", # multiple options for dataset because of older Airflow versions @@ -31,6 +34,8 @@ class GoogleCloudServiceAccountDictProfileMapping(BaseProfileMapping): "keyfile_json": ["extra.keyfile_dict", "keyfile_dict", "extra__google_cloud_platform__keyfile_dict"], } + _env_vars: dict[str, str] = {} + @property def profile(self) -> dict[str, Any | None]: """ @@ -48,12 +53,27 @@ def profile(self) -> dict[str, Any | None]: def transform_keyfile_json(self, keyfile_json: str | dict[str, str]) -> dict[str, str]: """ - Transforms the keyfile_json param to a dict if it is a string. + Transforms the keyfile_json param to a dict if it is a string, and + sets environment variables for the service account credentials. """ if isinstance(keyfile_json, dict): - return keyfile_json - keyfile_json = json.loads(keyfile_json) - if isinstance(keyfile_json, dict): - return keyfile_json + keyfile_json_dict = keyfile_json else: - raise ValueError("keyfile_json cannot be loaded as a dict.") + keyfile_json_dict = json.loads(keyfile_json) + if not isinstance(keyfile_json_dict, dict): + raise ValueError("keyfile_json cannot be loaded as a dict.") + + for field in self.secret_fields: + value = keyfile_json_dict.get(field) + if value is None: + raise CosmosValueError(f"Could not find a value in service account json field: {field}.") + env_var_name = self.get_env_var_name(field) + self._env_vars[env_var_name] = value + keyfile_json_dict[field] = self.get_env_var_format(field) + + return keyfile_json_dict + + @property + def env_vars(self) -> dict[str, str]: + "Returns a dictionary of environment variables that should be set based on self.secret_fields." + return self._env_vars diff --git a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py index 9bea15ff7..926fcf8ce 100644 --- a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py +++ b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py @@ -7,8 +7,14 @@ from cosmos.profiles import get_automatic_profile_mapping from cosmos.profiles.bigquery.service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping +sample_keyfile_dict = { + "type": "service_account", + "private_key_id": "my_private_key_id", + "private_key": "my_private_key", +} -@pytest.fixture(params=[{"key": "value"}, '{"key": "value"}']) + +@pytest.fixture(params=[sample_keyfile_dict, json.dumps(sample_keyfile_dict)]) def mock_bigquery_conn_with_dict(request): # type: ignore """ Mocks and returns an Airflow BigQuery connection. @@ -43,7 +49,7 @@ def test_connection_claiming_succeeds(mock_bigquery_conn_with_dict: Connection): def test_connection_claiming_fails(mock_bigquery_conn_with_dict: Connection): # Remove the `dataset` key, which is mandatory - mock_bigquery_conn_with_dict.extra = json.dumps({"project": "my_project", "keyfile_dict": {"key": "value"}}) + mock_bigquery_conn_with_dict.extra = json.dumps({"project": "my_project", "keyfile_dict": sample_keyfile_dict}) profile_mapping = GoogleCloudServiceAccountDictProfileMapping(mock_bigquery_conn_with_dict, {}) assert not profile_mapping.can_claim_connection() @@ -56,6 +62,24 @@ def test_profile(mock_bigquery_conn_with_dict: Connection): "project": "my_project", "dataset": "my_dataset", "threads": 1, - "keyfile_json": {"key": "value"}, + "keyfile_json": { + "type": "service_account", + "private_key_id": "{{ env_var('COSMOS_CONN_GOOGLE_CLOUD_PLATFORM_PRIVATE_KEY_ID') }}", + "private_key": "{{ env_var('COSMOS_CONN_GOOGLE_CLOUD_PLATFORM_PRIVATE_KEY') }}", + }, } assert profile_mapping.profile == expected + + +def test_profile_env_vars(mock_bigquery_conn_with_dict: Connection): + """ + Tests that the environment variables get set correctly. + """ + profile_mapping = get_automatic_profile_mapping( + mock_bigquery_conn_with_dict.conn_id, + {"dataset": "my_dataset"}, + ) + assert profile_mapping.env_vars == { + "COSMOS_CONN_GOOGLE_CLOUD_PLATFORM_PRIVATE_KEY_ID": "my_private_key_id", + "COSMOS_CONN_GOOGLE_CLOUD_PLATFORM_PRIVATE_KEY": "my_private_key", + } From 05fb91ff02ddfc4a7087a6c8d0b594aad94cbfd6 Mon Sep 17 00:00:00 2001 From: Justin Bandoro Date: Tue, 15 Aug 2023 17:15:18 -0700 Subject: [PATCH 2/3] docs: update transform_keyfile_json method --- cosmos/profiles/bigquery/service_account_keyfile_dict.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/profiles/bigquery/service_account_keyfile_dict.py b/cosmos/profiles/bigquery/service_account_keyfile_dict.py index 494bdf022..a909d3719 100644 --- a/cosmos/profiles/bigquery/service_account_keyfile_dict.py +++ b/cosmos/profiles/bigquery/service_account_keyfile_dict.py @@ -53,8 +53,8 @@ def profile(self) -> dict[str, Any | None]: def transform_keyfile_json(self, keyfile_json: str | dict[str, str]) -> dict[str, str]: """ - Transforms the keyfile_json param to a dict if it is a string, and - sets environment variables for the service account credentials. + Transforms the keyfile_json param to a dict if it is a string, and sets environment + variables for the service account json secret fields. """ if isinstance(keyfile_json, dict): keyfile_json_dict = keyfile_json From b6b957a519661045f1ce9a11f56c8694b571cc04 Mon Sep 17 00:00:00 2001 From: Justin Bandoro Date: Wed, 16 Aug 2023 10:47:16 -0700 Subject: [PATCH 3/3] add test coverage for CosmosValueError raising in transform_keyfile_json --- .../bigquery/service_account_keyfile_dict.py | 2 +- .../test_bq_service_account_keyfile_dict.py | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/cosmos/profiles/bigquery/service_account_keyfile_dict.py b/cosmos/profiles/bigquery/service_account_keyfile_dict.py index a909d3719..bc7943e8d 100644 --- a/cosmos/profiles/bigquery/service_account_keyfile_dict.py +++ b/cosmos/profiles/bigquery/service_account_keyfile_dict.py @@ -61,7 +61,7 @@ def transform_keyfile_json(self, keyfile_json: str | dict[str, str]) -> dict[str else: keyfile_json_dict = json.loads(keyfile_json) if not isinstance(keyfile_json_dict, dict): - raise ValueError("keyfile_json cannot be loaded as a dict.") + raise CosmosValueError("keyfile_json cannot be loaded as a dict.") for field in self.secret_fields: value = keyfile_json_dict.get(field) diff --git a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py index 926fcf8ce..8a260ec2f 100644 --- a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py +++ b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py @@ -3,6 +3,7 @@ import pytest from airflow.models.connection import Connection +from cosmos.exceptions import CosmosValueError from cosmos.profiles import get_automatic_profile_mapping from cosmos.profiles.bigquery.service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping @@ -83,3 +84,29 @@ def test_profile_env_vars(mock_bigquery_conn_with_dict: Connection): "COSMOS_CONN_GOOGLE_CLOUD_PLATFORM_PRIVATE_KEY_ID": "my_private_key_id", "COSMOS_CONN_GOOGLE_CLOUD_PLATFORM_PRIVATE_KEY": "my_private_key", } + + +def test_transform_keyfile_json_missing_dict(): + """ + Tests that a cosmos error is raised if the keyfile_json cannot be loaded as a dict. + """ + keyfile_json = '["value"]' + expected_cosmos_error = "keyfile_json cannot be loaded as a dict." + + profile_mapping = GoogleCloudServiceAccountDictProfileMapping("", {}) + with pytest.raises(CosmosValueError, match=expected_cosmos_error): + profile_mapping.transform_keyfile_json(keyfile_json) + + +@pytest.mark.parametrize("missing_secret_key", ["private_key_id", "private_key"]) +def test_transform_keyfile_json_missing_secret_key(missing_secret_key: str): + """ + Tests that a cosmos error is raised if the keyfile_json is missing a secret key. + """ + keyfile_json = {k: v for k, v in sample_keyfile_dict.items() if k != missing_secret_key} + expected_cosmos_error = f"Could not find a value in service account json field: {missing_secret_key}." + + profile_mapping = GoogleCloudServiceAccountDictProfileMapping("", {}) + + with pytest.raises(CosmosValueError, match=expected_cosmos_error): + profile_mapping.transform_keyfile_json(keyfile_json)