diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 11d47b81cb4bfa..c937c1f2b4ac25 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -121,6 +121,9 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator): config files (e.g. spark-defaults.conf), see https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig :type properties: dict + :param optional_components: List of optional cluster components; for more info, see + https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig#Component + :type optional_components: list[str] :param num_masters: The # of master nodes to spin up :type num_masters: int :param master_machine_type: Compute engine machine type to use for the master node @@ -208,6 +211,7 @@ def __init__(self, image_version=None, autoscaling_policy=None, properties=None, + optional_components=None, num_masters=1, master_machine_type='n1-standard-4', master_disk_type='pd-standard', @@ -240,6 +244,7 @@ def __init__(self, self.custom_image_project_id = custom_image_project_id self.image_version = image_version self.properties = properties or dict() + self.optional_components = optional_components self.master_machine_type = master_machine_type self.master_disk_type = master_disk_type self.master_disk_size = master_disk_size @@ -417,6 +422,9 @@ def _build_cluster_data(self): if self.properties: cluster_data['config']['softwareConfig']['properties'] = self.properties + if self.optional_components: + cluster_data['config']['softwareConfig']['optionalComponents'] = self.optional_components + cluster_data = self._build_lifecycle_config(cluster_data) if self.init_actions_uris: diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index 3941e8cce8e55e..88fcf8c238f4c9 100644 --- 
a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -63,6 +63,7 @@ IMAGE_VERSION = '1.1' CUSTOM_IMAGE = 'test-custom-image' CUSTOM_IMAGE_PROJECT_ID = 'test-custom-image-project-id' +OPTIONAL_COMPONENTS = ['COMPONENT1', 'COMPONENT2'] MASTER_MACHINE_TYPE = 'n1-standard-2' MASTER_DISK_SIZE = 100 MASTER_DISK_TYPE = 'pd-standard' @@ -347,6 +348,17 @@ def test_init_with_custom_image_with_custom_image_project_id(self): self.assertEqual(cluster_data['config']['workerConfig']['imageUri'], expected_custom_image_url) + def test_build_cluster_data_with_optional_components(self): + dataproc_operator = DataprocClusterCreateOperator( + task_id=TASK_ID, + cluster_name=CLUSTER_NAME, + project_id=GCP_PROJECT_ID, + num_workers=NUM_WORKERS, + optional_components=OPTIONAL_COMPONENTS, + ) + cluster_data = dataproc_operator._build_cluster_data() + self.assertEqual(cluster_data['config']['softwareConfig']['optionalComponents'], OPTIONAL_COMPONENTS) + def test_build_single_node_cluster(self): dataproc_operator = DataprocClusterCreateOperator( task_id=TASK_ID,