Skip to content

Commit

Permalink
Replace deprecated DummyOperator by EmptyOperator if Airflow >=2.4.0 (a…
Browse files Browse the repository at this point in the history
…stronomer#900)

Change the example of customising Cosmos operators to use `EmptyOperator`,
since the `DummyOperator` was deprecated in Airflow 2.4.
  • Loading branch information
tatiana authored and arojasb3 committed Jul 14, 2024
1 parent 8d9eb62 commit 07b8586
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions dev/dags/example_cosmos_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from pathlib import Path

from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator

try: # available since Airflow 2.4.0
from airflow.operators.empty import EmptyOperator
except ImportError:
from airflow.operators.dummy import DummyOperator as EmptyOperator
from airflow.utils.task_group import TaskGroup

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
Expand All @@ -38,21 +42,21 @@


# [START custom_dbt_nodes]
# Cosmos will use this function to generate a DummyOperator task when it finds a source node, in the manifest.
# Cosmos will use this function to generate an empty task when it finds a source node, in the manifest.
# A more realistic use case could be to use an Airflow sensor to represent a source.
def convert_source(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
"""
Return an instance of DummyOperator to represent a dbt "source" node.
Return an instance of a desired operator to represent a dbt "source" node.
"""
return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")
return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")


# Cosmos will use this function to generate a DummyOperator task when it finds a exposure node, in the manifest.
# Cosmos will use this function to generate an empty task when it finds a exposure node, in the manifest.
def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
"""
Return an instance of DummyOperator to represent a dbt "exposure" node.
Return an instance of a desired operator to represent a dbt "exposure" node.
"""
return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_exposure")
return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_exposure")


# Use `RenderConfig` to tell Cosmos, given a node type, how to convert a dbt node into an Airflow task or task group.
Expand Down

0 comments on commit 07b8586

Please sign in to comment.