Skip to content

Commit

Permalink
Fix dataform and datastore system tests (#40295)
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova authored Jun 18, 2024
1 parent 9208e11 commit 3e4ed12
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
22 changes: 14 additions & 8 deletions tests/system/providers/google/cloud/dataform/example_dataform.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
from airflow.providers.google.cloud.utils.dataform import make_initialization_workspace_flow
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

DAG_ID = "example_dataform"
DAG_ID = "dataform"

REPOSITORY_ID = f"example_dataform_repository_{ENV_ID}"
REGION = "us-central1"
Expand Down Expand Up @@ -281,23 +282,27 @@
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_delete_workspace]

delete_workspace.trigger_rule = TriggerRule.ALL_DONE

# [START howto_operator_delete_repository]
delete_repository = DataformDeleteRepositoryOperator(
task_id="delete-repository",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_delete_repository]

delete_repository.trigger_rule = TriggerRule.ALL_DONE

(make_repository >> make_workspace >> first_initialization_step)
(
# TEST SETUP
make_repository
>> make_workspace
# TEST BODY
>> first_initialization_step
)
(
last_initialization_step
>> install_npm_packages
Expand All @@ -312,6 +317,7 @@
>> cancel_workflow_invocation
>> make_test_directory
>> write_test_file
# TEST TEARDOWN
>> remove_test_file
>> remove_test_directory
>> delete_dataset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,26 @@
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["datastore", "example"],
tags=["example", "datastore"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU"
)

# [START how_to_allocate_ids]
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
# [END how_to_allocate_ids]

# [START how_to_begin_transaction]
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
transaction_options=TRANSACTION_OPTIONS,
project_id=PROJECT_ID,
)
# [END how_to_begin_transaction]

# [START how_to_commit_def]
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
Expand All @@ -95,9 +98,11 @@
"singleUseTransaction": {"readWrite": {}},
}
# [END how_to_commit_def]

# [START how_to_commit_task]
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
# [END how_to_commit_task]

# [START how_to_export_task]
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
Expand All @@ -106,6 +111,7 @@
overwrite_existing=True,
)
# [END how_to_export_task]

# [START how_to_import_task]
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
Expand All @@ -114,35 +120,42 @@
project_id=PROJECT_ID,
)
# [END how_to_import_task]

# [START get_operation_state]
get_operation = CloudDatastoreGetOperationOperator(
task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
# [END get_operation_state]

# [START delete_operation]
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
name="{{ task_instance.xcom_pull('export_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)
# [END delete_operation]
delete_export_operation.trigger_rule = TriggerRule.ALL_DONE

delete_import_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_import_operation",
name="{{ task_instance.xcom_pull('import_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

chain(
# TEST SETUP
create_bucket,
# TEST BODY
allocate_ids,
begin_transaction_commit,
commit_task,
export_task,
import_task,
get_operation,
# TEST TEARDOWN
[delete_bucket, delete_export_operation, delete_import_operation],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

DAG_ID = "datastore_query"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
)
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

DAG_ID = "datastore_rollback"
Expand Down

0 comments on commit 3e4ed12

Please sign in to comment.