From c87a74e04456e18a86b2ed689c5d2be3f53ed06a Mon Sep 17 00:00:00 2001 From: Antonio Boutaour Date: Thu, 18 Apr 2024 15:18:29 +0200 Subject: [PATCH 1/3] Added dag arg_key as specific_args_keys --- cosmos/converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index fdf4d8e42..65296bbdb 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -57,7 +57,7 @@ def airflow_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]: new_kwargs = {} non_airflow_kwargs = specific_kwargs(**kwargs) for arg_key, arg_value in kwargs.items(): - if arg_key not in non_airflow_kwargs: + if arg_key not in non_airflow_kwargs or arg_key == 'dag': new_kwargs[arg_key] = arg_value return new_kwargs From aa55426a01fa23610094db4597c042f7cf62f7ca Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 18 Apr 2024 13:24:40 +0000 Subject: [PATCH 2/3] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 65296bbdb..f9511ab82 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -57,7 +57,7 @@ def airflow_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]: new_kwargs = {} non_airflow_kwargs = specific_kwargs(**kwargs) for arg_key, arg_value in kwargs.items(): - if arg_key not in non_airflow_kwargs or arg_key == 'dag': + if arg_key not in non_airflow_kwargs or arg_key == "dag": new_kwargs[arg_key] = arg_value return new_kwargs From 577f2736577a6e8982fedf821454fa3b2a017f5e Mon Sep 17 00:00:00 2001 From: Antonio Boutaour Date: Fri, 19 Apr 2024 10:27:31 +0200 Subject: [PATCH 3/3] Added unit test for airflow_kwargs --- tests/airflow/test_graph.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 1e5648306..4ef7d112c 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -24,6 +24,7 @@ TestBehavior, TestIndirectSelection, ) +from cosmos.converter import airflow_kwargs from cosmos.dbt.graph import DbtNode from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -431,3 +432,28 @@ def test_create_test_task_metadata(node_type, node_unique_id, test_indirect_sele ) def test_snake_case_to_camelcase(input, expected): assert _snake_case_to_camelcase(input) == expected + + +def test_airflow_kwargs_generation(): + """ + airflow_kwargs_generation should always contain dag. + """ + task_args = { + "group_id": "fake_group_id", + "project_dir": SAMPLE_PROJ_PATH, + "conn_id": "fake_conn", + "render_config": RenderConfig(select=["fake-render"]), + "default_args": {"retries": 2}, + "profile_config": ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="fake_conn", + profile_args={"schema": "public"}, + ), + ), + "dag": DAG(dag_id="fake_dag_name"), + } + result = airflow_kwargs(**task_args) + + assert "dag" in result