From cb31d08734a9873b7d16ab8325e698fde89feaa3 Mon Sep 17 00:00:00 2001
From: Igor Dralyuk
Date: Thu, 8 Aug 2019 09:04:10 +0200
Subject: [PATCH] [AIRFLOW-5045] Add ability to create Google Dataproc cluster
 with custom image from a different project (#5752)

---
 .../contrib/operators/dataproc_operator.py |  8 ++++++-
 .../operators/test_dataproc_operator.py    | 22 +++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index feaddbeb3a105d..b94b6b9dd4790a 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -110,6 +110,9 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
     :param custom_image: custom Dataproc image for more info see
         https://cloud.google.com/dataproc/docs/guides/dataproc-images
     :type custom_image: str
+    :param custom_image_project_id: project id for the custom Dataproc image, for more info see
+        https://cloud.google.com/dataproc/docs/guides/dataproc-images
+    :type custom_image_project_id: str
     :param autoscaling_policy: The autoscaling policy used by the cluster. Only resource names
         including projectid and location (region) are valid. Example:
         ``projects/[projectId]/locations/[dataproc_region]/autoscalingPolicies/[policy_id]``
@@ -199,6 +202,7 @@ def __init__(self,
                  init_action_timeout="10m",
                  metadata=None,
                  custom_image=None,
+                 custom_image_project_id=None,
                  image_version=None,
                  autoscaling_policy=None,
                  properties=None,
@@ -229,6 +233,7 @@ def __init__(self,
         self.init_action_timeout = init_action_timeout
         self.metadata = metadata
         self.custom_image = custom_image
+        self.custom_image_project_id = custom_image_project_id
         self.image_version = image_version
         self.properties = properties or dict()
         self.master_machine_type = master_machine_type
@@ -392,8 +397,9 @@ def _build_cluster_data(self):
             cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version
 
         elif self.custom_image:
+            project_id = self.custom_image_project_id if self.custom_image_project_id else self.project_id
             custom_image_url = 'https://www.googleapis.com/compute/beta/projects/' \
-                               '{}/global/images/{}'.format(self.project_id,
+                               '{}/global/images/{}'.format(project_id,
                                                             self.custom_image)
             cluster_data['config']['masterConfig']['imageUri'] = custom_image_url
             if not self.single_node:
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 80c76c2c7b993c..873fb1d01e14c1 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -61,6 +61,7 @@
 STORAGE_BUCKET = 'gs://airflow-test-bucket/'
 IMAGE_VERSION = '1.1'
 CUSTOM_IMAGE = 'test-custom-image'
+CUSTOM_IMAGE_PROJECT_ID = 'test-custom-image-project-id'
 MASTER_MACHINE_TYPE = 'n1-standard-2'
 MASTER_DISK_SIZE = 100
 MASTER_DISK_TYPE = 'pd-standard'
@@ -324,6 +325,27 @@ def test_init_with_custom_image(self):
         self.assertEqual(cluster_data['config']['workerConfig']['imageUri'],
                          expected_custom_image_url)
 
+    def test_init_with_custom_image_with_custom_image_project_id(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=GCP_PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=GCE_ZONE,
+            dag=self.dag,
+            custom_image=CUSTOM_IMAGE,
+            custom_image_project_id=CUSTOM_IMAGE_PROJECT_ID
+        )
+
+        cluster_data = dataproc_operator._build_cluster_data()
+        expected_custom_image_url = \
+            'https://www.googleapis.com/compute/beta/projects/' \
+            '{}/global/images/{}'.format(CUSTOM_IMAGE_PROJECT_ID, CUSTOM_IMAGE)
+        self.assertEqual(cluster_data['config']['masterConfig']['imageUri'],
+                         expected_custom_image_url)
+        self.assertEqual(cluster_data['config']['workerConfig']['imageUri'],
+                         expected_custom_image_url)
+
     def test_build_single_node_cluster(self):
         dataproc_operator = DataprocClusterCreateOperator(
             task_id=TASK_ID,
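Note (not part of the patch): a minimal usage sketch of the new parameter, assuming a stock
Airflow 1.10.x contrib install; the DAG id, project ids, zone, and image name below are
hypothetical placeholders, not values from this change.

# Hypothetical example: create a Dataproc cluster whose master and worker
# nodes boot from a custom image hosted in a different GCP project than
# the project that runs the cluster.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

with DAG(dag_id='dataproc_custom_image_example',
         start_date=datetime(2019, 8, 1),
         schedule_interval=None) as dag:
    create_cluster = DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        project_id='analytics-project',           # project that hosts the cluster
        cluster_name='example-cluster',
        num_workers=2,
        zone='europe-west1-b',
        custom_image='my-dataproc-image',         # image name only, not a full URI
        custom_image_project_id='image-project',  # project that hosts the image
    )

When custom_image_project_id is omitted, _build_cluster_data() falls back to the cluster's
own project_id, so existing DAGs that pass only custom_image keep resolving the image from
their own project.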