diff --git a/cosmos/converter.py b/cosmos/converter.py index dbc290271..45d98a4cf 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -3,6 +3,7 @@ from __future__ import annotations +import copy import inspect from typing import Any, Callable @@ -118,6 +119,10 @@ def __init__( # If we are using the old interface, we should migrate it to the new interface # This is safe to do now since we have validated which config interface we're using if project_config.dbt_project_path: + # We copy the configuration so the change does not affect other DAGs or TaskGroups + # that may reuse the same original configuration + render_config = copy.deepcopy(render_config) + execution_config = copy.deepcopy(execution_config) render_config.project_path = project_config.dbt_project_path execution_config.project_path = project_config.dbt_project_path diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 50cb6ed09..731914953 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -2,13 +2,14 @@ An example DAG that uses Cosmos to render a dbt project as a TaskGroup. """ import os + from datetime import datetime from pathlib import Path from airflow.decorators import dag from airflow.operators.empty import EmptyOperator -from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig +from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -23,6 +24,8 @@ ), ) +shared_execution_config = ExecutionConfig() + @dag( schedule_interval="@daily", @@ -35,11 +38,25 @@ def basic_cosmos_task_group() -> None: """ pre_dbt = EmptyOperator(task_id="pre_dbt") - jaffle_shop = DbtTaskGroup( - group_id="test_123", + customers = DbtTaskGroup( + group_id="customers", + project_config=ProjectConfig( + (DBT_ROOT_PATH / "jaffle_shop").as_posix(), + ), + render_config=RenderConfig(select=["path:seeds/raw_customers.csv"]), + execution_config=shared_execution_config, + operator_args={"install_deps": True}, + profile_config=profile_config, + default_args={"retries": 2}, + ) + + orders = DbtTaskGroup( + group_id="orders", project_config=ProjectConfig( (DBT_ROOT_PATH / "jaffle_shop").as_posix(), ), + render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]), + execution_config=shared_execution_config, operator_args={"install_deps": True}, profile_config=profile_config, default_args={"retries": 2}, @@ -47,7 +64,8 @@ def basic_cosmos_task_group() -> None: post_dbt = EmptyOperator(task_id="post_dbt") - pre_dbt >> jaffle_shop >> post_dbt + pre_dbt >> customers >> post_dbt + pre_dbt >> orders >> post_dbt basic_cosmos_task_group()