Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support customizing how dbt nodes are converted to Airflow #503

Merged
merged 29 commits into from
Oct 13, 2023

Conversation

tatiana
Copy link
Collaborator

@tatiana tatiana commented Sep 1, 2023

Description

This PR aims to solve the following current limitations of Cosmos 1.0:

  • If you want to pass arguments to just some tasks (e.g., on_warning_callback to test nodes), Cosmos has to add it to the main interface and explicitly pass it down to just those tasks
  • If a user wants to subclass one of the Cosmos operators and use that instead, they can't
  • If a user wants more granular customization over how each task is rendered, they can't use Cosmos

It does this by introducing the parameter node_converters to RenderConfig, which allows users to define a custom function to convert a DbtNode into nothing or an Airflow resource (Operator or TaskGroup instances).

Example of the feature

import os
from datetime import datetime
from pathlib import Path

from airflow.operators.dummy import DummyOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.constants import DbtResourceType
from cosmos.dbt.graph import DbtNode

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

os.environ["DBT_SQLITE_PATH"] = str(DEFAULT_DBT_ROOT_PATH / "simple")


profile_config = ProfileConfig(
    profile_name="simple",
    target_name="dev",
    profiles_yml_filepath=(DBT_ROOT_PATH / "simple/profiles.yml"),
)


# [START custom_dbt_nodes]
def convert_source(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
    return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")


def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
    return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_exposure")


render_config = RenderConfig(
    node_converters={DbtResourceType.SOURCE: convert_source, DbtResourceType("exposure"): convert_exposure}
)


example_cosmos_sources = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH / "simple",
    ),
    profile_config=profile_config,
    render_config=render_config,
    operator_args={"append_env": True},
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="example_cosmos_sources",
)
# [END custom_dbt_nodes]

Is now rendered in Airflow:
Screenshot 2023-10-10 at 12 04 31

Before this change, there was no way for users to describe how to render source or other unsupported dbt resources.

Related Issue(s)

Closes: #427
Closes: #477

Breaking Change?

No

Checklist

  • I have made corresponding changes to the documentation (if required)
  • I have added tests that prove my fix is effective or that my feature works

Feedback addressed:

  • rename dbt_resource_converter to node_converters
  • support passing items not previously supported in the enum DbtResourceType
  • write docs and say this feature is experimental

@netlify
Copy link

netlify bot commented Sep 1, 2023

👷 Deploy Preview for amazing-pothos-a3bca0 processing.

Name Link
🔨 Latest commit 945f62f
🔍 Latest deploy log https://app.netlify.com/sites/amazing-pothos-a3bca0/deploys/652914cc1ddb330008a23cf9

@tatiana tatiana temporarily deployed to internal September 1, 2023 14:33 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 1, 2023 15:51 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 1, 2023 16:04 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 1, 2023 16:31 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 4, 2023 09:18 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 5, 2023 10:42 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 5, 2023 11:23 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 5, 2023 11:38 — with GitHub Actions Inactive
@tatiana tatiana added this to the 1.2.0 milestone Sep 6, 2023
@tatiana tatiana temporarily deployed to internal September 11, 2023 09:41 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal September 11, 2023 10:26 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal October 9, 2023 14:14 — with GitHub Actions Inactive
@tatiana tatiana changed the title WIP: Support customization of how Dbt resources are converted to Airflow Support customizing how dbt nodes are converted to Airflow Oct 9, 2023
@tatiana tatiana temporarily deployed to internal October 9, 2023 14:20 — with GitHub Actions Inactive
@codecov
Copy link

codecov bot commented Oct 9, 2023

Codecov Report

Attention: 1 lines in your changes are missing coverage. Please review.

Comparison is base (9460312) 93.01% compared to head (945f62f) 93.04%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #503      +/-   ##
==========================================
+ Coverage   93.01%   93.04%   +0.02%     
==========================================
  Files          51       51              
  Lines        2019     2041      +22     
==========================================
+ Hits         1878     1899      +21     
- Misses        141      142       +1     
Files Coverage Δ
cosmos/config.py 92.94% <100.00%> (+0.08%) ⬆️
cosmos/constants.py 100.00% <100.00%> (ø)
cosmos/converter.py 97.10% <100.00%> (+0.04%) ⬆️
cosmos/dbt/graph.py 98.79% <100.00%> (+<0.01%) ⬆️
cosmos/airflow/graph.py 98.75% <96.15%> (-1.25%) ⬇️

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@tatiana tatiana temporarily deployed to internal October 9, 2023 15:21 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal October 9, 2023 15:28 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal October 9, 2023 15:32 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal October 9, 2023 15:37 — with GitHub Actions Inactive
@tatiana tatiana marked this pull request as ready for review October 9, 2023 15:42
@tatiana tatiana requested a review from a team as a code owner October 9, 2023 15:42
@tatiana tatiana requested a review from a team October 9, 2023 15:42
@tatiana tatiana temporarily deployed to internal October 9, 2023 15:42 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal October 10, 2023 09:55 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal October 10, 2023 10:32 — with GitHub Actions Inactive
@tatiana tatiana temporarily deployed to internal October 10, 2023 13:01 — with GitHub Actions Inactive
cosmos/airflow/graph.py Outdated Show resolved Hide resolved
cosmos/airflow/graph.py Outdated Show resolved Hide resolved
pyproject.toml Outdated Show resolved Hide resolved
@tatiana tatiana temporarily deployed to internal October 13, 2023 09:51 — with GitHub Actions Inactive
cosmos/airflow/graph.py:224: error: Call to untyped function (unknown) in typed context  [no-untyped-call]

https://github.com/astronomer/astronomer-cosmos/actions/runs/6506710243/job/17672777673\?pr\=503
@tatiana tatiana temporarily deployed to internal October 13, 2023 09:59 — with GitHub Actions Inactive
@tatiana tatiana merged commit e80abd7 into main Oct 13, 2023
41 checks passed
@tatiana tatiana deleted the issue-427-task-generator branch October 13, 2023 10:56
@tatiana tatiana mentioned this pull request Oct 13, 2023
tatiana added a commit that referenced this pull request Oct 13, 2023
**Features**

* Add support to model versioning available since dbt 1.6 by @binhnq94
in #516
* Add AWS Athena profile mapping by @benjamin-awd in #578
* Support customizing how dbt nodes are converted to Airflow by @tatiana
in #503
* Make the arg ``dbt_project_path`` in the ``ProjectConfig`` optional by
@MrBones757 in #581

**Bug fixes**

* Fix Cosmos custom selector to support filtering a single model by
@jlaneve and @harels in #576
* Fix using ``GoogleCloudServiceAccountDictProfileMapping`` together
with ``LoadMethod.DBT_LS`` by @joppevos in #587
* Fix using the ``full_refresh`` argument in projects that contain tests
by @EgorSemenov and @tatiana in #590
* Stop creating symbolic links for ``dbt_packages`` (solves
``LocalExecutor`` concurrency issue) by @tatiana in #600

**Others**

* Docs: add reference to original Jaffle Shop project by @erdos2n in
#583
* Docs: retries & note about DagBag error by @TJaniF in #592
* pre-commit updates in #575 and #585
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Don't crash parsing when there's an unrecognized node type Add task generator methods
2 participants