diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index b1dcb8747..caf992b78 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -172,7 +172,6 @@ def build_airflow_graph( and test_behavior == TestBehavior.AFTER_EACH and node.has_test is True ) - task_meta = create_task_metadata( node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group ) diff --git a/cosmos/converter.py b/cosmos/converter.py index 05b523a1d..c97d9274e 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -154,7 +154,6 @@ def __init__( "profile_config": profile_config, "emit_datasets": emit_datasets, } - if dbt_executable_path: task_args["dbt_executable_path"] = dbt_executable_path diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index b61dbd5fb..d43a2d241 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -33,6 +33,7 @@ class DbtBaseOperator(BaseOperator): :param cache_selected_only: :param no_version_check: dbt optional argument - If set, skip ensuring dbt's version matches the one specified in the dbt_project.yml file ('require-dbt-version') + :param emit_datasets: Enable emitting inlets and outlets during task execution :param fail_fast: dbt optional argument to make dbt exit immediately if a single resource fails to build. 
:param quiet: dbt optional argument to show only error logs in stdout :param warn_error: dbt optional argument to convert dbt warnings into errors @@ -87,6 +88,7 @@ def __init__( selector: str | None = None, vars: dict[str, str] | None = None, models: str | None = None, + emit_datasets: bool = True, cache_selected_only: bool = False, no_version_check: bool = False, fail_fast: bool = False, @@ -112,6 +114,7 @@ def __init__( self.selector = selector self.vars = vars self.models = models + self.emit_datasets = emit_datasets self.cache_selected_only = cache_selected_only self.no_version_check = no_version_check self.fail_fast = fail_fast diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index b55dcdbe3..996bbc9dd 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -75,6 +75,9 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) if self.profile_config.target_name: dbt_cmd.extend(["--target", self.profile_config.target_name]) + if self.project_dir: + dbt_cmd.extend(["--project-dir", str(self.project_dir)]) + # set env vars self.build_env_args(env_vars) self.arguments = dbt_cmd diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 4888583bb..aaad4e259 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -82,6 +82,7 @@ class DbtLocalBaseOperator(DbtBaseOperator): :param profile_name: A name to use for the dbt profile. If not provided, and no profile target is found in your project's dbt_project.yml, "cosmos_profile" is used. :param install_deps: If true, install dependencies before running the command + :param emit_datasets: If true, the operator will set inlets and outlets :param callback: A callback function called on after a dbt run with a path to the dbt project directory. :param target_name: A name to use for the dbt target. If not provided, and no target is found in your project's dbt_project.yml, "cosmos_target" is used. 
@@ -99,7 +100,6 @@ def __init__( install_deps: bool = False, callback: Callable[[str], None] | None = None, should_store_compiled_sql: bool = True, - emit_datasets: bool = True, **kwargs: Any, ) -> None: self.profile_config = profile_config @@ -107,7 +107,6 @@ def __init__( self.callback = callback self.compiled_sql = "" self.should_store_compiled_sql = should_store_compiled_sql - self.emit_datasets = emit_datasets self.openlineage_events_completes: list[RunEvent] = [] super().__init__(**kwargs) diff --git a/docs/_static/jaffle_shop_k8s_dag_run.png b/docs/_static/jaffle_shop_k8s_dag_run.png index f2ee1a5e2..e41db8b81 100644 Binary files a/docs/_static/jaffle_shop_k8s_dag_run.png and b/docs/_static/jaffle_shop_k8s_dag_run.png differ diff --git a/docs/getting_started/kubernetes.rst b/docs/getting_started/kubernetes.rst index 9a500a047..6d2368997 100644 --- a/docs/getting_started/kubernetes.rst +++ b/docs/getting_started/kubernetes.rst @@ -30,20 +30,27 @@ For instance, .. code-block:: text - DbtTaskGroup( - ... + run_models = DbtTaskGroup( + profile_config=ProfileConfig( + profile_name="postgres_profile", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="postgres_default", + profile_args={ + "schema": "public", + }, + ), + ), + project_config=ProjectConfig(PROJECT_DIR), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.KUBERNETES, + ), operator_args={ - "queue": "kubernetes", - "image": "dbt-jaffle-shop:1.0.0", - "image_pull_policy": "Always", + "image": DBT_IMAGE, "get_logs": True, "is_delete_operator_pod": False, - "namespace": "default", - "env_vars": { - ... - }, + "secrets": [postgres_password_secret, postgres_host_secret], }, - execution_mode="kubernetes", ) Step-by-step instructions @@ -53,7 +60,7 @@ Using installed `Kind `_, you can setup a local kuber .. 
code-block:: bash - kind cluster create + kind create cluster Deploy a Postgres pod to Kind using `Helm `_ diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 67d3ff137..839a6db54 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -98,6 +98,8 @@ def test_dbt_kubernetes_build_command(): "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", "--no-version-check", + "--project-dir", + "my/dir", ] @@ -150,6 +152,8 @@ def test_created_pod(test_hook): "data_interval_start.strftime(''%Y%m%d%H%M%S'') " "}}'\n", "--no-version-check", + "--project-dir", + "my/dir", ], "command": [], "env": [{"name": "FOO", "value": "BAR", "value_from": None}], diff --git a/tests/sample/profiles.yml b/tests/sample/profiles.yml new file mode 100644 index 000000000..359c1e6eb --- /dev/null +++ b/tests/sample/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "localhost" + user: "postgres" + password: "postgres" + port: 5432 + dbname: "postgres" + schema: "public" + threads: 4 diff --git a/tests/test_converter.py b/tests/test_converter.py index 99d8c32ac..0d321730a 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,8 +1,17 @@ +from pathlib import Path + +from unittest.mock import patch import pytest + +from cosmos.converter import DbtToAirflowConverter, validate_arguments +from cosmos.constants import DbtResourceType, ExecutionMode +from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig +from cosmos.dbt.graph import DbtNode from cosmos.exceptions import CosmosValueError -from cosmos.converter import validate_arguments +SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" +SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" @pytest.mark.parametrize("argument_key", ["tags", "paths"]) @@ -16,3 +25,45 @@ def 
test_validate_arguments_tags(argument_key): validate_arguments(select, exclude, profile_args, task_args) expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}" assert err.value.args[0] == expected + + +parent_seed = DbtNode( + name="seed_parent", + unique_id="seed_parent", + resource_type=DbtResourceType.SEED, + depends_on=[], + file_path="", +) +nodes = {"seed_parent": parent_seed} + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors. + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + converter = DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert converter