diff --git a/google/resource_dataflow_job.go b/google/resource_dataflow_job.go
index e2fc47281e1..3d8132014ac 100644
--- a/google/resource_dataflow_job.go
+++ b/google/resource_dataflow_job.go
@@ -151,6 +151,15 @@ func resourceDataflowJob() *schema.Resource {
 				ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false),
 			},
 
+			"additional_experiments": {
+				Type:     schema.TypeSet,
+				Optional: true,
+				ForceNew: true,
+				Elem: &schema.Schema{
+					Type: schema.TypeString,
+				},
+			},
+
 			"job_id": {
 				Type:     schema.TypeString,
 				Computed: true,
@@ -179,17 +188,19 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
 
 	params := expandStringMap(d, "parameters")
 	labels := expandStringMap(d, "labels")
+	additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set))
 
 	env := dataflow.RuntimeEnvironment{
-		MaxWorkers:           int64(d.Get("max_workers").(int)),
-		Network:              d.Get("network").(string),
-		ServiceAccountEmail:  d.Get("service_account_email").(string),
-		Subnetwork:           d.Get("subnetwork").(string),
-		TempLocation:         d.Get("temp_gcs_location").(string),
-		MachineType:          d.Get("machine_type").(string),
-		IpConfiguration:      d.Get("ip_configuration").(string),
-		AdditionalUserLabels: labels,
-		Zone:                 zone,
+		MaxWorkers:            int64(d.Get("max_workers").(int)),
+		Network:               d.Get("network").(string),
+		ServiceAccountEmail:   d.Get("service_account_email").(string),
+		Subnetwork:            d.Get("subnetwork").(string),
+		TempLocation:          d.Get("temp_gcs_location").(string),
+		MachineType:           d.Get("machine_type").(string),
+		IpConfiguration:       d.Get("ip_configuration").(string),
+		AdditionalUserLabels:  labels,
+		Zone:                  zone,
+		AdditionalExperiments: additionalExperiments,
 	}
 
 	request := dataflow.CreateJobFromTemplateRequest{
diff --git a/google/resource_dataflow_job_test.go b/google/resource_dataflow_job_test.go
index 9d9e2aab0db..f0cd127c452 100644
--- a/google/resource_dataflow_job_test.go
+++ b/google/resource_dataflow_job_test.go
@@ -182,6 +182,30 @@ func TestAccDataflowJob_withIpConfig(t *testing.T) {
 	})
 }
 
+func TestAccDataflowJobWithAdditionalExperiments(t *testing.T) {
+	t.Parallel()
+
+	randStr := randString(t, 10)
+	bucket := "tf-test-dataflow-gcs-" + randStr
+	job := "tf-test-dataflow-job-" + randStr
+	additionalExperiments := []string{"enable_stackdriver_agent_metrics", "shuffle_mode=service"}
+
+	vcrTest(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccDataflowJob_additionalExperiments(bucket, job, additionalExperiments),
+				Check: resource.ComposeTestCheckFunc(
+					testAccDataflowJobExists(t, "google_dataflow_job.with_additional_experiments"),
+					testAccDataflowJobHasExperiments(t, "google_dataflow_job.with_additional_experiments", additionalExperiments),
+				),
+			},
+		},
+	})
+}
+
 func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
 	return func(s *terraform.State) error {
 		for _, rs := range s.RootModule().Resources {
@@ -384,6 +408,39 @@ func testAccDataflowJobHasLabels(t *testing.T, res, key string) resource.TestChe
 	}
 }
 
+func testAccDataflowJobHasExperiments(t *testing.T, res string, experiments []string) resource.TestCheckFunc {
+	return func(s *terraform.State) error {
+		rs, ok := s.RootModule().Resources[res]
+		if !ok {
+			return fmt.Errorf("resource %q not found in state", res)
+		}
+
+		if rs.Primary.ID == "" {
+			return fmt.Errorf("No ID is set")
+		}
+		config := googleProviderConfig(t)
+
+		job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).View("JOB_VIEW_ALL").Do()
+		if err != nil {
+			return fmt.Errorf("dataflow job does not exist: %v", err)
+		}
+
+		for _, expectedExperiment := range experiments {
+			var contains = false
+			for _, actualExperiment := range job.Environment.Experiments {
+				if actualExperiment == expectedExperiment {
+					contains = true
+				}
+			}
+			if !contains {
+				return fmt.Errorf("Expected experiment '%s' not found in experiments", expectedExperiment)
+			}
+		}
+
+		return nil
+	}
+}
+
 func testAccDataflowJob_zone(bucket, job, zone string) string {
 	return fmt.Sprintf(`
 resource "google_storage_bucket" "temp" {
@@ -583,3 +640,26 @@ resource "google_dataflow_job" "with_labels" {
 
 `, bucket, job, labelKey, labelVal, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
 }
+
+func testAccDataflowJob_additionalExperiments(bucket string, job string, experiments []string) string {
+	return fmt.Sprintf(`
+resource "google_storage_bucket" "temp" {
+  name          = "%s"
+  force_destroy = true
+}
+
+resource "google_dataflow_job" "with_additional_experiments" {
+  name = "%s"
+
+  additional_experiments = ["%s"]
+
+  template_gcs_path = "%s"
+  temp_gcs_location = google_storage_bucket.temp.url
+  parameters = {
+    inputFile = "%s"
+    output    = "${google_storage_bucket.temp.url}/output"
+  }
+  on_delete = "cancel"
+}
+`, bucket, job, strings.Join(experiments, `", "`), testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
+}
diff --git a/website/docs/r/dataflow_job.html.markdown b/website/docs/r/dataflow_job.html.markdown
index 21f9c77b98b..1c3aba2fa0b 100644
--- a/website/docs/r/dataflow_job.html.markdown
+++ b/website/docs/r/dataflow_job.html.markdown
@@ -58,7 +58,7 @@ The following arguments are supported:
 * `subnetwork` - (Optional) The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK".
 * `machine_type` - (Optional) The machine type to use for the job.
 * `ip_configuration` - (Optional) The configuration for VM IPs.  Options are `"WORKER_IP_PUBLIC"` or `"WORKER_IP_PRIVATE"`.
-
+* `additional_experiments` - (Optional) List of experiments that should be used by the job. An example value is `["enable_stackdriver_agent_metrics"]`.
 
 ## Attributes Reference
 