From 945b20c65adbab4298c8fa0f8eef9951717a4b66 Mon Sep 17 00:00:00 2001 From: Chethan UK Date: Sun, 5 Jun 2022 23:17:34 +0100 Subject: [PATCH] AIP-47 - Migrate google marketing DAGs to new design #22447 --- .../marketing_platform/analytics.rst | 6 +- .../marketing_platform/campaign_manager.rst | 14 +- .../marketing_platform/display_video.rst | 20 +-- .../operators/marketing_platform/index.rst | 2 +- .../marketing_platform/search_ads.rst | 8 +- .../operators/test_analytics_system.py | 2 +- .../operators/test_campaign_manager_system.py | 2 +- .../operators/test_display_video_system.py | 2 +- .../operators/test_search_ads_system.py | 2 +- .../system/providers/google}/__init__.py | 1 + .../google/marketing_platform/__init__.py | 17 +++ .../marketing_platform}/example_analytics.py | 19 ++- .../example_campaign_manager.py | 18 ++- .../example_display_video.py | 140 ++++++++++++++++++ .../example_display_video_misc.py | 117 +++++++++++++++ .../example_display_video_sdf.py | 104 +++---------- .../marketing_platform}/example_search_ads.py | 22 ++- tests/test_utils/gcp_system_helpers.py | 2 +- 18 files changed, 378 insertions(+), 120 deletions(-) rename {airflow/providers/google/marketing_platform/example_dags => tests/system/providers/google}/__init__.py (99%) create mode 100644 tests/system/providers/google/marketing_platform/__init__.py rename {airflow/providers/google/marketing_platform/example_dags => tests/system/providers/google/marketing_platform}/example_analytics.py (84%) rename {airflow/providers/google/marketing_platform/example_dags => tests/system/providers/google/marketing_platform}/example_campaign_manager.py (91%) create mode 100644 tests/system/providers/google/marketing_platform/example_display_video.py create mode 100644 tests/system/providers/google/marketing_platform/example_display_video_misc.py rename airflow/providers/google/marketing_platform/example_dags/example_display_video.py => tests/system/providers/google/marketing_platform/example_display_video_sdf.py (60%) rename {airflow/providers/google/marketing_platform/example_dags => tests/system/providers/google/marketing_platform}/example_search_ads.py (80%) diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst index 706ddd113074a3..fd06a85105d14a 100644 --- a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst +++ b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst @@ -35,7 +35,7 @@ List the Accounts To list accounts from Analytics you can use the :class:`~airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsListAccountsOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_analytics.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics.py :language: python :dedent: 4 :start-after: [START howto_marketing_platform_list_accounts_operator] @@ -53,7 +53,7 @@ Returns a web property-Google Ads link to which the user has access. To list web property-Google Ads link you can use the :class:`~airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsGetAdsLinkOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_analytics.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics.py :language: python :dedent: 4 :start-after: [START howto_marketing_platform_get_ads_link_operator] @@ -71,7 +71,7 @@ Operator returns a list of entity Google Ads links. To list Google Ads links you can use the :class:`~airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsRetrieveAdsLinksListOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_analytics.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics.py :language: python :dedent: 4 :start-after: [START howto_marketing_platform_retrieve_ads_links_list_operator] diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst index cbcf5706f5103a..4305962399d349 100644 --- a/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst +++ b/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst @@ -36,7 +36,7 @@ To delete Campaign Manager report you can use the :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerDeleteReportOperator`. It deletes a report by its unique ID. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py :language: python :dedent: 4 :start-after: [START howto_campaign_manager_delete_report_operator] @@ -54,7 +54,7 @@ Downloading a report The :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerDownloadReportOperator`. allows you to download a Campaign Manager to Google Cloud Storage bucket. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py :language: python :dedent: 4 :start-after: [START howto_campaign_manager_get_report_operator] @@ -72,7 +72,7 @@ Waiting for a report Report are generated asynchronously. To wait for report to be ready for downloading you can use :class:`~airflow.providers.google.marketing_platform.sensors.campaign_manager.GoogleCampaignManagerReportSensor`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py :language: python :dedent: 4 :start-after: [START howto_campaign_manager_wait_for_operation] @@ -91,7 +91,7 @@ To insert a Campaign Manager report you can use the :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerInsertReportOperator`. Running this operator creates a new report. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py :language: python :dedent: 4 :start-after: [START howto_campaign_manager_insert_report_operator] @@ -111,7 +111,7 @@ Running a report To run Campaign Manager report you can use the :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerRunReportOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py :language: python :dedent: 4 :start-after: [START howto_campaign_manager_run_report_operator] @@ -130,7 +130,7 @@ Inserting a conversions To insert Campaign Manager conversions you can use the :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchInsertConversionsOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py :language: python :dedent: 4 :start-after: [START howto_campaign_manager_insert_conversions] @@ -149,7 +149,7 @@ Updating a conversions To update Campaign Manager conversions you can use the :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchUpdateConversionsOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py :language: python :dedent: 4 :start-after: [START howto_campaign_manager_update_conversions] diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst index 60cd3db322bef9..3e4b7de83dedae 100644 --- a/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst +++ b/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst @@ -33,7 +33,7 @@ Creating a report To create Display&Video 360 report use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateReportOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_createquery_report_operator] @@ -53,7 +53,7 @@ Deleting a report To delete Display&Video 360 report use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DeleteReportOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_deletequery_report_operator] @@ -71,7 +71,7 @@ Waiting for report To wait for the report use :class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360ReportSensor`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_wait_report_operator] @@ -89,7 +89,7 @@ Downloading a report To download a report to GCS bucket use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadReportOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_getquery_report_operator] @@ -108,7 +108,7 @@ Running a report To run Display&Video 360 report use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunReportOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_runquery_report_operator] @@ -137,7 +137,7 @@ The operator accepts body request: To download line items in CSV format report use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadLineItemsOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video_misc.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_download_line_items_operator] @@ -156,7 +156,7 @@ Upload line items To run Display&Video 360 uploading line items use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360UploadLineItemsOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video_misc.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_upload_line_items_operator] @@ -174,7 +174,7 @@ Create SDF download task To create SDF download task use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video_sdf.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_create_sdf_download_task_operator] @@ -193,7 +193,7 @@ Save SDF files in the Google Cloud Storage To save SDF files and save them in the Google Cloud Storage use :class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360SDFtoGCSOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video_sdf.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_save_sdf_in_gcs_operator] @@ -211,7 +211,7 @@ Waiting for SDF operation Wait for SDF operation is executed by: :class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360GetSDFDownloadOperationSensor`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_display_video_sdf.py :language: python :dedent: 4 :start-after: [START howto_google_display_video_wait_for_operation_sensor] diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/index.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/index.rst index ecd8cc5d307e9e..8a808078fc554d 100644 --- a/docs/apache-airflow-providers-google/operators/marketing_platform/index.rst +++ b/docs/apache-airflow-providers-google/operators/marketing_platform/index.rst @@ -29,4 +29,4 @@ Google Marketing Platform Operators .. note:: You can learn how to use Google Cloud integrations by analyzing the - `source code `_ of the particular example DAGs. + `source code `_ of the particular example DAGs. diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst index 4832e491b67a6f..e91023826e8ba4 100644 --- a/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst +++ b/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst @@ -34,7 +34,7 @@ Inserting a report To insert a Search Ads report use the :class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsInsertReportOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_search_ads.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_search_ads.py :language: python :dedent: 4 :start-after: [START howto_search_ads_generate_report_operator] @@ -46,7 +46,7 @@ parameters which allows you to dynamically determine values. You can provide rep .json`` file as this operator supports this template extension. The result is saved to :ref:`XCom `, which allows it to be used by other operators: -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_search_ads.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_search_ads.py :language: python :dedent: 4 :start-after: [START howto_search_ads_get_report_id] @@ -60,7 +60,7 @@ Awaiting for a report To wait for a report to be ready for download use :class:`~airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_search_ads.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_search_ads.py :language: python :dedent: 4 :start-after: [START howto_search_ads_get_report_operator] @@ -78,7 +78,7 @@ Downloading a report To download a Search Ads report to Google Cloud Storage bucket use the :class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsDownloadReportOperator`. -.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_search_ads.py +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_search_ads.py :language: python :dedent: 4 :start-after: [START howto_search_ads_getfile_report_operator] diff --git a/tests/providers/google/marketing_platform/operators/test_analytics_system.py b/tests/providers/google/marketing_platform/operators/test_analytics_system.py index ca0931435c74ec..73f258d9067f0c 100644 --- a/tests/providers/google/marketing_platform/operators/test_analytics_system.py +++ b/tests/providers/google/marketing_platform/operators/test_analytics_system.py @@ -17,8 +17,8 @@ import pytest -from airflow.providers.google.marketing_platform.example_dags.example_analytics import BUCKET, BUCKET_FILENAME from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY +from tests.system.providers.google.marketing_platform.example_analytics import BUCKET, BUCKET_FILENAME from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context DATA = """pagePath,dimension1 diff --git a/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py b/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py index 3838e6255be4db..0930a472f059b7 100644 --- a/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py +++ b/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py @@ -17,8 +17,8 @@ # under the License. import pytest -from airflow.providers.google.marketing_platform.example_dags.example_campaign_manager import BUCKET from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY +from tests.system.providers.google.marketing_platform.example_campaign_manager import BUCKET from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context # Required scopes diff --git a/tests/providers/google/marketing_platform/operators/test_display_video_system.py b/tests/providers/google/marketing_platform/operators/test_display_video_system.py index a29bbb3f9297ba..32ad057a114fb5 100644 --- a/tests/providers/google/marketing_platform/operators/test_display_video_system.py +++ b/tests/providers/google/marketing_platform/operators/test_display_video_system.py @@ -17,8 +17,8 @@ import pytest from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -from airflow.providers.google.marketing_platform.example_dags.example_display_video import BUCKET from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY, GMP_KEY +from tests.system.providers.google.marketing_platform.example_display_video import BUCKET from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context # Requires the following scope: diff --git a/tests/providers/google/marketing_platform/operators/test_search_ads_system.py b/tests/providers/google/marketing_platform/operators/test_search_ads_system.py index 9c6dfe99b134f1..39cf1aed850ff7 100644 --- a/tests/providers/google/marketing_platform/operators/test_search_ads_system.py +++ b/tests/providers/google/marketing_platform/operators/test_search_ads_system.py @@ -17,8 +17,8 @@ # under the License. import pytest -from airflow.providers.google.marketing_platform.example_dags.example_search_ads import GCS_BUCKET from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY +from tests.system.providers.google.marketing_platform.example_search_ads import GCS_BUCKET from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context # Requires the following scope: diff --git a/airflow/providers/google/marketing_platform/example_dags/__init__.py b/tests/system/providers/google/__init__.py similarity index 99% rename from airflow/providers/google/marketing_platform/example_dags/__init__.py rename to tests/system/providers/google/__init__.py index 13a83393a9124b..217e5db9607827 100644 --- a/airflow/providers/google/marketing_platform/example_dags/__init__.py +++ b/tests/system/providers/google/__init__.py @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information diff --git a/tests/system/providers/google/marketing_platform/__init__.py b/tests/system/providers/google/marketing_platform/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/tests/system/providers/google/marketing_platform/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/google/marketing_platform/example_dags/example_analytics.py b/tests/system/providers/google/marketing_platform/example_analytics.py similarity index 84% rename from airflow/providers/google/marketing_platform/example_dags/example_analytics.py rename to tests/system/providers/google/marketing_platform/example_analytics.py index cc08b2a52c9f7a..6e095f81f1b0d6 100644 --- a/airflow/providers/google/marketing_platform/example_dags/example_analytics.py +++ b/tests/system/providers/google/marketing_platform/example_analytics.py @@ -29,7 +29,10 @@ GoogleAnalyticsModifyFileHeadersDataImportOperator, GoogleAnalyticsRetrieveAdsLinksListOperator, ) +from airflow.utils.trigger_rule import TriggerRule +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_google_analytics" ACCOUNT_ID = os.environ.get("GA_ACCOUNT_ID", "123456789") BUCKET = os.environ.get("GMP_ANALYTICS_BUCKET", "test-airflow-analytics-bucket") @@ -39,7 +42,7 @@ DATA_ID = "kjdDu3_tQa6n8Q1kXFtSmg" with models.DAG( - "example_google_analytics", + DAG_ID, schedule_interval='@once', # Override to match your needs, start_date=datetime(2021, 1, 1), catchup=False, @@ -77,12 +80,26 @@ account_id=ACCOUNT_ID, web_property_id=WEB_PROPERTY_ID, custom_data_source_id=DATA_ID, + trigger_rule=TriggerRule.ALL_DONE, ) transform = GoogleAnalyticsModifyFileHeadersDataImportOperator( task_id="transform", storage_bucket=BUCKET, storage_name_object=BUCKET_FILENAME, + trigger_rule=TriggerRule.ALL_DONE, ) upload >> [delete, transform] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py b/tests/system/providers/google/marketing_platform/example_campaign_manager.py similarity index 91% rename from airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py rename to tests/system/providers/google/marketing_platform/example_campaign_manager.py index 4c32391fb0a3fc..d4afb916388c5f 100644 --- a/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +++ b/tests/system/providers/google/marketing_platform/example_campaign_manager.py @@ -34,7 +34,10 @@ from airflow.providers.google.marketing_platform.sensors.campaign_manager import ( GoogleCampaignManagerReportSensor, ) +from airflow.utils.trigger_rule import TriggerRule +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_campaign_manager" PROFILE_ID = os.environ.get("MARKETING_PROFILE_ID", "123456789") FLOODLIGHT_ACTIVITY_ID = int(os.environ.get("FLOODLIGHT_ACTIVITY_ID", 12345)) FLOODLIGHT_CONFIGURATION_ID = int(os.environ.get("FLOODLIGHT_CONFIGURATION_ID", 12345)) @@ -84,7 +87,7 @@ } with models.DAG( - "example_campaign_manager", + DAG_ID, schedule_interval='@once', # Override to match your needs, start_date=datetime(2021, 1, 1), catchup=False, @@ -158,12 +161,25 @@ encryption_entity_type="DCM_ADVERTISER", encryption_entity_id=ENCRYPTION_ENTITY_ID, max_failed_updates=1, + trigger_rule=TriggerRule.ALL_DONE, ) # [END howto_campaign_manager_update_conversions] insert_conversion >> update_conversion + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + if __name__ == "__main__": dag.clear() dag.run() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/marketing_platform/example_display_video.py b/tests/system/providers/google/marketing_platform/example_display_video.py new file mode 100644 index 00000000000000..3e339784352f2d --- /dev/null +++ b/tests/system/providers/google/marketing_platform/example_display_video.py @@ -0,0 +1,140 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that shows how to use DisplayVideo. +""" +import os +from datetime import datetime +from typing import Dict + +from airflow import models +from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook +from airflow.providers.google.marketing_platform.operators.display_video import ( + GoogleDisplayVideo360CreateReportOperator, + GoogleDisplayVideo360DeleteReportOperator, + GoogleDisplayVideo360DownloadReportOperator, + GoogleDisplayVideo360RunReportOperator, +) +from airflow.providers.google.marketing_platform.sensors.display_video import ( + GoogleDisplayVideo360ReportSensor, +) +from airflow.utils.trigger_rule import TriggerRule + +# [START howto_display_video_env_variables] +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_display_video" +BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://INVALID BUCKET NAME") +ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567) +OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv") +PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt") +PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt") +BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1] +SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1") +BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test") +GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123) +ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem") +ERF_SOURCE_OBJECT = GoogleDisplayVideo360Hook.erf_uri(GMP_PARTNER_ID, ENTITY_TYPE) + +REPORT = { + "kind": "doubleclickbidmanager#query", + "metadata": { + "title": "Polidea Test Report", + "dataRange": "LAST_7_DAYS", + "format": "CSV", + "sendNotification": False, + }, + "params": { + "type": "TYPE_GENERAL", + "groupBys": ["FILTER_DATE", "FILTER_PARTNER"], + "filters": [{"type": "FILTER_PARTNER", "value": 1486931}], + "metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"], + "includeInviteData": True, + }, + "schedule": {"frequency": "ONE_TIME"}, +} + +PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"} + +CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: Dict = { + "version": SDF_VERSION, + "advertiserId": ADVERTISER_ID, + "inventorySourceFilter": {"inventorySourceIds": []}, +} + +DOWNLOAD_LINE_ITEMS_REQUEST: Dict = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"} +# [END howto_display_video_env_variables] + +START_DATE = datetime(2021, 1, 1) + +with models.DAG( + DAG_ID, + schedule_interval='@once', # Override to match your needs, + start_date=START_DATE, + catchup=False, +) as dag: + # [START howto_google_display_video_createquery_report_operator] + create_report = GoogleDisplayVideo360CreateReportOperator(body=REPORT, task_id="create_report") + report_id = create_report.output["report_id"] + # [END howto_google_display_video_createquery_report_operator] + + # [START howto_google_display_video_runquery_report_operator] + run_report = GoogleDisplayVideo360RunReportOperator( + report_id=report_id, parameters=PARAMETERS, task_id="run_report" + ) + # [END howto_google_display_video_runquery_report_operator] + + # [START howto_google_display_video_wait_report_operator] + wait_for_report = GoogleDisplayVideo360ReportSensor(task_id="wait_for_report", report_id=report_id) + # [END howto_google_display_video_wait_report_operator] + + # [START howto_google_display_video_getquery_report_operator] + get_report = GoogleDisplayVideo360DownloadReportOperator( + report_id=report_id, + task_id="get_report", + bucket_name=BUCKET, + report_name="test1.csv", + ) + # [END howto_google_display_video_getquery_report_operator] + + # [START howto_google_display_video_deletequery_report_operator] + delete_report = GoogleDisplayVideo360DeleteReportOperator( + report_id=report_id, + task_id="delete_report", + trigger_rule=TriggerRule.ALL_DONE, + ) + # [END howto_google_display_video_deletequery_report_operator] + + run_report >> wait_for_report >> get_report >> delete_report + + # Task dependencies created via `XComArgs`: + # create_report >> run_report + # create_report >> wait_for_report + # create_report >> get_report + # create_report >> delete_report + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/marketing_platform/example_display_video_misc.py b/tests/system/providers/google/marketing_platform/example_display_video_misc.py new file mode 100644 index 00000000000000..52bffe83a960c1 --- /dev/null +++ b/tests/system/providers/google/marketing_platform/example_display_video_misc.py @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that shows how to use DisplayVideo. +""" +import os +from datetime import datetime +from typing import Dict + +from airflow import models +from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator +from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook +from airflow.providers.google.marketing_platform.operators.display_video import ( + GoogleDisplayVideo360DownloadLineItemsOperator, + GoogleDisplayVideo360UploadLineItemsOperator, +) + +# [START howto_display_video_env_variables] +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_display_video_misc" +BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://INVALID BUCKET NAME") +ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567) +OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv") +PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt") +PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt") +BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1] +SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1") +BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test") +GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123) +ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem") +ERF_SOURCE_OBJECT = GoogleDisplayVideo360Hook.erf_uri(GMP_PARTNER_ID, ENTITY_TYPE) + +REPORT = { + "kind": "doubleclickbidmanager#query", + "metadata": { + "title": "Polidea Test Report", + "dataRange": "LAST_7_DAYS", + "format": "CSV", + "sendNotification": False, + }, + "params": { + "type": "TYPE_GENERAL", + "groupBys": ["FILTER_DATE", "FILTER_PARTNER"], + "filters": [{"type": "FILTER_PARTNER", "value": 1486931}], + "metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"], + "includeInviteData": True, + }, + "schedule": {"frequency": "ONE_TIME"}, +} + +PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"} + +CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: Dict = { + "version": SDF_VERSION, + "advertiserId": ADVERTISER_ID, + "inventorySourceFilter": {"inventorySourceIds": []}, +} + +DOWNLOAD_LINE_ITEMS_REQUEST: Dict = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"} +# [END howto_display_video_env_variables] + +START_DATE = datetime(2021, 1, 1) + +with models.DAG( + DAG_ID, + schedule_interval='@once', # Override to match your needs, + start_date=START_DATE, + catchup=False, +) as dag: + # [START howto_google_display_video_upload_multiple_entity_read_files_to_big_query] + upload_erf_to_bq = GCSToBigQueryOperator( + task_id='upload_erf_to_bq', + bucket=BUCKET, + source_objects=ERF_SOURCE_OBJECT, + destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table", + write_disposition='WRITE_TRUNCATE', + ) + # [END howto_google_display_video_upload_multiple_entity_read_files_to_big_query] + + # [START howto_google_display_video_download_line_items_operator] + download_line_items = GoogleDisplayVideo360DownloadLineItemsOperator( + task_id="download_line_items", + request_body=DOWNLOAD_LINE_ITEMS_REQUEST, + bucket_name=BUCKET, + object_name=OBJECT_NAME, + gzip=False, + ) + # [END howto_google_display_video_download_line_items_operator] + + # [START howto_google_display_video_upload_line_items_operator] + upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator( + task_id="upload_line_items", + bucket_name=BUCKET, + object_name=BUCKET_FILE_LOCATION, + ) + # [END howto_google_display_video_upload_line_items_operator] + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py b/tests/system/providers/google/marketing_platform/example_display_video_sdf.py similarity index 60% rename from airflow/providers/google/marketing_platform/example_dags/example_display_video.py rename to tests/system/providers/google/marketing_platform/example_display_video_sdf.py index d2e2d07b002afe..77e9e09df4ffd3 100644 --- a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py +++ b/tests/system/providers/google/marketing_platform/example_display_video_sdf.py @@ -26,21 +26,17 @@ from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook from airflow.providers.google.marketing_platform.operators.display_video import ( - GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator, - GoogleDisplayVideo360DeleteReportOperator, - GoogleDisplayVideo360DownloadLineItemsOperator, - GoogleDisplayVideo360DownloadReportOperator, - GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360SDFtoGCSOperator, - GoogleDisplayVideo360UploadLineItemsOperator, ) from airflow.providers.google.marketing_platform.sensors.display_video import ( GoogleDisplayVideo360GetSDFDownloadOperationSensor, - GoogleDisplayVideo360ReportSensor, ) +from airflow.utils.trigger_rule import TriggerRule # [START howto_display_video_env_variables] +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_display_video_sdf" BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://INVALID BUCKET NAME") ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567) OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv") @@ -85,88 +81,11 @@ START_DATE = datetime(2021, 1, 1) with models.DAG( - "example_display_video", + DAG_ID, schedule_interval='@once', # Override to match your needs, start_date=START_DATE, catchup=False, -) as dag1: - # [START howto_google_display_video_createquery_report_operator] - create_report = GoogleDisplayVideo360CreateReportOperator(body=REPORT, task_id="create_report") - report_id = create_report.output["report_id"] - # [END howto_google_display_video_createquery_report_operator] - - # [START howto_google_display_video_runquery_report_operator] - run_report = GoogleDisplayVideo360RunReportOperator( - report_id=report_id, parameters=PARAMETERS, task_id="run_report" - ) - # [END howto_google_display_video_runquery_report_operator] - - # [START howto_google_display_video_wait_report_operator] - wait_for_report = GoogleDisplayVideo360ReportSensor(task_id="wait_for_report", report_id=report_id) - # [END howto_google_display_video_wait_report_operator] - - # [START howto_google_display_video_getquery_report_operator] - get_report = GoogleDisplayVideo360DownloadReportOperator( - report_id=report_id, - task_id="get_report", - bucket_name=BUCKET, - report_name="test1.csv", - ) - # [END howto_google_display_video_getquery_report_operator] - - # [START howto_google_display_video_deletequery_report_operator] - delete_report = GoogleDisplayVideo360DeleteReportOperator(report_id=report_id, task_id="delete_report") - # [END howto_google_display_video_deletequery_report_operator] - - run_report >> wait_for_report >> get_report >> delete_report - - # Task dependencies created via `XComArgs`: - # create_report >> run_report - # create_report >> wait_for_report - # create_report >> get_report - # create_report >> delete_report - - -with models.DAG( - "example_display_video_misc", - schedule_interval='@once', # Override to match your needs, - start_date=START_DATE, - catchup=False, -) as dag2: - # [START howto_google_display_video_upload_multiple_entity_read_files_to_big_query] - upload_erf_to_bq = GCSToBigQueryOperator( - task_id='upload_erf_to_bq', - bucket=BUCKET, - source_objects=ERF_SOURCE_OBJECT, - destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table", - write_disposition='WRITE_TRUNCATE', - ) - # [END howto_google_display_video_upload_multiple_entity_read_files_to_big_query] - - # [START howto_google_display_video_download_line_items_operator] - download_line_items = GoogleDisplayVideo360DownloadLineItemsOperator( - task_id="download_line_items", - request_body=DOWNLOAD_LINE_ITEMS_REQUEST, - bucket_name=BUCKET, - object_name=OBJECT_NAME, - gzip=False, - ) - # [END howto_google_display_video_download_line_items_operator] - - # [START howto_google_display_video_upload_line_items_operator] - upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator( - task_id="upload_line_items", - bucket_name=BUCKET, - object_name=BUCKET_FILE_LOCATION, - ) - # [END howto_google_display_video_upload_line_items_operator] - -with models.DAG( - "example_display_video_sdf", - schedule_interval='@once', # Override to match your needs, - start_date=START_DATE, - catchup=False, -) as dag3: +) as dag: # [START howto_google_display_video_create_sdf_download_task_operator] create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator( task_id="create_sdf_download_task", body_request=CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST @@ -188,6 +107,7 @@ bucket_name=BUCKET, object_name=BUCKET_FILE_LOCATION, gzip=False, + trigger_rule=TriggerRule.ALL_DONE, ) # [END howto_google_display_video_save_sdf_in_gcs_operator] @@ -209,3 +129,15 @@ # Task dependency created via `XComArgs`: # save_sdf_in_gcs >> upload_sdf_to_big_query + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/google/marketing_platform/example_dags/example_search_ads.py b/tests/system/providers/google/marketing_platform/example_search_ads.py similarity index 80% rename from airflow/providers/google/marketing_platform/example_dags/example_search_ads.py rename to tests/system/providers/google/marketing_platform/example_search_ads.py index 0c0e4d5775cfbf..0c715795558672 100644 --- a/airflow/providers/google/marketing_platform/example_dags/example_search_ads.py +++ b/tests/system/providers/google/marketing_platform/example_search_ads.py @@ -27,8 +27,11 @@ GoogleSearchAdsInsertReportOperator, ) from airflow.providers.google.marketing_platform.sensors.search_ads import GoogleSearchAdsReportSensor +from airflow.utils.trigger_rule import TriggerRule # [START howto_search_ads_env_variables] +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_search_ads" AGENCY_ID = os.environ.get("GMP_AGENCY_ID") ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID") GCS_BUCKET = os.environ.get("GMP_GCS_BUCKET", "test-cm-bucket") @@ -45,7 +48,7 @@ # [END howto_search_ads_env_variables] with models.DAG( - "example_search_ads", + DAG_ID, schedule_interval='@once', # Override to match your needs, start_date=datetime(2021, 1, 1), catchup=False, @@ -64,7 +67,10 @@ # [START howto_search_ads_getfile_report_operator] download_report = GoogleSearchAdsDownloadReportOperator( - report_id=report_id, bucket_name=GCS_BUCKET, task_id="download_report" + report_id=report_id, + bucket_name=GCS_BUCKET, + task_id="download_report", + trigger_rule=TriggerRule.ALL_DONE, ) # [END howto_search_ads_getfile_report_operator] @@ -73,3 +79,15 @@ # Task dependencies created via `XComArgs`: # generate_report >> wait_for_report # generate_report >> download_report + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/test_utils/gcp_system_helpers.py b/tests/test_utils/gcp_system_helpers.py index 9a452e28d86404..2072595f6df90f 100644 --- a/tests/test_utils/gcp_system_helpers.py +++ b/tests/test_utils/gcp_system_helpers.py @@ -35,7 +35,7 @@ AIRFLOW_MAIN_FOLDER, "airflow", "providers", "google", "cloud", "example_dags" ) MARKETING_DAG_FOLDER = os.path.join( - AIRFLOW_MAIN_FOLDER, "airflow", "providers", "google", "marketing_platform", "example_dags" + AIRFLOW_MAIN_FOLDER, "tests", "system", "providers", "google", "marketing_platform" ) GSUITE_DAG_FOLDER = os.path.join( AIRFLOW_MAIN_FOLDER, "airflow", "providers", "google", "suite", "example_dags"