AIP-47 - Migrate dbt DAGs to new design apache#22472
chethanuk committed Jun 4, 2022
1 parent acf8951 commit 8f2762d
Showing 5 changed files with 33 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-dbt-cloud/index.rst
@@ -43,7 +43,7 @@ Content
:maxdepth: 1
:caption: Resources

Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/dbt/cloud/example_dags>
Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/dbt>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-dbt-cloud/>
Installing from sources <installing-providers-from-sources>

8 changes: 4 additions & 4 deletions docs/apache-airflow-providers-dbt-cloud/operators.rst
@@ -51,7 +51,7 @@ The below examples demonstrate how to instantiate DbtCloudRunJobOperator tasks with synchronous and
asynchronous waiting for run termination, respectively. To note, the ``account_id`` for the operators is
referenced within the ``default_args`` of the example DAG.

.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py
.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dbt_cloud_run_job]
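A minimal sketch of the synchronous form the include above points at, reusing the ``default_args`` shown later in this commit's example DAG; the ``job_id`` and timing values are placeholders rather than values taken from the commit:

from datetime import datetime

from airflow.models import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

with DAG(
    dag_id="example_dbt_cloud_sketch",  # illustrative DAG id, not the one in this commit
    default_args={"dbt_cloud_conn_id": "dbt", "account_id": 39151},
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    # Synchronous form: the task polls dbt Cloud and succeeds only once the
    # job run reaches a successful terminal state.
    trigger_job_run1 = DbtCloudRunJobOperator(
        task_id="trigger_job_run1",
        job_id=48617,        # placeholder dbt Cloud job id
        check_interval=10,   # poll the run status every 10 seconds
        timeout=300,         # fail if the run has not finished after 5 minutes
    )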
@@ -60,7 +60,7 @@ referenced within the ``default_args`` of the example DAG.
This next example also shows how to pass in custom runtime configuration (in this case for ``threads_override``)
via the ``additional_run_config`` dictionary.

.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py
.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dbt_cloud_run_job_async]
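A comparable sketch of the asynchronous form, passing ``threads_override`` through ``additional_run_config``; values are placeholders, and the task is assumed to live in the same DAG context as the sketch above:

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Asynchronous form: only trigger the run; waiting for completion is left
# to a downstream DbtCloudJobRunSensor.
trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,                                    # placeholder dbt Cloud job id
    wait_for_termination=False,                      # do not block on the run
    additional_run_config={"threads_override": 8},   # runtime override for this run only
)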
@@ -80,7 +80,7 @@ In the example below, the ``run_id`` value in the example below comes from the output of a previous
DbtCloudRunJobOperator task by utilizing the ``.output`` property exposed for all operators. Also, to note,
the ``account_id`` for the task is referenced within the ``default_args`` of the example DAG.

.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py
.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dbt_cloud_run_job_sensor]
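The sensor pairs with the asynchronous task in the previous sketch; pulling ``run_id`` from that task's ``.output`` (an ``XComArg``) also creates the ``trigger_job_run2 >> job_run_sensor`` dependency implicitly:

from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Inside the same DAG context as the sketches above.
job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor",
    run_id=trigger_job_run2.output,  # run id of the asynchronously triggered job
    timeout=20,                      # seconds before the sensor itself times out
)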
@@ -101,7 +101,7 @@ downloaded.
For more information on dbt Cloud artifacts, reference
`this documentation <https://docs.getdbt.com/docs/dbt-cloud/using-dbt-cloud/artifacts>`__.

.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py
.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dbt_cloud_get_artifact]
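And a sketch of retrieving a run artifact once a run has finished; ``run_id`` again comes from a previous run-job task via ``.output``, and ``run_results.json`` is one of the standard dbt artifacts:

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Inside the same DAG context as the sketches above: download run_results.json
# produced by the run triggered in trigger_job_run1.
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact",
    run_id=trigger_job_run1.output,
    path="run_results.json",
)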
@@ -1,3 +1,4 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,21 +17,26 @@

from datetime import datetime

from airflow.models import DAG, BaseOperator
from airflow.models import DAG

try:
from airflow.operators.empty import EmptyOperator
except ModuleNotFoundError:
from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore

from airflow.providers.dbt.cloud.operators.dbt import (
DbtCloudGetJobRunArtifactOperator,
DbtCloudRunJobOperator,
)
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from airflow.utils.edgemodifier import Label
from tests.system.utils import get_test_env_id

ENV_ID = get_test_env_id()
DAG_ID = "example_dbt_cloud"

with DAG(
dag_id="example_dbt_cloud",
dag_id=DAG_ID,
default_args={"dbt_cloud_conn_id": "dbt", "account_id": 39151},
start_date=datetime(2021, 1, 1),
schedule_interval=None,
@@ -50,7 +55,7 @@
# [END howto_operator_dbt_cloud_run_job]

# [START howto_operator_dbt_cloud_get_artifact]
get_run_results_artifact: BaseOperator = DbtCloudGetJobRunArtifactOperator(
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
# [END howto_operator_dbt_cloud_get_artifact]
@@ -65,7 +70,7 @@
# [END howto_operator_dbt_cloud_run_job_async]

# [START howto_operator_dbt_cloud_run_job_sensor]
job_run_sensor: BaseOperator = DbtCloudJobRunSensor(
job_run_sensor = DbtCloudJobRunSensor(
task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
# [END howto_operator_dbt_cloud_run_job_sensor]
@@ -77,3 +82,15 @@
# Task dependency created via `XComArgs`:
# trigger_job_run1 >> get_run_results_artifact
# trigger_job_run2 >> job_run_sensor

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
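The ``list(dag.tasks) >> watcher()`` line above is the AIP-47 convention: every task in the DAG becomes upstream of a watcher task, so a failure anywhere (including in a teardown task that would otherwise run last and succeed) fails the whole DAG run and, through ``get_test_run``, the pytest test. A simplified sketch of what such a watcher task amounts to; the real implementation lives in ``tests/system/utils/watcher.py`` and may differ:

from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    # Runs only if at least one upstream task failed, then fails itself,
    # which marks the DAG run (and therefore the system test) as failed.
    raise AirflowException("Failing task because one or more upstream tasks failed.")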
6 changes: 6 additions & 0 deletions tests/system/utils/__init__.py
@@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os

from airflow.utils.state import State


@@ -23,3 +25,7 @@ def test_run():
dag.run()

return test_run


def get_test_env_id(env_var_name: str = "SYSTEM_TESTS_ENV_ID"):
return os.environ.get(env_var_name)
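A small usage sketch of the new helper: system tests read a per-environment identifier from ``SYSTEM_TESTS_ENV_ID`` and typically fold it into resource names so parallel test environments do not collide. The bucket-name usage below is illustrative, not taken from this commit:

import os

from tests.system.utils import get_test_env_id

# CI normally exports the variable; set it here only to keep the sketch self-contained.
os.environ.setdefault("SYSTEM_TESTS_ENV_ID", "my-env")

ENV_ID = get_test_env_id()        # "my-env", or None if the variable is unset
BUCKET_NAME = f"bucket-{ENV_ID}"  # one resource name per test environment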
