Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Cncf.Kubernetes example DAGs to new design #22441 #24132

Merged
merged 2 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions airflow/providers/cncf/kubernetes/example_dags/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-cncf-kubernetes/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Content
:maxdepth: 1
:caption: Resources

Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/cncf/kubernetes/example_dags>
Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/cncf/kubernetes>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/>
Installing from sources <installing-providers-from-sources>

Expand Down
6 changes: 3 additions & 3 deletions docs/apache-airflow-providers-cncf-kubernetes/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Using this method will ensure correctness
and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.

.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
:language: python
:start-after: [START howto_operator_k8s_cluster_resources]
:end-before: [END howto_operator_k8s_cluster_resources]
Expand Down Expand Up @@ -122,7 +122,7 @@ Create the Secret using ``kubectl``:

Then use it in your pod like so:

.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
:language: python
:start-after: [START howto_operator_k8s_private_image]
:end-before: [END howto_operator_k8s_private_image]
Expand All @@ -136,7 +136,7 @@ alongside the Pod. The Pod must write the XCom value into this location at the `

See the following example on how this occurs:

.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
:language: python
:start-after: [START howto_operator_k8s_write_xcom]
:end-before: [END howto_operator_k8s_write_xcom]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
This is an example dag for using the KubernetesPodOperator.
"""

import os
from datetime import datetime

from kubernetes.client import models as k8s
Expand Down Expand Up @@ -97,6 +98,8 @@

# [END howto_operator_k8s_cluster_resources]

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_kubernetes_operator"

with DAG(
dag_id='example_kubernetes_operator',
Expand Down Expand Up @@ -158,6 +161,17 @@
bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
# [END howto_operator_k8s_write_xcom]

write_xcom >> pod_task_xcom_result
# [END howto_operator_k8s_write_xcom]

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
"""

import os
from datetime import datetime, timedelta

# [START import_module]
Expand All @@ -40,27 +41,43 @@

# [START instantiate_dag]

dag = DAG(
'spark_pi',

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "spark_pi"

with DAG(
DAG_ID,
default_args={'max_active_runs': 1},
description='submit spark-pi as sparkApplication on kubernetes',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
)
) as dag:
# [START SparkKubernetesOperator_DAG]
t1 = SparkKubernetesOperator(
task_id='spark_pi_submit',
namespace="default",
application_file="example_spark_kubernetes_spark_pi.yaml",
do_xcom_push=True,
dag=dag,
)

t2 = SparkKubernetesSensor(
task_id='spark_pi_monitor',
namespace="default",
application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
dag=dag,
)
t1 >> t2

# [END SparkKubernetesOperator_DAG]
from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

t1 = SparkKubernetesOperator(
task_id='spark_pi_submit',
namespace="default",
application_file="example_spark_kubernetes_spark_pi.yaml",
do_xcom_push=True,
dag=dag,
)
from tests.system.utils import get_test_run # noqa: E402

t2 = SparkKubernetesSensor(
task_id='spark_pi_monitor',
namespace="default",
application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
dag=dag,
)
t1 >> t2
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)