From 7ad4e67c1ad504f6338c1f616fa4245685cf1abd Mon Sep 17 00:00:00 2001 From: chethanuk-plutoflume Date: Fri, 3 Jun 2022 17:11:41 +0100 Subject: [PATCH] Migrate Cncf.Kubernetes example DAGs to new design #22441 (#24132) * Migrate Cncf.Kubernetes example DAGs to new design #22441 --- .../cncf/kubernetes/example_dags/__init__.py | 17 ------- .../index.rst | 2 +- .../operators.rst | 6 +-- .../cncf/kubernetes}/example_kubernetes.py | 16 +++++- .../kubernetes}/example_spark_kubernetes.py | 51 ++++++++++++------- .../example_spark_kubernetes_spark_pi.yaml | 0 6 files changed, 53 insertions(+), 39 deletions(-) delete mode 100644 airflow/providers/cncf/kubernetes/example_dags/__init__.py rename {airflow/providers/cncf/kubernetes/example_dags => tests/system/providers/cncf/kubernetes}/example_kubernetes.py (91%) rename {airflow/providers/cncf/kubernetes/example_dags => tests/system/providers/cncf/kubernetes}/example_spark_kubernetes.py (64%) rename {airflow/providers/cncf/kubernetes/example_dags => tests/system/providers/cncf/kubernetes}/example_spark_kubernetes_spark_pi.yaml (100%) diff --git a/airflow/providers/cncf/kubernetes/example_dags/__init__.py b/airflow/providers/cncf/kubernetes/example_dags/__init__.py deleted file mode 100644 index 217e5db9607827..00000000000000 --- a/airflow/providers/cncf/kubernetes/example_dags/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/docs/apache-airflow-providers-cncf-kubernetes/index.rst b/docs/apache-airflow-providers-cncf-kubernetes/index.rst index 57ddb81ae69656..a5f6ec3c6d50d5 100644 --- a/docs/apache-airflow-providers-cncf-kubernetes/index.rst +++ b/docs/apache-airflow-providers-cncf-kubernetes/index.rst @@ -39,7 +39,7 @@ Content :maxdepth: 1 :caption: Resources - Example DAGs + Example DAGs PyPI Repository Installing from sources diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst index 92e067c51d6b4c..1776ea295633bc 100644 --- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst +++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst @@ -89,7 +89,7 @@ Using this method will ensure correctness and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the :class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables. -.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py +.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py :language: python :start-after: [START howto_operator_k8s_cluster_resources] :end-before: [END howto_operator_k8s_cluster_resources] @@ -122,7 +122,7 @@ Create the Secret using ``kubectl``: Then use it in your pod like so: -.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py +.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py :language: python :start-after: [START howto_operator_k8s_private_image] :end-before: [END howto_operator_k8s_private_image] @@ -136,7 +136,7 @@ alongside the Pod. The Pod must write the XCom value into this location at the ` See the following example on how this occurs: -.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py +.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py :language: python :start-after: [START howto_operator_k8s_write_xcom] :end-before: [END howto_operator_k8s_write_xcom] diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/tests/system/providers/cncf/kubernetes/example_kubernetes.py similarity index 91% rename from airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py rename to tests/system/providers/cncf/kubernetes/example_kubernetes.py index b65dae9f4e52af..d77d0889d6aacb 100644 --- a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py +++ b/tests/system/providers/cncf/kubernetes/example_kubernetes.py @@ -19,6 +19,7 @@ This is an example dag for using the KubernetesPodOperator. """ +import os from datetime import datetime from kubernetes.client import models as k8s @@ -97,6 +98,8 @@ # [END howto_operator_k8s_cluster_resources] +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_kubernetes_operator" with DAG( dag_id='example_kubernetes_operator', @@ -158,6 +161,17 @@ bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) - # [END howto_operator_k8s_write_xcom] write_xcom >> pod_task_xcom_result + # [END howto_operator_k8s_write_xcom] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py b/tests/system/providers/cncf/kubernetes/example_spark_kubernetes.py similarity index 64% rename from airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py rename to tests/system/providers/cncf/kubernetes/example_spark_kubernetes.py index d01d4b1328c684..20cba7a9f1f7ec 100644 --- a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py +++ b/tests/system/providers/cncf/kubernetes/example_spark_kubernetes.py @@ -25,6 +25,7 @@ https://github.com/GoogleCloudPlatform/spark-on-k8s-operator """ +import os from datetime import datetime, timedelta # [START import_module] @@ -40,27 +41,43 @@ # [START instantiate_dag] -dag = DAG( - 'spark_pi', + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "spark_pi" + +with DAG( + DAG_ID, default_args={'max_active_runs': 1}, description='submit spark-pi as sparkApplication on kubernetes', schedule_interval=timedelta(days=1), start_date=datetime(2021, 1, 1), catchup=False, -) +) as dag: + # [START SparkKubernetesOperator_DAG] + t1 = SparkKubernetesOperator( + task_id='spark_pi_submit', + namespace="default", + application_file="example_spark_kubernetes_spark_pi.yaml", + do_xcom_push=True, + dag=dag, + ) + + t2 = SparkKubernetesSensor( + task_id='spark_pi_monitor', + namespace="default", + application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}", + dag=dag, + ) + t1 >> t2 + + # [END SparkKubernetesOperator_DAG] + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() -t1 = SparkKubernetesOperator( - task_id='spark_pi_submit', - namespace="default", - application_file="example_spark_kubernetes_spark_pi.yaml", - do_xcom_push=True, - dag=dag, -) +from tests.system.utils import get_test_run # noqa: E402 -t2 = SparkKubernetesSensor( - task_id='spark_pi_monitor', - namespace="default", - application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}", - dag=dag, -) -t1 >> t2 +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes_spark_pi.yaml b/tests/system/providers/cncf/kubernetes/example_spark_kubernetes_spark_pi.yaml similarity index 100% rename from airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes_spark_pi.yaml rename to tests/system/providers/cncf/kubernetes/example_spark_kubernetes_spark_pi.yaml