From b454714b09ea50709f13bb1b21461ee622b87a5b Mon Sep 17 00:00:00 2001 From: Bartlomiej Hirsz Date: Wed, 20 Apr 2022 06:29:25 +0000 Subject: [PATCH] Migrate Datastore system tests to new design (AIP-47) Change-Id: Ibc6f0a03a0c6fb374de85d74a7ac62cf6fa55bec --- .../cloud/example_dags/example_datastore.py | 185 ------------------ .../operators/cloud/datastore.rst | 26 +-- .../datastore/example_datastore_commit.py | 99 ++++++++++ .../example_datastore_export_import.py | 114 +++++++++++ .../datastore/example_datastore_query.py | 84 ++++++++ .../datastore/example_datastore_rollback.py | 67 +++++++ 6 files changed, 377 insertions(+), 198 deletions(-) delete mode 100644 airflow/providers/google/cloud/example_dags/example_datastore.py create mode 100644 tests/system/providers/google/datastore/example_datastore_commit.py create mode 100644 tests/system/providers/google/datastore/example_datastore_export_import.py create mode 100644 tests/system/providers/google/datastore/example_datastore_query.py create mode 100644 tests/system/providers/google/datastore/example_datastore_rollback.py diff --git a/airflow/providers/google/cloud/example_dags/example_datastore.py b/airflow/providers/google/cloud/example_dags/example_datastore.py deleted file mode 100644 index d55d7d3c0860c..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_datastore.py +++ /dev/null @@ -1,185 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -Example Airflow DAG that shows how to use Datastore operators. - -This example requires that your project contains Datastore instance. -""" - -import os -from datetime import datetime -from typing import Any, Dict - -from airflow import models -from airflow.providers.google.cloud.operators.datastore import ( - CloudDatastoreAllocateIdsOperator, - CloudDatastoreBeginTransactionOperator, - CloudDatastoreCommitOperator, - CloudDatastoreDeleteOperationOperator, - CloudDatastoreExportEntitiesOperator, - CloudDatastoreGetOperationOperator, - CloudDatastoreImportEntitiesOperator, - CloudDatastoreRollbackOperator, - CloudDatastoreRunQueryOperator, -) - -START_DATE = datetime(2021, 1, 1) - -GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") -BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test") - -with models.DAG( - "example_gcp_datastore", - schedule_interval='@once', # Override to match your needs - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag: - # [START how_to_export_task] - export_task = CloudDatastoreExportEntitiesOperator( - task_id="export_task", - bucket=BUCKET, - project_id=GCP_PROJECT_ID, - overwrite_existing=True, - ) - # [END how_to_export_task] - - # [START how_to_import_task] - import_task = CloudDatastoreImportEntitiesOperator( - task_id="import_task", - bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}", - file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}", - project_id=GCP_PROJECT_ID, - ) - # [END how_to_import_task] - - export_task >> import_task - -# [START how_to_keys_def] -KEYS = [ - { - "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""}, - "path": {"kind": "airflow"}, - } -] -# [END how_to_keys_def] - -# [START how_to_transaction_def] -TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}} -# [END how_to_transaction_def] - - -with models.DAG( - "example_gcp_datastore_operations", - schedule_interval='@once', # Override to match your needs - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag2: - # [START how_to_allocate_ids] - allocate_ids = CloudDatastoreAllocateIdsOperator( - task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID - ) - # [END how_to_allocate_ids] - - # [START how_to_begin_transaction] - begin_transaction_commit = CloudDatastoreBeginTransactionOperator( - task_id="begin_transaction_commit", - transaction_options=TRANSACTION_OPTIONS, - project_id=GCP_PROJECT_ID, - ) - # [END how_to_begin_transaction] - - # [START how_to_commit_def] - COMMIT_BODY = { - "mode": "TRANSACTIONAL", - "mutations": [ - { - "insert": { - "key": KEYS[0], - "properties": {"string": {"stringValue": "airflow is awesome!"}}, - } - } - ], - "transaction": begin_transaction_commit.output, - } - # [END how_to_commit_def] - - # [START how_to_commit_task] - commit_task = CloudDatastoreCommitOperator( - task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID - ) - # [END how_to_commit_task] - - allocate_ids >> begin_transaction_commit - - begin_transaction_query = CloudDatastoreBeginTransactionOperator( - task_id="begin_transaction_query", - transaction_options=TRANSACTION_OPTIONS, - project_id=GCP_PROJECT_ID, - ) - - # [START how_to_query_def] - QUERY = { - "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": "query"}, - "readOptions": {"transaction": begin_transaction_query.output}, - "query": {}, - } - # [END how_to_query_def] - - # [START how_to_run_query] - run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID) - # [END how_to_run_query] - - allocate_ids >> begin_transaction_query - - begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator( - task_id="begin_transaction_to_rollback", - transaction_options=TRANSACTION_OPTIONS, - project_id=GCP_PROJECT_ID, - ) - - # [START how_to_rollback_transaction] - rollback_transaction = CloudDatastoreRollbackOperator( - task_id="rollback_transaction", - transaction=begin_transaction_to_rollback.output, - ) - # [END how_to_rollback_transaction] - - # Task dependencies created via `XComArgs`: - # begin_transaction_commit >> commit_task - # begin_transaction_to_rollback >> rollback_transaction - # begin_transaction_query >> run_query - - OPERATION_NAME = 'operations/example-operation-unique-id' - # [START get_operation_state] - get_operation = CloudDatastoreGetOperationOperator( - task_id='get_operation', - name=OPERATION_NAME, - gcp_conn_id='google_cloud_default', - ) - # [END get_operation_state] - - # [START delete_operation] - delete_operation = CloudDatastoreDeleteOperationOperator( - task_id='delete_operation', - name=OPERATION_NAME, - gcp_conn_id='google_cloud_default', - ) - # [END delete_operation] diff --git a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst index 7068d1c7a3ef9..4a8e623d6ee14 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst @@ -38,7 +38,7 @@ Export Entities To export entities from Google Cloud Datastore to Cloud Storage use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py :language: python :dedent: 4 :start-after: [START how_to_export_task] @@ -52,7 +52,7 @@ Import Entities To import entities from Cloud Storage to Google Cloud Datastore use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py :language: python :dedent: 4 :start-after: [START how_to_import_task] @@ -66,7 +66,7 @@ Allocate Ids To allocate IDs for incomplete keys use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreAllocateIdsOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py :language: python :dedent: 4 :start-after: [START how_to_allocate_ids] @@ -74,7 +74,7 @@ To allocate IDs for incomplete keys use An example of a partial keys required by the operator: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py :language: python :dedent: 0 :start-after: [START how_to_keys_def] @@ -88,7 +88,7 @@ Begin transaction To begin a new transaction use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreBeginTransactionOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py :language: python :dedent: 4 :start-after: [START how_to_begin_transaction] @@ -96,7 +96,7 @@ To begin a new transaction use An example of a transaction options required by the operator: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py :language: python :dedent: 0 :start-after: [START how_to_transaction_def] @@ -110,7 +110,7 @@ Commit transaction To commit a transaction, optionally creating, deleting or modifying some entities use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCommitOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py :language: python :dedent: 4 :start-after: [START how_to_commit_task] @@ -118,7 +118,7 @@ use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCo An example of a commit information required by the operator: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py :language: python :dedent: 0 :start-after: [START how_to_commit_def] @@ -132,7 +132,7 @@ Run query To run a query for entities use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRunQueryOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_query.py :language: python :dedent: 4 :start-after: [START how_to_run_query] @@ -140,7 +140,7 @@ To run a query for entities use An example of a query required by the operator: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_query.py :language: python :dedent: 0 :start-after: [START how_to_query_def] @@ -154,7 +154,7 @@ Roll back transaction To roll back a transaction use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRollbackOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_rollback.py :language: python :dedent: 4 :start-after: [START how_to_rollback_transaction] @@ -168,7 +168,7 @@ Get operation state To get the current state of a long-running operation use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py :language: python :dedent: 4 :start-after: [START get_operation_state] @@ -182,7 +182,7 @@ Delete operation To delete an operation use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py +.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py :language: python :dedent: 4 :start-after: [START delete_operation] diff --git a/tests/system/providers/google/datastore/example_datastore_commit.py b/tests/system/providers/google/datastore/example_datastore_commit.py new file mode 100644 index 0000000000000..c00847fa2c599 --- /dev/null +++ b/tests/system/providers/google/datastore/example_datastore_commit.py @@ -0,0 +1,99 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Airflow System Test DAG that verifies Datastore commit operators. +""" + +import os +from datetime import datetime +from typing import Any, Dict + +from airflow import models +from airflow.providers.google.cloud.operators.datastore import ( + CloudDatastoreAllocateIdsOperator, + CloudDatastoreBeginTransactionOperator, + CloudDatastoreCommitOperator, +) + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "datastore_commit" + +# [START how_to_keys_def] +KEYS = [ + { + "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""}, + "path": {"kind": "airflow"}, + } +] +# [END how_to_keys_def] + +# [START how_to_transaction_def] +TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}} +# [END how_to_transaction_def] + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["datastore", "example"], +) as dag: + # [START how_to_allocate_ids] + allocate_ids = CloudDatastoreAllocateIdsOperator( + task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID + ) + # [END how_to_allocate_ids] + + # [START how_to_begin_transaction] + begin_transaction_commit = CloudDatastoreBeginTransactionOperator( + task_id="begin_transaction_commit", + transaction_options=TRANSACTION_OPTIONS, + project_id=PROJECT_ID, + ) + # [END how_to_begin_transaction] + + # [START how_to_commit_def] + COMMIT_BODY = { + "mode": "TRANSACTIONAL", + "mutations": [ + { + "insert": { + "key": KEYS[0], + "properties": {"string": {"stringValue": "airflow is awesome!"}}, + } + } + ], + "transaction": begin_transaction_commit.output, + } + # [END how_to_commit_def] + + # [START how_to_commit_task] + commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID) + # [END how_to_commit_task] + + allocate_ids >> begin_transaction_commit >> commit_task + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/datastore/example_datastore_export_import.py b/tests/system/providers/google/datastore/example_datastore_export_import.py new file mode 100644 index 0000000000000..ab791b8c3b23d --- /dev/null +++ b/tests/system/providers/google/datastore/example_datastore_export_import.py @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Airflow System Test DAG that verifies Datastore export and import operators. +""" + +import os +from datetime import datetime + +from airflow import models +from airflow.models.baseoperator import chain +from airflow.providers.google.cloud.operators.datastore import ( + CloudDatastoreDeleteOperationOperator, + CloudDatastoreExportEntitiesOperator, + CloudDatastoreGetOperationOperator, + CloudDatastoreImportEntitiesOperator, +) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "datastore_export_import" +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["datastore", "example"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU" + ) + + # [START how_to_export_task] + export_task = CloudDatastoreExportEntitiesOperator( + task_id="export_task", + bucket=BUCKET_NAME, + project_id=PROJECT_ID, + overwrite_existing=True, + ) + # [END how_to_export_task] + + # [START how_to_import_task] + import_task = CloudDatastoreImportEntitiesOperator( + task_id="import_task", + bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}", + file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}", + project_id=PROJECT_ID, + ) + # [END how_to_import_task] + + # [START get_operation_state] + get_operation = CloudDatastoreGetOperationOperator( + task_id='get_operation', name="{{ task_instance.xcom_pull('export_task')['name'] }}" + ) + # [END get_operation_state] + + # [START delete_operation] + delete_export_operation = CloudDatastoreDeleteOperationOperator( + task_id='delete_export_operation', + name="{{ task_instance.xcom_pull('export_task')['name'] }}", + ) + # [END delete_operation] + delete_export_operation.trigger_rule = TriggerRule.ALL_DONE + + delete_import_operation = CloudDatastoreDeleteOperationOperator( + task_id='delete_import_operation', + name="{{ task_instance.xcom_pull('export_task')['name'] }}", + ) + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + chain( + create_bucket, + export_task, + import_task, + get_operation, + [delete_bucket, delete_export_operation, delete_import_operation], + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/datastore/example_datastore_query.py b/tests/system/providers/google/datastore/example_datastore_query.py new file mode 100644 index 0000000000000..d42255034f55a --- /dev/null +++ b/tests/system/providers/google/datastore/example_datastore_query.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Airflow System Test DAG that verifies Datastore query operators. +""" + +import os +from datetime import datetime +from typing import Any, Dict + +from airflow import models +from airflow.providers.google.cloud.operators.datastore import ( + CloudDatastoreAllocateIdsOperator, + CloudDatastoreBeginTransactionOperator, + CloudDatastoreRunQueryOperator, +) + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "datastore_query" + +KEYS = [ + { + "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""}, + "path": {"kind": "airflow"}, + } +] + +TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}} + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["datastore", "example"], +) as dag: + allocate_ids = CloudDatastoreAllocateIdsOperator( + task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID + ) + + begin_transaction_query = CloudDatastoreBeginTransactionOperator( + task_id="begin_transaction_query", + transaction_options=TRANSACTION_OPTIONS, + project_id=PROJECT_ID, + ) + + # [START how_to_query_def] + QUERY = { + "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"}, + "readOptions": {"transaction": begin_transaction_query.output}, + "query": {}, + } + # [END how_to_query_def] + + # [START how_to_run_query] + run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID) + # [END how_to_run_query] + + allocate_ids >> begin_transaction_query >> run_query + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/datastore/example_datastore_rollback.py b/tests/system/providers/google/datastore/example_datastore_rollback.py new file mode 100644 index 0000000000000..2fc102f6bb6d8 --- /dev/null +++ b/tests/system/providers/google/datastore/example_datastore_rollback.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Airflow System Test DAG that verifies Datastore rollback operators. +""" + +import os +from datetime import datetime +from typing import Any, Dict + +from airflow import models +from airflow.providers.google.cloud.operators.datastore import ( + CloudDatastoreBeginTransactionOperator, + CloudDatastoreRollbackOperator, +) + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "datastore_rollback" + +TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}} + + +with models.DAG( + DAG_ID, + schedule_interval='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["datastore", "example"], +) as dag: + begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator( + task_id="begin_transaction_to_rollback", + transaction_options=TRANSACTION_OPTIONS, + project_id=PROJECT_ID, + ) + + # [START how_to_rollback_transaction] + rollback_transaction = CloudDatastoreRollbackOperator( + task_id="rollback_transaction", + transaction=begin_transaction_to_rollback.output, + ) + # [END how_to_rollback_transaction] + + begin_transaction_to_rollback >> rollback_transaction + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)