From 632fd497b5a6affac0edde49c60e2c0556ecbe64 Mon Sep 17 00:00:00 2001 From: Jonathan Sharifi Date: Fri, 24 Apr 2020 13:04:36 +0100 Subject: [PATCH 1/4] Add support for providing additional experiments to Dataflow job --- google/resource_dataflow_job.go | 29 ++++++---- google/resource_dataflow_job_test.go | 81 ++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 9 deletions(-) diff --git a/google/resource_dataflow_job.go b/google/resource_dataflow_job.go index e2fc47281e1..f3c1d8a80ec 100644 --- a/google/resource_dataflow_job.go +++ b/google/resource_dataflow_job.go @@ -151,6 +151,15 @@ func resourceDataflowJob() *schema.Resource { ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false), }, + "additional_experiments": { + Type: schema.TypeList, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "job_id": { Type: schema.TypeString, Computed: true, @@ -179,17 +188,19 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error { params := expandStringMap(d, "parameters") labels := expandStringMap(d, "labels") + additionalExperiments := convertStringArr(d.Get("additional_experiments").([]interface{})) env := dataflow.RuntimeEnvironment{ - MaxWorkers: int64(d.Get("max_workers").(int)), - Network: d.Get("network").(string), - ServiceAccountEmail: d.Get("service_account_email").(string), - Subnetwork: d.Get("subnetwork").(string), - TempLocation: d.Get("temp_gcs_location").(string), - MachineType: d.Get("machine_type").(string), - IpConfiguration: d.Get("ip_configuration").(string), - AdditionalUserLabels: labels, - Zone: zone, + MaxWorkers: int64(d.Get("max_workers").(int)), + Network: d.Get("network").(string), + ServiceAccountEmail: d.Get("service_account_email").(string), + Subnetwork: d.Get("subnetwork").(string), + TempLocation: d.Get("temp_gcs_location").(string), + MachineType: d.Get("machine_type").(string), + IpConfiguration: 
d.Get("ip_configuration").(string), + AdditionalUserLabels: labels, + Zone: zone, + AdditionalExperiments: additionalExperiments, } request := dataflow.CreateJobFromTemplateRequest{ diff --git a/google/resource_dataflow_job_test.go b/google/resource_dataflow_job_test.go index 9d9e2aab0db..68a6ee9cee5 100644 --- a/google/resource_dataflow_job_test.go +++ b/google/resource_dataflow_job_test.go @@ -182,6 +182,30 @@ func TestAccDataflowJob_withIpConfig(t *testing.T) { }) } +func TestAccDataflowJobWithAdditionalExperiments(t *testing.T) { + t.Parallel() + + randStr := randString(t, 10) + bucket := "tf-test-dataflow-gcs-" + randStr + job := "tf-test-dataflow-job-" + randStr + additionalExperiments := []string{"enable_stackdriver_agent_metrics"} + + vcrTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckDataflowJobDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccDataflowJob_additionalExperiments(bucket, job, additionalExperiments), + Check: resource.ComposeTestCheckFunc( + testAccDataflowJobExists(t, "google_dataflow_job.with_additional_experiments"), + testAccDataflowJobHasExperiments(t, "google_dataflow_job.with_additional_experiments", additionalExperiments), + ), + }, + }, + }) +} + func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.State) error { return func(s *terraform.State) error { for _, rs := range s.RootModule().Resources { @@ -384,6 +408,39 @@ func testAccDataflowJobHasLabels(t *testing.T, res, key string) resource.TestChe } } +func testAccDataflowJobHasExperiments(t *testing.T, res string, experiments []string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[res] + if !ok { + return fmt.Errorf("resource %q not found in state", res) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No ID is set") + } + config := googleProviderConfig(t) + + job, err := 
config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).View("JOB_VIEW_ALL").Do() + if err != nil { + return fmt.Errorf("dataflow job does not exist") + } + + for _, expectedExperiment := range experiments { + var contains = false + for _, actualExperiment := range job.Environment.Experiments { + if actualExperiment == expectedExperiment { + contains = true + } + } + if contains != true { + return fmt.Errorf("Expected experiment '%s' not found in experiments", expectedExperiment) + } + } + + return nil + } +} + func testAccDataflowJob_zone(bucket, job, zone string) string { return fmt.Sprintf(` resource "google_storage_bucket" "temp" { @@ -583,3 +640,27 @@ resource "google_dataflow_job" "with_labels" { `, bucket, job, labelKey, labelVal, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl) } + +func testAccDataflowJob_additionalExperiments(bucket string, job string, experiments []string) string { + return fmt.Sprintf(` +resource "google_storage_bucket" "temp" { + name = "%s" + force_destroy = true +} + +resource "google_dataflow_job" "with_additional_experiments" { + name = "%s" + + additional_experiments = ["%s"] + + template_gcs_path = "%s" + temp_gcs_location = google_storage_bucket.temp.url + parameters = { + inputFile = "%s" + output = "${google_storage_bucket.temp.url}/output" + } + on_delete = "cancel" +} +`, bucket, job, strings.Join(experiments, ","), testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl) + +} From 9d8d61068fae1c4f6cc462ceea99e29e9e40910e Mon Sep 17 00:00:00 2001 From: Jonathan Sharifi Date: Fri, 24 Apr 2020 13:43:23 +0100 Subject: [PATCH 2/4] Fix formatting --- google/resource_dataflow_job.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/resource_dataflow_job.go b/google/resource_dataflow_job.go index f3c1d8a80ec..5b45e6dff13 100644 --- a/google/resource_dataflow_job.go +++ b/google/resource_dataflow_job.go @@ -152,9 +152,9 @@ func resourceDataflowJob() 
*schema.Resource { }, "additional_experiments": { - Type: schema.TypeList, - Optional: true, - ForceNew: true, + Type: schema.TypeList, + Optional: true, + ForceNew: true, Elem: &schema.Schema{ Type: schema.TypeString, }, From 6c22cafec19e4f9787b8910c7ece23fabf615e32 Mon Sep 17 00:00:00 2001 From: Jonathan Sharifi Date: Fri, 24 Apr 2020 23:26:30 +0100 Subject: [PATCH 3/4] Updating additional_experiments type to Set. Updating test case with another experiment --- google/resource_dataflow_job.go | 4 ++-- google/resource_dataflow_job_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google/resource_dataflow_job.go b/google/resource_dataflow_job.go index 5b45e6dff13..3d8132014ac 100644 --- a/google/resource_dataflow_job.go +++ b/google/resource_dataflow_job.go @@ -152,7 +152,7 @@ func resourceDataflowJob() *schema.Resource { }, "additional_experiments": { - Type: schema.TypeList, + Type: schema.TypeSet, Optional: true, ForceNew: true, Elem: &schema.Schema{ @@ -188,7 +188,7 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error { params := expandStringMap(d, "parameters") labels := expandStringMap(d, "labels") - additionalExperiments := convertStringArr(d.Get("additional_experiments").([]interface{})) + additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set)) env := dataflow.RuntimeEnvironment{ MaxWorkers: int64(d.Get("max_workers").(int)), diff --git a/google/resource_dataflow_job_test.go b/google/resource_dataflow_job_test.go index 68a6ee9cee5..f0cd127c452 100644 --- a/google/resource_dataflow_job_test.go +++ b/google/resource_dataflow_job_test.go @@ -188,7 +188,7 @@ func TestAccDataflowJobWithAdditionalExperiments(t *testing.T) { randStr := randString(t, 10) bucket := "tf-test-dataflow-gcs-" + randStr job := "tf-test-dataflow-job-" + randStr - additionalExperiments := []string{"enable_stackdriver_agent_metrics"} + additionalExperiments := 
[]string{"enable_stackdriver_agent_metrics", "shuffle_mode=service"} vcrTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, @@ -661,6 +661,6 @@ resource "google_dataflow_job" "with_additional_experiments" { } on_delete = "cancel" } -`, bucket, job, strings.Join(experiments, ","), testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl) +`, bucket, job, strings.Join(experiments, `", "`), testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl) } From 31ad2d72f12e12fa188f0bb8aa8b28a0c3f601a7 Mon Sep 17 00:00:00 2001 From: Jonathan Sharifi Date: Mon, 27 Apr 2020 10:46:04 +0100 Subject: [PATCH 4/4] Update documentation to include additional_experiments Dataflow argument --- website/docs/r/dataflow_job.html.markdown | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/docs/r/dataflow_job.html.markdown b/website/docs/r/dataflow_job.html.markdown index 21f9c77b98b..1c3aba2fa0b 100644 --- a/website/docs/r/dataflow_job.html.markdown +++ b/website/docs/r/dataflow_job.html.markdown @@ -58,7 +58,7 @@ The following arguments are supported: * `subnetwork` - (Optional) The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK". * `machine_type` - (Optional) The machine type to use for the job. * `ip_configuration` - (Optional) The configuration for VM IPs. Options are `"WORKER_IP_PUBLIC"` or `"WORKER_IP_PRIVATE"`. - +* `additional_experiments` - (Optional) List of experiments that should be used by the job. An example value is `["enable_stackdriver_agent_metrics"]`. ## Attributes Reference