Skip to content

Commit

Permalink
Migrate Google example bigquery_to_mssql to new design AIP-47 (#25174)
Browse files Browse the repository at this point in the history
related: #22447, #22430
  • Loading branch information
chenglongyan authored Jul 21, 2022
1 parent 911db9f commit 66b3ca1
Showing 1 changed file with 27 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,21 @@
)
from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
DAG_ID = "example_bigquery_to_mssql"

DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
TABLE = "table_42"
destination_table = "mssql_table_test"

with models.DAG(
"example_bigquery_to_mssql",
schedule_interval=None, # Override to match your needs
DAG_ID,
schedule_interval="@once", # Override to match your needs
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
tags=["example", "bigquery"],
) as dag:
bigquery_to_mssql = BigQueryToMsSqlOperator(
task_id="bigquery_to_mssql",
Expand All @@ -61,10 +64,28 @@
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
create_dataset >> create_table >> bigquery_to_mssql

delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

bigquery_to_mssql >> delete_dataset
(
# TEST SETUP
create_dataset
>> create_table
# TEST BODY
>> bigquery_to_mssql
# TEST TEARDOWN
>> delete_dataset
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

0 comments on commit 66b3ca1

Please sign in to comment.