From b9fdc926be812c3c4b41f981e50ae5401046490f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 16 Aug 2023 10:35:36 +0100 Subject: [PATCH 1/3] Support dbt global flags (via `dbt_cmd_global_flags` in `operator_args`) (#469) Allow users to define `dbt_cmd_global_flags` inside the `operator_args` dictionary to set global flags such as `--cache-selected-only`. These flags will be added between the dbt binary and the dbt subcommand being run. This PR is not a breaking change since it introduces a new option/parameter. An alternative implementation would be to hard-code the current 48 dbt (1.6) global flags (the ones listed when we run `dbt --help`) into the Cosmos source code, and to decide from within Cosmos which of the flags given to `dbt_cmd_flags` are global and which are not. This alternative implementation would impose a greater maintenance burden on Cosmos developers and would be less future-proof against newer versions of dbt. Closes: #413 --- cosmos/operators/base.py | 5 +++++ tests/operators/test_local.py | 19 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index b22f0115a..a1925bdc5 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -55,6 +55,7 @@ class DbtBaseOperator(BaseOperator): :param dbt_executable_path: Path to dbt executable can be used with venv (i.e. 
/home/astro/.pyenv/versions/dbt_venv/bin/dbt) :param dbt_cmd_flags: List of flags to pass to dbt command + :param dbt_cmd_global_flags: List of dbt global flags to be passed to the dbt command """ template_fields: Sequence[str] = ("env", "vars") @@ -100,6 +101,7 @@ def __init__( cancel_query_on_kill: bool = True, dbt_executable_path: str = "dbt", dbt_cmd_flags: list[str] | None = None, + dbt_cmd_global_flags: list[str] | None = None, **kwargs: Any, ) -> None: self.project_dir = project_dir @@ -132,6 +134,7 @@ def __init__( else: self.dbt_executable_path = dbt_executable_path self.dbt_cmd_flags = dbt_cmd_flags + self.dbt_cmd_global_flags = dbt_cmd_global_flags or [] super().__init__(**kwargs) def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: @@ -210,6 +213,8 @@ def build_cmd( ) -> Tuple[list[str | None], dict[str, str | bytes | os.PathLike[Any]]]: dbt_cmd = [self.dbt_executable_path] + dbt_cmd.extend(self.dbt_cmd_global_flags) + if self.base_cmd: dbt_cmd.extend(self.base_cmd) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 27cfc9f73..9ad2ac432 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -48,7 +48,24 @@ def test_dbt_base_operator_add_user_supplied_flags() -> None: cmd, _ = dbt_base_operator.build_cmd( Context(execution_date=datetime(2023, 2, 15, 12, 30)), ) - assert "--full-refresh" in cmd + assert cmd[-2] == "run" + assert cmd[-1] == "--full-refresh" + + +def test_dbt_base_operator_add_user_supplied_global_flags() -> None: + dbt_base_operator = DbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + base_cmd=["run"], + dbt_cmd_global_flags=["--cache-selected-only"], + ) + + cmd, _ = dbt_base_operator.build_cmd( + Context(execution_date=datetime(2023, 2, 15, 12, 30)), + ) + assert cmd[-2] == "--cache-selected-only" + assert cmd[-1] == "run" @pytest.mark.parametrize( From 2aeca0e674d4434d29246d0fa771304f04302566 Mon Sep 
17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 16 Aug 2023 19:36:03 +0100 Subject: [PATCH 2/3] Fix forks code revision in code coverage (#472) The code coverage report was being shown incorrectly for PRs derived from forks, as seen in the PR https://github.com/astronomer/astronomer-cosmos/pull/471 Incorrect parts of the code were highlighted: ![Screenshot 2023-08-16 at 11 25 27](https://github.com/astronomer/astronomer-cosmos/assets/272048/b657e7a2-4c35-4869-a99d-56d4ddc3c823) Reference: https://app.codecov.io/gh/astronomer/astronomer-cosmos/pull/471?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=astronomer#diff-Y29zbW9zL3Byb2ZpbGVzL2JpZ3F1ZXJ5L3NlcnZpY2VfYWNjb3VudF9rZXlmaWxlX2RpY3QucHk= --- .github/workflows/test.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b412df1c0..4aed8ac61 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -206,6 +206,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} - name: Set up Python 3.10 uses: actions/setup-python@v3 with: From c676429eb36e13fe2354e9cb53c48ff7fc541cd7 Mon Sep 17 00:00:00 2001 From: Jensen Yap Date: Thu, 17 Aug 2023 05:26:31 +0900 Subject: [PATCH 3/3] Fix bug on select node add exclude selector subset ids logic (#463) Add the following: 1. Add Skip case if all configs are None 2. Exclude subset id to not be overwritten as it iterates 3. 
Add more test cases Closes: #464 --- cosmos/dbt/selector.py | 41 +++++++++------- tests/dbt/test_selector.py | 96 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 118 insertions(+), 19 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index d067d77b7..fc1048220 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -1,5 +1,6 @@ from __future__ import annotations from pathlib import Path +import copy from typing import TYPE_CHECKING @@ -43,6 +44,10 @@ def __init__(self, project_dir: Path, statement: str): self.other: list[str] = [] self.load_from_statement(statement) + @property + def is_empty(self) -> bool: + return not (self.paths or self.tags or self.config or self.other) + def load_from_statement(self, statement: str) -> None: """ Load in-place select parameters. @@ -84,27 +89,30 @@ def select_nodes_ids_by_intersection(nodes: dict[str, DbtNode], config: Selector https://docs.getdbt.com/reference/node-selection/yaml-selectors """ selected_nodes = set() - for node_id, node in nodes.items(): - if config.tags and not (sorted(node.tags) == sorted(config.tags)): - continue - supported_node_config = {key: value for key, value in node.config.items() if key in SUPPORTED_CONFIG} - if config.config: - config_tag = config.config.get("tags") - if config_tag and config_tag not in supported_node_config.get("tags", []): + if not config.is_empty: + for node_id, node in nodes.items(): + if config.tags and not (sorted(node.tags) == sorted(config.tags)): continue - # Remove 'tags' as they've already been filtered for - config.config.pop("tags", None) - supported_node_config.pop("tags", None) + supported_node_config = {key: value for key, value in node.config.items() if key in SUPPORTED_CONFIG} + config_tag = config.config.get("tags") + if config.config: + if config_tag and config_tag not in supported_node_config.get("tags", []): + continue + + # Remove 'tags' as they've already been filtered for + config_copy = 
copy.deepcopy(config.config) + config_copy.pop("tags", None) + supported_node_config.pop("tags", None) - if not (config.config.items() <= supported_node_config.items()): - continue + if not (config_copy.items() <= supported_node_config.items()): + continue - if config.paths and not (set(config.paths).issubset(set(node.file_path.parents))): - continue + if config.paths and not (set(config.paths).issubset(set(node.file_path.parents))): + continue - selected_nodes.add(node_id) + selected_nodes.add(node_id) return selected_nodes @@ -166,9 +174,10 @@ def select_nodes( nodes_ids = set(nodes.keys()) + exclude_ids: set[str] = set() for statement in exclude: config = SelectorConfig(project_dir, statement) - exclude_ids = select_nodes_ids_by_intersection(nodes, config) + exclude_ids = exclude_ids.union(set(select_nodes_ids_by_intersection(nodes, config))) subset_ids = set(nodes_ids) - set(exclude_ids) return {id_: nodes[id_] for id_ in subset_ids} diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index ac404b14a..7c6ff3292 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -2,6 +2,7 @@ import pytest +from cosmos.dbt.selector import SelectorConfig from cosmos.constants import DbtResourceType from cosmos.dbt.graph import DbtNode from cosmos.dbt.selector import select_nodes @@ -9,6 +10,34 @@ SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") + +@pytest.fixture +def selector_config(): + project_dir = Path("/path/to/project") + statement = "" + return SelectorConfig(project_dir, statement) + + +@pytest.mark.parametrize( + "paths, tags, config, other, expected", + [ + ([], [], {}, [], True), + ([Path("path1")], [], {}, [], False), + ([], ["tag:has_child"], {}, [], False), + ([], [], {"config.tags:test"}, [], False), + ([], [], {}, ["other"], False), + ([Path("path1")], ["tag:has_child"], {"config.tags:test"}, ["other"], False), + ], +) +def test_is_empty_config(selector_config, paths, tags, config, other, expected): + 
selector_config.paths = paths + selector_config.tags = tags + selector_config.config = config + selector_config.other = other + + assert selector_config.is_empty == expected + + grandparent_node = DbtNode( name="grandparent", unique_id="grandparent", @@ -37,10 +66,32 @@ config={"materialized": "table", "tags": ["is_child"]}, ) +grandchild_1_test_node = DbtNode( + name="grandchild_1", + unique_id="grandchild_1", + resource_type=DbtResourceType.MODEL, + depends_on=["parent"], + file_path=SAMPLE_PROJ_PATH / "gen3/models/grandchild_1.sql", + tags=["nightly"], + config={"materialized": "table", "tags": ["deprecated", "test"]}, +) + +grandchild_2_test_node = DbtNode( + name="grandchild_2", + unique_id="grandchild_2", + resource_type=DbtResourceType.MODEL, + depends_on=["parent"], + file_path=SAMPLE_PROJ_PATH / "gen3/models/grandchild_2.sql", + tags=["nightly"], + config={"materialized": "table", "tags": ["deprecated", "test2"]}, +) + sample_nodes = { grandparent_node.unique_id: grandparent_node, parent_node.unique_id: parent_node, child_node.unique_id: child_node, + grandchild_1_test_node.unique_id: grandchild_1_test_node, + grandchild_2_test_node.unique_id: grandchild_2_test_node, } @@ -52,13 +103,19 @@ def test_select_nodes_by_select_tag(): def test_select_nodes_by_select_config(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["config.materialized:table"]) - expected = {child_node.unique_id: child_node} + expected = { + child_node.unique_id: child_node, + grandchild_1_test_node.unique_id: grandchild_1_test_node, + grandchild_2_test_node.unique_id: grandchild_2_test_node, + } assert selected == expected def test_select_nodes_by_select_config_tag(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["config.tags:is_child"]) - expected = {child_node.unique_id: child_node} + expected = { + child_node.unique_id: child_node, + } assert selected == expected @@ -74,6 +131,21 @@ def 
test_select_nodes_by_select_union_config_tag(): assert selected == expected +def test_select_nodes_by_select_union_config_test_tags(): + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, + nodes=sample_nodes, + select=["config.tags:test", "config.tags:test2", "config.materialized:view"], + ) + expected = { + grandparent_node.unique_id: grandparent_node, + parent_node.unique_id: parent_node, + grandchild_1_test_node.unique_id: grandchild_1_test_node, + grandchild_2_test_node.unique_id: grandchild_2_test_node, + } + assert selected == expected + + def test_select_nodes_by_select_intersection_config_tag(): selected = select_nodes( project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["config.tags:is_child,config.materialized:view"] @@ -95,6 +167,8 @@ def test_select_nodes_by_select_union(): grandparent_node.unique_id: grandparent_node, parent_node.unique_id: parent_node, child_node.unique_id: child_node, + grandchild_1_test_node.unique_id: grandchild_1_test_node, + grandchild_2_test_node.unique_id: grandchild_2_test_node, } assert selected == expected @@ -106,7 +180,11 @@ def test_select_nodes_by_select_intersection(): def test_select_nodes_by_exclude_tag(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["tag:has_child"]) - expected = {child_node.unique_id: child_node} + expected = { + child_node.unique_id: child_node, + grandchild_1_test_node.unique_id: grandchild_1_test_node, + grandchild_2_test_node.unique_id: grandchild_2_test_node, + } assert selected == expected @@ -122,3 +200,15 @@ def test_select_nodes_by_select_union_exclude_tags(): ) expected = {} assert selected == expected + + +def test_select_nodes_by_exclude_union_config_test_tags(): + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["config.tags:test", "config.tags:test2"] + ) + expected = { + grandparent_node.unique_id: grandparent_node, + parent_node.unique_id: parent_node, + child_node.unique_id: child_node, + } + 
assert selected == expected