diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index abb3ffe463a36c..24c14e663a2351 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -295,7 +295,7 @@ repos: exclude: ^airflow/_vendor/|^RELEASE_NOTES\.txt$|^airflow/www/static/css/material-icons\.css$|^images/.*$ args: - --ignore-words=docs/spelling_wordlist.txt - - --skip=docs/*/commits.rst,airflow/providers/*/*.rst,*.lock,INTHEWILD.md,*.min.js,docs/apache-airflow/pipeline_example.csv,airflow/www/*.log + - --skip=docs/*/commits.rst,airflow/providers/*/*.rst,*.lock,INTHEWILD.md,*.min.js,docs/apache-airflow/tutorial/pipeline_example.csv,airflow/www/*.log - --exclude-file=.codespellignorelines - repo: local hooks: diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_dag.py similarity index 95% rename from airflow/example_dags/tutorial_etl_dag.py rename to airflow/example_dags/tutorial_dag.py index 6b0271ee277232..03157de82f875f 100644 --- a/airflow/example_dags/tutorial_etl_dag.py +++ b/airflow/example_dags/tutorial_dag.py @@ -18,8 +18,8 @@ """ -### ETL DAG Tutorial Documentation -This ETL DAG is demonstrating an Extract -> Transform -> Load pipeline +### DAG Tutorial Documentation +This DAG is demonstrating an Extract -> Transform -> Load pipeline """ # [START tutorial] # [START import_module] @@ -38,13 +38,13 @@ # [START instantiate_dag] with DAG( - 'tutorial_etl_dag', + 'tutorial_dag', # [START default_args] # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args={'retries': 2}, # [END default_args] - description='ETL DAG tutorial', + description='DAG tutorial', schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api.py similarity index 94% rename from airflow/example_dags/tutorial_taskflow_api_etl.py rename to airflow/example_dags/tutorial_taskflow_api.py index d6068dd1d60521..4ff2f68831348f 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl.py +++ b/airflow/example_dags/tutorial_taskflow_api.py @@ -35,10 +35,10 @@ catchup=False, tags=['example'], ) -def tutorial_taskflow_api_etl(): +def tutorial_taskflow_api(): """ ### TaskFlow API Tutorial Documentation - This is a simple ETL data pipeline example which demonstrates the use of + This is a simple data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located @@ -100,7 +100,7 @@ def load(total_order_value: float): # [START dag_invocation] -tutorial_etl_dag = tutorial_taskflow_api_etl() +tutorial_dag = tutorial_taskflow_api() # [END dag_invocation] # [END tutorial] diff --git a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py similarity index 89% rename from airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py rename to airflow/example_dags/tutorial_taskflow_api_virtualenv.py index e8e4610e19c36f..732d533eb63842 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py +++ b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py @@ -26,16 +26,14 @@ log = logging.getLogger(__name__) if not shutil.which("virtualenv"): - log.warning( - "The tutorial_taskflow_api_etl_virtualenv example DAG requires virtualenv, please install it." 
- ) + log.warning("The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.") else: @dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) - def tutorial_taskflow_api_etl_virtualenv(): + def tutorial_taskflow_api_virtualenv(): """ ### TaskFlow API example using virtualenv - This is a simple ETL data pipeline example which demonstrates the use of + This is a simple data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. """ @@ -86,4 +84,4 @@ def load(total_order_value: float): order_summary = transform(order_data) load(order_summary["total_order_value"]) - tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv() + tutorial_dag = tutorial_taskflow_api_virtualenv() diff --git a/docs/apache-airflow/concepts/taskflow.rst b/docs/apache-airflow/concepts/taskflow.rst index 1c2d133fcff633..96b93a86a2d75b 100644 --- a/docs/apache-airflow/concepts/taskflow.rst +++ b/docs/apache-airflow/concepts/taskflow.rst @@ -61,7 +61,7 @@ You can also use a plain value or variable to call a TaskFlow function - for exa hello_name('Airflow users') -If you want to learn more about using TaskFlow, you should consult :doc:`the TaskFlow tutorial `. +If you want to learn more about using TaskFlow, you should consult :doc:`the TaskFlow tutorial `. Context ------- diff --git a/docs/apache-airflow/howto/docker-compose/index.rst b/docs/apache-airflow/howto/docker-compose/index.rst index 66fe9aad1ad2a1..f8b5dc293525e9 100644 --- a/docs/apache-airflow/howto/docker-compose/index.rst +++ b/docs/apache-airflow/howto/docker-compose/index.rst @@ -283,7 +283,7 @@ The Docker Compose file uses the latest Airflow image (`apache/airflow `_ for this example. -The steps below should be sufficient, but see the quick-start documentation for full instructions. - -.. code-block:: bash - - # Download the docker-compose.yaml file - curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml' - - # Make expected directories and set an expected environment variable - mkdir -p ./dags ./logs ./plugins - echo -e "AIRFLOW_UID=$(id -u)" > .env - - # Initialize the database - docker-compose up airflow-init - - # Start up all services - docker-compose up - -After all services have started up, the web UI will be available at: ``http://localhost:8080``. The default account has the username ``airflow`` and the password ``airflow``. - -We will also need to create a `connection `_ to the postgres db. To create one via the web UI, from the "Admin" menu, select "Connections", then click the Plus sign to "Add a new record" to the list of connections. - -Fill in the fields as shown below. Note the Connection Id value, which we'll pass as a parameter for the ``postgres_conn_id`` kwarg. - -- Connection Id: tutorial_pg_conn -- Connection Type: postgres -- Host: postgres -- Schema: airflow -- Login: airflow -- Password: airflow -- Port: 5432 - -Test your connection and if the test is successful, save your connection. - -Table Creation Tasks -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -We can use the `PostgresOperator `_ to define tasks that create tables in our postgres db. - -We'll create one table to facilitate data cleaning steps (``employees_temp``) and another table to store our cleaned data (``employees``). - -.. 
code-block:: python - - from airflow.providers.postgres.operators.postgres import PostgresOperator - - create_employees_table = PostgresOperator( - task_id="create_employees_table", - postgres_conn_id="tutorial_pg_conn", - sql=""" - CREATE TABLE IF NOT EXISTS employees ( - "Serial Number" NUMERIC PRIMARY KEY, - "Company Name" TEXT, - "Employee Markme" TEXT, - "Description" TEXT, - "Leave" INTEGER - );""", - ) - - create_employees_temp_table = PostgresOperator( - task_id="create_employees_temp_table", - postgres_conn_id="tutorial_pg_conn", - sql=""" - DROP TABLE IF EXISTS employees_temp; - CREATE TABLE employees_temp ( - "Serial Number" NUMERIC PRIMARY KEY, - "Company Name" TEXT, - "Employee Markme" TEXT, - "Description" TEXT, - "Leave" INTEGER - );""", - ) - -Optional Note: -"""""""""""""" -If you want to abstract these sql statements out of your DAG, you can move the statements sql files somewhere within the ``dags/`` directory and pass the sql file_path (relative to ``dags/``) to the ``sql`` kwarg. For ``employees`` for example, create a ``sql`` directory in ``dags/``, put ``employees`` DDL in ``dags/sql/employees_schema.sql``, and modify the PostgresOperator() to: - -.. code-block:: python - - create_employees_table = PostgresOperator( - task_id="create_employees_table", - postgres_conn_id="tutorial_pg_conn", - sql="sql/employees_schema.sql", - ) - -and repeat for the ``employees_temp`` table. - -Data Retrieval Task -~~~~~~~~~~~~~~~~~~~ - -Here we retrieve data, save it to a file on our Airflow instance, and load the data from that file into an intermediate table where we can execute data cleaning steps. - -.. code-block:: python - - import os - import requests - from airflow.decorators import task - from airflow.providers.postgres.hooks.postgres import PostgresHook - - - @task - def get_data(): - # NOTE: configure this as appropriate for your airflow environment - data_path = "/opt/airflow/dags/files/employees.csv" - os.makedirs(os.path.dirname(data_path), exist_ok=True) - - url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv" - - response = requests.request("GET", url) - - with open(data_path, "w") as file: - file.write(response.text) - - postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") - conn = postgres_hook.get_conn() - cur = conn.cursor() - with open(data_path, "r") as file: - cur.copy_expert( - "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", - file, - ) - conn.commit() - -Data Merge Task -~~~~~~~~~~~~~~~ - -Here we select completely unique records from the retrieved data, then we check to see if any employee ``Serial Numbers`` are already in the database (if they are, we update those records with the new data). - -.. 
code-block:: python - - from airflow.decorators import task - from airflow.providers.postgres.hooks.postgres import PostgresHook - - - @task - def merge_data(): - query = """ - INSERT INTO employees - SELECT * - FROM ( - SELECT DISTINCT * - FROM employees_temp - ) - ON CONFLICT ("Serial Number") DO UPDATE - SET "Serial Number" = excluded."Serial Number"; - """ - try: - postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") - conn = postgres_hook.get_conn() - cur = conn.cursor() - cur.execute(query) - conn.commit() - return 0 - except Exception as e: - return 1 - - - -Completing our DAG: -~~~~~~~~~~~~~~~~~~~ -We've developed our tasks, now we need to wrap them in a DAG, which enables us to define when and how tasks should run, and state any dependencies that tasks have on other tasks. The DAG below is configured to: - -* run every day at midnight starting on Jan 1, 2021, -* only run once in the event that days are missed, and -* timeout after 60 minutes - -And from the last line in the definition of the ``Etl`` DAG, we see: - -.. code-block:: python - - [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data() - -* the ``merge_data()`` task depends on the ``get_data()`` task, -* the ``get_data()`` depends on both the ``create_employees_table`` and ``create_employees_temp_table`` tasks, and -* the ``create_employees_table`` and ``create_employees_temp_table`` tasks can run independently. - -Putting all of the pieces together, we have our completed DAG. - -.. code-block:: python - - import datetime - import pendulum - import os - - import requests - from airflow.decorators import dag, task - from airflow.providers.postgres.hooks.postgres import PostgresHook - from airflow.providers.postgres.operators.postgres import PostgresOperator - - - @dag( - schedule="0 0 * * *", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - dagrun_timeout=datetime.timedelta(minutes=60), - ) - def Etl(): - create_employees_table = PostgresOperator( - task_id="create_employees_table", - postgres_conn_id="tutorial_pg_conn", - sql=""" - CREATE TABLE IF NOT EXISTS employees ( - "Serial Number" NUMERIC PRIMARY KEY, - "Company Name" TEXT, - "Employee Markme" TEXT, - "Description" TEXT, - "Leave" INTEGER - );""", - ) - - create_employees_temp_table = PostgresOperator( - task_id="create_employees_temp_table", - postgres_conn_id="tutorial_pg_conn", - sql=""" - DROP TABLE IF EXISTS employees_temp; - CREATE TABLE employees_temp ( - "Serial Number" NUMERIC PRIMARY KEY, - "Company Name" TEXT, - "Employee Markme" TEXT, - "Description" TEXT, - "Leave" INTEGER - );""", - ) - - @task - def get_data(): - # NOTE: configure this as appropriate for your airflow environment - data_path = "/opt/airflow/dags/files/employees.csv" - os.makedirs(os.path.dirname(data_path), exist_ok=True) - - url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv" - - response = requests.request("GET", url) - - with open(data_path, "w") as file: - file.write(response.text) - - postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") - conn = postgres_hook.get_conn() - cur = conn.cursor() - with open(data_path, "r") as file: - cur.copy_expert( - "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", - file, - ) - conn.commit() - - @task - def merge_data(): - query = """ - INSERT INTO employees - SELECT * - FROM ( - SELECT DISTINCT * - FROM employees_temp - ) - ON CONFLICT ("Serial Number") DO UPDATE - SET "Serial Number" = 
excluded."Serial Number"; - """ - try: - postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") - conn = postgres_hook.get_conn() - cur = conn.cursor() - cur.execute(query) - conn.commit() - return 0 - except Exception as e: - return 1 - - [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data() - - - dag = Etl() - -Save this code to a python file in the ``/dags`` folder (e.g. ``dags/etl.py``) and (after a `brief delay `_), the ``Etl`` DAG will be included in the list of available DAGs on the web UI. - -.. image:: img/new_tutorial-1.png - -You can trigger the ``Etl`` DAG by unpausing it (via the slider on the left end) and running it (via the Run button under **Actions**). - -.. image:: img/new_tutorial-3.png - -In the ``Etl`` DAG's **Tree** view, we see all that all tasks ran successfully in all executed runs. Success! - - What's Next? ------------- -That's it, you have written, tested and backfilled your very first Airflow -pipeline. Merging your code into a code repository that has a master scheduler -running against it should get it to get triggered and run every day. +That's it! You have written, tested and backfilled your very first Airflow +pipeline. Merging your code into a repository that has a master scheduler +running against it should result in being triggered and run every day. -Here's a few things you might want to do next: +Here are a few things you might want to do next: .. seealso:: - - Read the :doc:`/concepts/index` section for detailed explanation of Airflow concepts such as DAGs, Tasks, Operators, and more. - - Take an in-depth tour of the UI - click all the things! - - Keep reading the docs! - - - Review the :doc:`how-to guides`, which include a guide to writing your own operator - - Review the :ref:`Command Line Interface Reference` - - Review the :ref:`List of operators ` - - Review the :ref:`Macros reference` - - Write your first pipeline! + - Continue to the next step of the tutorial: :doc:`/tutorial/taskflow` + - Skip to the the :doc:`/concepts/index` section for detailed explanation of Airflow concepts such as DAGs, Tasks, Operators, and more diff --git a/docs/apache-airflow/tutorial/index.rst b/docs/apache-airflow/tutorial/index.rst new file mode 100644 index 00000000000000..5a6e0b2de0731e --- /dev/null +++ b/docs/apache-airflow/tutorial/index.rst @@ -0,0 +1,28 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Tutorials +========= + +Once you have Airflow up and running with the :doc:`/start`, these tutorials are a great way to get a sense for how Airflow works. + +.. 
toctree:: + :maxdepth: 1 + + fundamentals + taskflow + pipeline diff --git a/docs/apache-airflow/tutorial/pipeline.rst b/docs/apache-airflow/tutorial/pipeline.rst new file mode 100644 index 00000000000000..26bbfb83e37834 --- /dev/null +++ b/docs/apache-airflow/tutorial/pipeline.rst @@ -0,0 +1,326 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + + +Building a Running Pipeline +=========================== + +Lets look at another example: we need to get some data from a file which is hosted online and insert it into our local database. We also need to look at removing duplicate rows while inserting. + +Initial setup +------------- + +We need to have Docker installed as we will be using the :doc:`/howto/docker-compose/index` procedure for this example. +The steps below should be sufficient, but see the quick-start documentation for full instructions. + +.. code-block:: bash + + # Download the docker-compose.yaml file + curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml' + + # Make expected directories and set an expected environment variable + mkdir -p ./dags ./logs ./plugins + echo -e "AIRFLOW_UID=$(id -u)" > .env + + # Initialize the database + docker-compose up airflow-init + + # Start up all services + docker-compose up + +After all services have started up, the web UI will be available at: ``http://localhost:8080``. The default account has the username ``airflow`` and the password ``airflow``. + +We will also need to create a `connection `_ to the postgres db. To create one via the web UI, from the "Admin" menu, select "Connections", then click the Plus sign to "Add a new record" to the list of connections. + +Fill in the fields as shown below. Note the Connection Id value, which we'll pass as a parameter for the ``postgres_conn_id`` kwarg. + +- Connection Id: tutorial_pg_conn +- Connection Type: postgres +- Host: postgres +- Schema: airflow +- Login: airflow +- Password: airflow +- Port: 5432 + +Test your connection and if the test is successful, save your connection. + +Table Creation Tasks +-------------------- + +We can use the `PostgresOperator `_ to define tasks that create tables in our postgres db. + +We'll create one table to facilitate data cleaning steps (``employees_temp``) and another table to store our cleaned data (``employees``). + +.. 
code-block:: python + + from airflow.providers.postgres.operators.postgres import PostgresOperator + + create_employees_table = PostgresOperator( + task_id="create_employees_table", + postgres_conn_id="tutorial_pg_conn", + sql=""" + CREATE TABLE IF NOT EXISTS employees ( + "Serial Number" NUMERIC PRIMARY KEY, + "Company Name" TEXT, + "Employee Markme" TEXT, + "Description" TEXT, + "Leave" INTEGER + );""", + ) + + create_employees_temp_table = PostgresOperator( + task_id="create_employees_temp_table", + postgres_conn_id="tutorial_pg_conn", + sql=""" + DROP TABLE IF EXISTS employees_temp; + CREATE TABLE employees_temp ( + "Serial Number" NUMERIC PRIMARY KEY, + "Company Name" TEXT, + "Employee Markme" TEXT, + "Description" TEXT, + "Leave" INTEGER + );""", + ) + +Optional: Using SQL From Files +------------------------------ + +If you want to abstract these sql statements out of your DAG, you can move the statements sql files somewhere within the ``dags/`` directory and pass the sql file_path (relative to ``dags/``) to the ``sql`` kwarg. For ``employees`` for example, create a ``sql`` directory in ``dags/``, put ``employees`` DDL in ``dags/sql/employees_schema.sql``, and modify the PostgresOperator() to: + +.. code-block:: python + + create_employees_table = PostgresOperator( + task_id="create_employees_table", + postgres_conn_id="tutorial_pg_conn", + sql="sql/employees_schema.sql", + ) + +and repeat for the ``employees_temp`` table. + +Data Retrieval Task +------------------- + +Here we retrieve data, save it to a file on our Airflow instance, and load the data from that file into an intermediate table where we can execute data cleaning steps. + +.. code-block:: python + + import os + import requests + from airflow.decorators import task + from airflow.providers.postgres.hooks.postgres import PostgresHook + + + @task + def get_data(): + # NOTE: configure this as appropriate for your airflow environment + data_path = "/opt/airflow/dags/files/employees.csv" + os.makedirs(os.path.dirname(data_path), exist_ok=True) + + url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv" + + response = requests.request("GET", url) + + with open(data_path, "w") as file: + file.write(response.text) + + postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") + conn = postgres_hook.get_conn() + cur = conn.cursor() + with open(data_path, "r") as file: + cur.copy_expert( + "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", + file, + ) + conn.commit() + +Data Merge Task +--------------- + +Here we select completely unique records from the retrieved data, then we check to see if any employee ``Serial Numbers`` are already in the database (if they are, we update those records with the new data). + +.. 
code-block:: python + + from airflow.decorators import task + from airflow.providers.postgres.hooks.postgres import PostgresHook + + + @task + def merge_data(): + query = """ + INSERT INTO employees + SELECT * + FROM ( + SELECT DISTINCT * + FROM employees_temp + ) + ON CONFLICT ("Serial Number") DO UPDATE + SET "Serial Number" = excluded."Serial Number"; + """ + try: + postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") + conn = postgres_hook.get_conn() + cur = conn.cursor() + cur.execute(query) + conn.commit() + return 0 + except Exception as e: + return 1 + + + +Completing our DAG +------------------ + +We've developed our tasks, now we need to wrap them in a DAG, which enables us to define when and how tasks should run, and state any dependencies that tasks have on other tasks. The DAG below is configured to: + +* run every day at midnight starting on Jan 1, 2021, +* only run once in the event that days are missed, and +* timeout after 60 minutes + +And from the last line in the definition of the ``process-employees`` DAG, we see: + +.. code-block:: python + + [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data() + +* the ``merge_data()`` task depends on the ``get_data()`` task, +* the ``get_data()`` depends on both the ``create_employees_table`` and ``create_employees_temp_table`` tasks, and +* the ``create_employees_table`` and ``create_employees_temp_table`` tasks can run independently. + +Putting all of the pieces together, we have our completed DAG. + +.. code-block:: python + + import datetime + import pendulum + import os + + import requests + from airflow.decorators import dag, task + from airflow.providers.postgres.hooks.postgres import PostgresHook + from airflow.providers.postgres.operators.postgres import PostgresOperator + + + @dag( + dag_id="process-employees", + schedule_interval="0 0 * * *", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + dagrun_timeout=datetime.timedelta(minutes=60), + ) + def ProcessEmployees(): + create_employees_table = PostgresOperator( + task_id="create_employees_table", + postgres_conn_id="tutorial_pg_conn", + sql=""" + CREATE TABLE IF NOT EXISTS employees ( + "Serial Number" NUMERIC PRIMARY KEY, + "Company Name" TEXT, + "Employee Markme" TEXT, + "Description" TEXT, + "Leave" INTEGER + );""", + ) + + create_employees_temp_table = PostgresOperator( + task_id="create_employees_temp_table", + postgres_conn_id="tutorial_pg_conn", + sql=""" + DROP TABLE IF EXISTS employees_temp; + CREATE TABLE employees_temp ( + "Serial Number" NUMERIC PRIMARY KEY, + "Company Name" TEXT, + "Employee Markme" TEXT, + "Description" TEXT, + "Leave" INTEGER + );""", + ) + + @task + def get_data(): + # NOTE: configure this as appropriate for your airflow environment + data_path = "/opt/airflow/dags/files/employees.csv" + os.makedirs(os.path.dirname(data_path), exist_ok=True) + + url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv" + + response = requests.request("GET", url) + + with open(data_path, "w") as file: + file.write(response.text) + + postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") + conn = postgres_hook.get_conn() + cur = conn.cursor() + with open(data_path, "r") as file: + cur.copy_expert( + "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", + file, + ) + conn.commit() + + @task + def merge_data(): + query = """ + INSERT INTO employees + SELECT * + FROM ( + SELECT DISTINCT * + FROM 
employees_temp + ) + ON CONFLICT ("Serial Number") DO UPDATE + SET "Serial Number" = excluded."Serial Number"; + """ + try: + postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") + conn = postgres_hook.get_conn() + cur = conn.cursor() + cur.execute(query) + conn.commit() + return 0 + except Exception as e: + return 1 + + [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data() + + + dag = ProcessEmployees() + +Save this code to a python file in the ``/dags`` folder (e.g. ``dags/process-employees.py``) and (after a `brief delay `_), the ``process-employees`` DAG will be included in the list of available DAGs on the web UI. + +.. image:: ../img/tutorial-pipeline-1.png + +You can trigger the ``process-employees`` DAG by unpausing it (via the slider on the left end) and running it (via the Run button under **Actions**). + +.. image:: ../img/tutorial-pipeline-2.png + +In the ``process-employees`` DAG's **Grid** view, we see all that all tasks ran successfully in all executed runs. Success! + +What's Next? +------------- +You now have a pipeline running inside Airflow using Docker Compose. Here are a few things you might want to do next: + +.. seealso:: + - Take an in-depth tour of the UI - click all the things! see what they do! + - Keep reading the docs + - Review the :doc:`how-to guides`, which include a guide for writing your own operator + - Review the :ref:`Command Line Interface Reference` + - Review the :ref:`List of operators ` + - Review the :ref:`Macros reference` + - Write your first pipeline diff --git a/docs/apache-airflow/pipeline_example.csv b/docs/apache-airflow/tutorial/pipeline_example.csv similarity index 100% rename from docs/apache-airflow/pipeline_example.csv rename to docs/apache-airflow/tutorial/pipeline_example.csv diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial/taskflow.rst similarity index 93% rename from docs/apache-airflow/tutorial_taskflow_api.rst rename to docs/apache-airflow/tutorial/taskflow.rst index 00b3e43a2aa524..35609325e1c20a 100644 --- a/docs/apache-airflow/tutorial_taskflow_api.rst +++ b/docs/apache-airflow/tutorial/taskflow.rst @@ -18,23 +18,23 @@ -Tutorial on the TaskFlow API -============================ +Working with TaskFlow +===================== This tutorial builds on the regular Airflow Tutorial and focuses specifically on writing data pipelines using the TaskFlow API paradigm which is introduced as part of Airflow 2.0 and contrasts this with DAGs written using the traditional paradigm. -The data pipeline chosen here is a simple ETL pattern with -three separate tasks for Extract, Transform, and Load. +The data pipeline chosen here is a simple pattern with +three separate Extract, Transform, and Load tasks. -Example "TaskFlow API" ETL Pipeline ------------------------------------ +Example "TaskFlow API" Pipeline +------------------------------- -Here is a very simple ETL pipeline using the TaskFlow API paradigm. A more detailed +Here is a very simple pipeline using the TaskFlow API paradigm. A more detailed explanation is given below. -.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py :language: python :start-after: [START tutorial] :end-before: [END tutorial] @@ -44,7 +44,7 @@ It's a DAG definition file If this is the first DAG file you are looking at, please note that this Python script is interpreted by Airflow and is a configuration file for your data pipeline. 
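Stripped to its essentials, such a definition file is just Python that declares a DAG object, roughly along these lines. This is a condensed, illustrative sketch of the renamed ``tutorial_taskflow_api`` example and is not taken verbatim from the patch; the real file in ``airflow/example_dags/tutorial_taskflow_api.py`` carries additional documentation markers and comments.

.. code-block:: python

    import json

    import pendulum

    from airflow.decorators import dag, task


    @dag(
        schedule=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["example"],
    )
    def tutorial_taskflow_api():
        """Condensed sketch of the tutorial TaskFlow DAG."""

        @task()
        def extract():
            # The full example parses a JSON string of sample order data.
            return json.loads('{"1001": 301.27, "1002": 433.21, "1003": 502.22}')

        @task(multiple_outputs=True)
        def transform(order_data_dict: dict):
            # Sum the order values and expose the result as a named XCom.
            return {"total_order_value": sum(order_data_dict.values())}

        @task()
        def load(total_order_value: float):
            print(f"Total order value is: {total_order_value:.2f}")

        # Calling the tasks wires up the dependencies automatically.
        load(transform(extract())["total_order_value"])


    tutorial_dag = tutorial_taskflow_api()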
-For a complete introduction to DAG files, please look at the core :doc:`Airflow tutorial` +For a complete introduction to DAG files, please look at the core :doc:`fundamentals tutorial` which covers DAG structure and definitions extensively. @@ -57,7 +57,7 @@ when we set this up with Airflow, without any retries or complex scheduling. In this example, please notice that we are creating this DAG using the ``@dag`` decorator as shown below, with the Python function name acting as the DAG identifier. -.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py :language: python :start-after: [START instantiate_dag] :end-before: [END instantiate_dag] @@ -67,7 +67,7 @@ Tasks In this data pipeline, tasks are created based on Python functions using the ``@task`` decorator as shown below. The function name acts as a unique identifier for the task. -.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py :language: python :dedent: 4 :start-after: [START extract] @@ -82,7 +82,7 @@ Main flow of the DAG Now that we have the Extract, Transform, and Load tasks defined based on the Python functions, we can move to the main part of the DAG. -.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py :language: python :dedent: 4 :start-after: [START main_flow] @@ -95,9 +95,9 @@ The dependencies between the tasks and the passing of data between these tasks w running on different workers on different nodes on the network is all handled by Airflow. Now to actually enable this to be run as a DAG, we invoke the Python function -``tutorial_taskflow_api_etl`` set up using the ``@dag`` decorator earlier, as shown below. +``tutorial_taskflow_api`` set up using the ``@dag`` decorator earlier, as shown below. -.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py :language: python :start-after: [START dag_invocation] :end-before: [END dag_invocation] @@ -108,7 +108,7 @@ But how? For experienced Airflow DAG authors, this is startlingly simple! Let's contrast this with how this DAG had to be written before Airflow 2.0 below: -.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_dag.py :language: python :start-after: [START tutorial] :end-before: [END tutorial] @@ -119,7 +119,7 @@ it is all abstracted from the DAG developer. Let's examine this in detail by looking at the Transform task in isolation since it is in the middle of the data pipeline. In Airflow 1.x, this task is defined as shown below: -.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_dag.py :language: python :dedent: 4 :start-after: [START transform_function] @@ -131,7 +131,7 @@ into another XCom variable which will then be used by the Load task. Contrasting that with TaskFlow API in Airflow 2.0 as shown below. -.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py +.. 
exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py :language: python :dedent: 4 :start-after: [START transform] @@ -145,7 +145,7 @@ Similarly, task dependencies are automatically generated within TaskFlows based functional invocation of tasks. In Airflow 1.x, tasks had to be explicitly created and dependencies specified as shown below. -.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_dag.py :language: python :dedent: 4 :start-after: [START main_flow] @@ -154,7 +154,7 @@ dependencies specified as shown below. In contrast, with the TaskFlow API in Airflow 2.0, the invocation itself automatically generates the dependencies as shown below. -.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py :language: python :dedent: 4 :start-after: [START main_flow] @@ -233,7 +233,7 @@ image must have a working Python installed and take in a bash command as the ``c Below is an example of using the ``@task.docker`` decorator to run a Python task. -.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py +.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py :language: python :dedent: 4 :start-after: [START transform_docker] @@ -257,7 +257,7 @@ environment on the same machine, you can use the ``@task.virtualenv`` decorator decorator will allow you to create a new virtualenv with custom libraries and even a different Python version to run your function. -.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py +.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py :language: python :dedent: 4 :start-after: [START extract_virtualenv] @@ -471,6 +471,9 @@ Current context is accessible only during the task execution. The context is not What's Next? ------------ -You have seen how simple it is to write DAGs using the TaskFlow API paradigm within Airflow 2.0. Please do -read the :doc:`Concepts section ` for detailed explanation of Airflow concepts such as DAGs, Tasks, -Operators, and more. There's also a whole section on the :doc:`TaskFlow API ` and the ``@task`` decorator. +You have seen how simple it is to write DAGs using the TaskFlow API paradigm within Airflow 2.0. Here are a few steps you might want to take next: + +.. seealso:: + - Continue to the next step of the tutorial: :doc:`/tutorial/pipeline` + - Read the :doc:`Concepts section ` for detailed explanation of Airflow concepts such as DAGs, Tasks, Operators, and more + - View the section on the :doc:`TaskFlow API ` and the ``@task`` decorator. 
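As a quick illustration of the ``@task.virtualenv`` decorator discussed above, a minimal sketch follows. It is not the renamed example file from this patch; the DAG id, task names, and the pinned requirement are made up for illustration.

.. code-block:: python

    import pendulum

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
    def virtualenv_sketch():
        # Each run of this task builds a fresh virtualenv with the pinned
        # requirements; the callable must import everything it needs itself.
        @task.virtualenv(requirements=["pandas==1.4.2"], system_site_packages=False)
        def summarize():
            import pandas as pd

            df = pd.DataFrame({"order_value": [301.27, 433.21, 502.22]})
            return float(df["order_value"].sum())

        @task()
        def report(total: float):
            print(f"Total order value is: {total:.2f}")

        report(summarize())


    virtualenv_sketch()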
diff --git a/docs/conf.py b/docs/conf.py index cff9a8556504f2..26ad0fca412788 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -693,7 +693,7 @@ def _get_params(root_schema: dict, prefix: str = "", default_section: str = "") '*/migrations/*', '*/contrib/*', '**/example_sla_dag.py', - '**/example_taskflow_api_etl_docker_virtualenv.py', + '**/example_taskflow_api_docker_virtualenv.py', '**/example_dag_decorator.py', ] if PACKAGE_NAME == 'apache-airflow': diff --git a/tests/core/test_example_dags_system.py b/tests/core/test_example_dags_system.py index 4d16f0c24e6d82..618b2e5f4b5eb2 100644 --- a/tests/core/test_example_dags_system.py +++ b/tests/core/test_example_dags_system.py @@ -27,8 +27,7 @@ class TestExampleDagsSystem(SystemTest): [ "example_bash_operator", "example_branch_operator", - "tutorial_etl_dag", - "tutorial_functional_etl_dag", + "tutorial_dag", "example_dag_decorator", ] ) diff --git a/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py b/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py similarity index 94% rename from tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py rename to tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py index 0b4fc5c5ce774a..3d04b80b36f43a 100644 --- a/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py +++ b/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py @@ -30,10 +30,10 @@ DAG_ID = 'docker_taskflow' -def tutorial_taskflow_api_etl_docker_virtualenv(): +def tutorial_taskflow_api_docker_virtualenv(): """ ### TaskFlow API Tutorial Documentation - This is a simple ETL data pipeline example which demonstrates the use of + This is a simple data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located @@ -111,7 +111,7 @@ def load(total_order_value: float): # break the CI test, we added this try/except here. try: # [START dag_invocation] - tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv() + tutorial_dag = tutorial_taskflow_api_docker_virtualenv() # [END dag_invocation] except AttributeError: pass diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 84003bf3660175..03d0b868f957ce 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -250,8 +250,8 @@ def test_dag_autocomplete_success(client_all_dags): assert resp.json == [ {'name': 'airflow', 'type': 'owner'}, {'name': 'test_mapped_taskflow', 'type': 'dag'}, - {'name': 'tutorial_taskflow_api_etl', 'type': 'dag'}, - {'name': 'tutorial_taskflow_api_etl_virtualenv', 'type': 'dag'}, + {'name': 'tutorial_taskflow_api', 'type': 'dag'}, + {'name': 'tutorial_taskflow_api_virtualenv', 'type': 'dag'}, ]
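For anyone verifying these renames locally, a parse check along the following lines can confirm that the renamed example DAGs still load. This is a hypothetical snippet for local verification, not a test added by this patch, and the function name is invented.

.. code-block:: python

    from airflow.models import DagBag


    def check_renamed_tutorial_dags():
        # Load the bundled example DAGs, which include the renamed tutorial files.
        dag_bag = DagBag(include_examples=True)

        assert not dag_bag.import_errors, dag_bag.import_errors
        for dag_id in ("tutorial_dag", "tutorial_taskflow_api"):
            assert dag_bag.get_dag(dag_id) is not None, f"missing {dag_id}"
        # tutorial_taskflow_api_virtualenv only registers when virtualenv is installed.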