diff --git a/airflow/providers/google/cloud/links/workflows.py b/airflow/providers/google/cloud/links/workflows.py new file mode 100644 index 0000000000000..db8022c1309ff --- /dev/null +++ b/airflow/providers/google/cloud/links/workflows.py @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""This module contains Google Workflows links.""" +from typing import TYPE_CHECKING, Optional + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.links.base import BaseGoogleLink + +if TYPE_CHECKING: + from airflow.utils.context import Context + +WORKFLOWS_BASE_LINK = "https://console.cloud.google.com/workflows" +WORKFLOW_LINK = WORKFLOWS_BASE_LINK + "/workflow/{location_id}/{workflow_id}/executions?project={project_id}" +WORKFLOWS_LINK = WORKFLOWS_BASE_LINK + "?project={project_id}" +EXECUTION_LINK = ( + WORKFLOWS_BASE_LINK + + "/workflow/{location_id}/{workflow_id}/execution/{execution_id}?project={project_id}" +) + + +class WorkflowsWorkflowDetailsLink(BaseGoogleLink): + """Helper class for constructing Workflow details Link""" + + name = "Workflow details" + key = "workflow_details" + format_str = WORKFLOW_LINK + + @staticmethod + def persist( + context: "Context", + task_instance: BaseOperator, + location_id: str, + workflow_id: str, + project_id: Optional[str], + ): + task_instance.xcom_push( + context, + key=WorkflowsWorkflowDetailsLink.key, + value={"location_id": location_id, "workflow_id": workflow_id, "project_id": project_id}, + ) + + +class WorkflowsListOfWorkflowsLink(BaseGoogleLink): + """Helper class for constructing list of Workflows Link""" + + name = "List of workflows" + key = "list_of_workflows" + format_str = WORKFLOWS_LINK + + @staticmethod + def persist( + context: "Context", + task_instance: BaseOperator, + project_id: Optional[str], + ): + task_instance.xcom_push( + context, + key=WorkflowsListOfWorkflowsLink.key, + value={"project_id": project_id}, + ) + + +class WorkflowsExecutionLink(BaseGoogleLink): + """Helper class for constructing Workflows Execution Link""" + + name = "Workflow Execution" + key = "workflow_execution" + format_str = EXECUTION_LINK + + @staticmethod + def persist( + context: "Context", + task_instance: BaseOperator, + location_id: str, + workflow_id: str, + execution_id: str, + project_id: Optional[str], + ): + task_instance.xcom_push( + context, + key=WorkflowsExecutionLink.key, + value={ + "location_id": location_id, + "workflow_id": workflow_id, + "execution_id": execution_id, + "project_id": project_id, + }, + ) diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py index 157d34a057e05..e1cb45659a8cd 100644 --- a/airflow/providers/google/cloud/operators/workflows.py +++ b/airflow/providers/google/cloud/operators/workflows.py @@ -31,6 +31,11 @@ from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook +from airflow.providers.google.cloud.links.workflows import ( + WorkflowsExecutionLink, + WorkflowsListOfWorkflowsLink, + WorkflowsWorkflowDetailsLink, +) if TYPE_CHECKING: from airflow.utils.context import Context @@ -60,6 +65,7 @@ class WorkflowsCreateWorkflowOperator(BaseOperator): template_fields: Sequence[str] = ("location", "workflow", "workflow_id") template_fields_renderers = {"workflow": "json"} + operator_extra_links = (WorkflowsWorkflowDetailsLink(),) def __init__( self, @@ -132,6 +138,15 @@ def execute(self, context: 'Context'): timeout=self.timeout, metadata=self.metadata, ) + + WorkflowsWorkflowDetailsLink.persist( + context=context, + task_instance=self, + location_id=self.location, + workflow_id=self.workflow_id, + project_id=self.project_id or hook.project_id, + ) + return Workflow.to_dict(workflow) @@ -162,6 +177,7 @@ class WorkflowsUpdateWorkflowOperator(BaseOperator): template_fields: Sequence[str] = ("workflow_id", "update_mask") template_fields_renderers = {"update_mask": "json"} + operator_extra_links = (WorkflowsWorkflowDetailsLink(),) def __init__( self, @@ -209,6 +225,15 @@ def execute(self, context: 'Context'): metadata=self.metadata, ) workflow = operation.result() + + WorkflowsWorkflowDetailsLink.persist( + context=context, + task_instance=self, + location_id=self.location, + workflow_id=self.workflow_id, + project_id=self.project_id or hook.project_id, + ) + return Workflow.to_dict(workflow) @@ -296,6 +321,7 @@ class WorkflowsListWorkflowsOperator(BaseOperator): """ template_fields: Sequence[str] = ("location", "order_by", "filter_") + operator_extra_links = (WorkflowsListOfWorkflowsLink(),) def __init__( self, @@ -335,6 +361,13 @@ def execute(self, context: 'Context'): timeout=self.timeout, metadata=self.metadata, ) + + WorkflowsListOfWorkflowsLink.persist( + context=context, + task_instance=self, + project_id=self.project_id or hook.project_id, + ) + return [Workflow.to_dict(w) for w in workflows_iter] @@ -357,6 +390,7 @@ class WorkflowsGetWorkflowOperator(BaseOperator): """ template_fields: Sequence[str] = ("location", "workflow_id") + operator_extra_links = (WorkflowsWorkflowDetailsLink(),) def __init__( self, @@ -393,6 +427,15 @@ def execute(self, context: 'Context'): timeout=self.timeout, metadata=self.metadata, ) + + WorkflowsWorkflowDetailsLink.persist( + context=context, + task_instance=self, + location_id=self.location, + workflow_id=self.workflow_id, + project_id=self.project_id or hook.project_id, + ) + return Workflow.to_dict(workflow) @@ -418,6 +461,7 @@ class WorkflowsCreateExecutionOperator(BaseOperator): template_fields: Sequence[str] = ("location", "workflow_id", "execution") template_fields_renderers = {"execution": "json"} + operator_extra_links = (WorkflowsExecutionLink(),) def __init__( self, @@ -459,6 +503,16 @@ def execute(self, context: 'Context'): ) execution_id = execution.name.split("/")[-1] self.xcom_push(context, key="execution_id", value=execution_id) + + WorkflowsExecutionLink.persist( + context=context, + task_instance=self, + location_id=self.location, + workflow_id=self.workflow_id, + execution_id=execution_id, + project_id=self.project_id or hook.project_id, + ) + return Execution.to_dict(execution) @@ -482,6 +536,7 @@ class WorkflowsCancelExecutionOperator(BaseOperator): """ template_fields: Sequence[str] = ("location", "workflow_id", "execution_id") + operator_extra_links = (WorkflowsExecutionLink(),) def __init__( self, @@ -521,6 +576,16 @@ def execute(self, context: 'Context'): timeout=self.timeout, metadata=self.metadata, ) + + WorkflowsExecutionLink.persist( + context=context, + task_instance=self, + location_id=self.location, + workflow_id=self.workflow_id, + execution_id=self.execution_id, + project_id=self.project_id or hook.project_id, + ) + return Execution.to_dict(execution) @@ -549,6 +614,7 @@ class WorkflowsListExecutionsOperator(BaseOperator): """ template_fields: Sequence[str] = ("location", "workflow_id") + operator_extra_links = (WorkflowsWorkflowDetailsLink(),) def __init__( self, @@ -588,6 +654,14 @@ def execute(self, context: 'Context'): metadata=self.metadata, ) + WorkflowsWorkflowDetailsLink.persist( + context=context, + task_instance=self, + location_id=self.location, + workflow_id=self.workflow_id, + project_id=self.project_id or hook.project_id, + ) + return [Execution.to_dict(e) for e in execution_iter if e.start_time > self.start_date_filter] @@ -611,6 +685,7 @@ class WorkflowsGetExecutionOperator(BaseOperator): """ template_fields: Sequence[str] = ("location", "workflow_id", "execution_id") + operator_extra_links = (WorkflowsExecutionLink(),) def __init__( self, @@ -650,4 +725,14 @@ def execute(self, context: 'Context'): timeout=self.timeout, metadata=self.metadata, ) + + WorkflowsExecutionLink.persist( + context=context, + task_instance=self, + location_id=self.location, + workflow_id=self.workflow_id, + execution_id=self.execution_id, + project_id=self.project_id or hook.project_id, + ) + return Execution.to_dict(execution) diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py index 8560fd3ee2559..58fa872a0dea0 100644 --- a/airflow/providers/google/cloud/sensors/workflows.py +++ b/airflow/providers/google/cloud/sensors/workflows.py @@ -56,7 +56,7 @@ def __init__( workflow_id: str, execution_id: str, location: str, - project_id: str, + project_id: Optional[str] = None, success_states: Optional[Set[Execution.State]] = None, failure_states: Optional[Set[Execution.State]] = None, retry: Union[Retry, _MethodDefault] = DEFAULT, diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index e61f2dfc3c482..e693935145ec0 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -908,6 +908,9 @@ extra-links: - airflow.providers.google.cloud.links.vertex_ai.VertexAIBatchPredictionJobListLink - airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointLink - airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointListLink + - airflow.providers.google.cloud.links.workflows.WorkflowsWorkflowDetailsLink + - airflow.providers.google.cloud.links.workflows.WorkflowsListOfWorkflowsLink + - airflow.providers.google.cloud.links.workflows.WorkflowsExecutionLink - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink - airflow.providers.google.cloud.links.dataflow.DataflowJobLink diff --git a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst index 09685491f2840..0cea43f0f79bd 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst @@ -39,7 +39,7 @@ Create workflow To create a workflow use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsCreateWorkflowOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_create_workflow] @@ -47,7 +47,7 @@ To create a workflow use The workflow should be define in similar why to this example: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 0 :start-after: [START how_to_define_workflow] @@ -65,7 +65,7 @@ Update workflow To update a workflow use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsUpdateWorkflowOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_update_workflow] @@ -79,7 +79,7 @@ Get workflow To get a workflow use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsGetWorkflowOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_get_workflow] @@ -93,7 +93,7 @@ List workflows To list workflows use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsListWorkflowsOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_list_workflows] @@ -107,7 +107,7 @@ Delete workflow To delete a workflow use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsDeleteWorkflowOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_delete_workflow] @@ -122,7 +122,7 @@ To create an execution use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsCreateExecutionOperator`. This operator is not idempotent due to API limitation. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_create_execution] @@ -131,7 +131,7 @@ This operator is not idempotent due to API limitation. The create operator does not wait for execution to complete. To wait for execution result use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowExecutionSensor`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_wait_for_execution] @@ -145,7 +145,7 @@ Get execution To get an execution use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsGetExecutionOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_get_execution] @@ -160,7 +160,7 @@ To list executions use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsListExecutionsOperator`. By default this operator will return only executions for last 60 minutes. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_list_executions] @@ -174,7 +174,7 @@ Cancel execution To cancel an execution use :class:`~airflow.providers.google.cloud.operators.dataproc.WorkflowsCancelExecutionOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py +.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py :language: python :dedent: 4 :start-after: [START how_to_cancel_execution] diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py index 5578548ffb40e..0ba4a6429823f 100644 --- a/tests/providers/google/cloud/operators/test_workflows.py +++ b/tests/providers/google/cloud/operators/test_workflows.py @@ -64,7 +64,8 @@ def test_execute(self, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, @@ -100,7 +101,8 @@ def test_execute(self, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, @@ -181,7 +183,8 @@ def test_execute(self, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, @@ -216,7 +219,8 @@ def test_execute(self, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, @@ -253,7 +257,8 @@ def test_execute(self, mock_xcom, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, @@ -269,7 +274,16 @@ def test_execute(self, mock_xcom, mock_hook, mock_object): timeout=TIMEOUT, metadata=METADATA, ) - mock_xcom.assert_called_once_with({}, key="execution_id", value="execution_id") + mock_xcom.assert_called_with( + context, + key="workflow_execution", + value={ + 'location_id': LOCATION, + 'workflow_id': WORKFLOW_ID, + 'execution_id': EXECUTION_ID, + 'project_id': PROJECT_ID, + }, + ) assert result == mock_object.to_dict.return_value @@ -289,7 +303,8 @@ def test_execute(self, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, @@ -328,7 +343,8 @@ def test_execute(self, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, @@ -363,7 +379,8 @@ def test_execute(self, mock_hook, mock_object): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute({}) + context = mock.MagicMock() + result = op.execute(context=context) mock_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, diff --git a/tests/providers/google/cloud/operators/test_workflows_system.py b/tests/providers/google/cloud/operators/test_workflows_system.py deleted file mode 100644 index 300552b048c64..0000000000000 --- a/tests/providers/google/cloud/operators/test_workflows_system.py +++ /dev/null @@ -1,35 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import pytest - -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_WORKFLOWS_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.system("google.cloud") -@pytest.mark.credential_file(GCP_WORKFLOWS_KEY) -class WorkflowsExampleDagsSystemTest(GoogleSystemTest): - def setUp(self): - super().setUp() - - @provide_gcp_context(GCP_WORKFLOWS_KEY) - def test_run_example_workflow_dag(self): - self.run_dag('example_cloud_workflows', CLOUD_DAG_FOLDER) - - def tearDown(self): - super().tearDown() diff --git a/airflow/providers/google/cloud/example_dags/example_workflows.py b/tests/system/providers/google/workflows/example_workflows.py similarity index 78% rename from airflow/providers/google/cloud/example_dags/example_workflows.py rename to tests/system/providers/google/workflows/example_workflows.py index e2ca88cdf35c4..47ca818bf830e 100644 --- a/airflow/providers/google/cloud/example_dags/example_workflows.py +++ b/tests/system/providers/google/workflows/example_workflows.py @@ -33,11 +33,15 @@ WorkflowsUpdateWorkflowOperator, ) from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor +from airflow.utils.trigger_rule import TriggerRule -LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1") -PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") -WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow") +DAG_ID = "cloud_workflows" + +LOCATION = "us-central1" +WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}" # [START how_to_define_workflow] WORKFLOW_CONTENT = """ @@ -67,7 +71,7 @@ EXECUTION = {"argument": ""} -SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow") +SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}" SLEEP_WORKFLOW_CONTENT = """ - someSleep: call: sys.sleep @@ -83,7 +87,7 @@ with DAG( - "example_cloud_workflows", + DAG_ID, schedule_interval='@once', start_date=datetime(2021, 1, 1), catchup=False, @@ -99,8 +103,8 @@ # [END how_to_create_workflow] # [START how_to_update_workflow] - update_workflows = WorkflowsUpdateWorkflowOperator( - task_id="update_workflows", + update_workflow = WorkflowsUpdateWorkflowOperator( + task_id="update_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID, @@ -127,6 +131,7 @@ task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID ) # [END how_to_delete_workflow] + delete_workflow.trigger_rule = TriggerRule.ALL_DONE # [START how_to_create_execution] create_execution = WorkflowsCreateExecutionOperator( @@ -182,21 +187,36 @@ workflow_id=SLEEP_WORKFLOW_ID, ) + cancel_execution_id = create_execution_for_cancel.output["execution_id"] + # [START how_to_cancel_execution] cancel_execution = WorkflowsCancelExecutionOperator( task_id="cancel_execution", location=LOCATION, project_id=PROJECT_ID, workflow_id=SLEEP_WORKFLOW_ID, - execution_id=create_execution_id, + execution_id=cancel_execution_id, ) # [END how_to_cancel_execution] - create_workflow >> update_workflows >> [get_workflow, list_workflows] - update_workflows >> [create_execution, create_execution_for_cancel] + delete_workflow_for_cancel = WorkflowsDeleteWorkflowOperator( + task_id="delete_workflow_for_cancel", + location=LOCATION, + project_id=PROJECT_ID, + workflow_id=SLEEP_WORKFLOW_ID, + trigger_rule=TriggerRule.ALL_DONE, + ) + + create_workflow >> update_workflow >> [get_workflow, list_workflows] + update_workflow >> [create_execution, create_execution_for_cancel] wait_for_execution >> [get_execution, list_executions] - create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution + ( + create_workflow_for_cancel + >> create_execution_for_cancel + >> cancel_execution + >> delete_workflow_for_cancel + ) [cancel_execution, list_executions] >> delete_workflow @@ -205,7 +225,16 @@ # create_execution >> get_execution # create_execution >> cancel_execution + # ### Everything below this line is not part of example ### + # ### Just for system tests purpose ### + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 -if __name__ == '__main__': - dag.clear(dag_run_state=None) - dag.run() +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)