Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-5118] Add ability to specify optional components in Dataproc… #5821

Merged
merged 1 commit into from
Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/contrib/operators/dataproc_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
config files (e.g. spark-defaults.conf), see
https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig
:type properties: dict
:param optional_components: List of optional cluster components, for more info see
https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig#Component
:type optional_components: list[str]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:type optional_components: list[str]
:type optional_components: List[str]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an inconsistency in this file in the way types are written. We have to review all the types, and then we will standardize them. For now, the current version is the more popular one.

:param num_masters: The # of master nodes to spin up
:type num_masters: int
:param master_machine_type: Compute engine machine type to use for the master node
Expand Down Expand Up @@ -208,6 +211,7 @@ def __init__(self,
image_version=None,
autoscaling_policy=None,
properties=None,
optional_components=None,
num_masters=1,
master_machine_type='n1-standard-4',
master_disk_type='pd-standard',
Expand Down Expand Up @@ -240,6 +244,7 @@ def __init__(self,
self.custom_image_project_id = custom_image_project_id
self.image_version = image_version
self.properties = properties or dict()
self.optional_components = optional_components
self.master_machine_type = master_machine_type
self.master_disk_type = master_disk_type
self.master_disk_size = master_disk_size
Expand Down Expand Up @@ -417,6 +422,9 @@ def _build_cluster_data(self):
if self.properties:
cluster_data['config']['softwareConfig']['properties'] = self.properties

if self.optional_components:
cluster_data['config']['softwareConfig']['optionalComponents'] = self.optional_components

cluster_data = self._build_lifecycle_config(cluster_data)

if self.init_actions_uris:
Expand Down
12 changes: 12 additions & 0 deletions tests/contrib/operators/test_dataproc_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
IMAGE_VERSION = '1.1'
CUSTOM_IMAGE = 'test-custom-image'
CUSTOM_IMAGE_PROJECT_ID = 'test-custom-image-project-id'
OPTIONAL_COMPONENTS = ['COMPONENT1', 'COMPONENT2']
MASTER_MACHINE_TYPE = 'n1-standard-2'
MASTER_DISK_SIZE = 100
MASTER_DISK_TYPE = 'pd-standard'
Expand Down Expand Up @@ -347,6 +348,17 @@ def test_init_with_custom_image_with_custom_image_project_id(self):
self.assertEqual(cluster_data['config']['workerConfig']['imageUri'],
expected_custom_image_url)

def test_build_cluster_data_with_optional_components(self):
    """Verify that optional_components passed to the operator end up in
    the generated cluster spec under softwareConfig.optionalComponents."""
    operator = DataprocClusterCreateOperator(
        task_id=TASK_ID,
        cluster_name=CLUSTER_NAME,
        project_id=GCP_PROJECT_ID,
        num_workers=NUM_WORKERS,
        optional_components=OPTIONAL_COMPONENTS,
    )
    software_config = operator._build_cluster_data()['config']['softwareConfig']
    self.assertEqual(
        software_config['optionalComponents'],
        OPTIONAL_COMPONENTS,
    )

def test_build_single_node_cluster(self):
dataproc_operator = DataprocClusterCreateOperator(
task_id=TASK_ID,
Expand Down