Skip to content

Commit

Permalink
Add support for providing additional experiments to Dataflow job (#6196)
Browse files Browse the repository at this point in the history
  • Loading branch information
wazim authored and emilymye committed Apr 30, 2020
1 parent 6c65946 commit a0b40cc
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 10 deletions.
29 changes: 20 additions & 9 deletions google/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ func resourceDataflowJob() *schema.Resource {
ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false),
},

"additional_experiments": {
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},

"job_id": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -179,17 +188,19 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {

params := expandStringMap(d, "parameters")
labels := expandStringMap(d, "labels")
additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set))

env := dataflow.RuntimeEnvironment{
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
IpConfiguration: d.Get("ip_configuration").(string),
AdditionalUserLabels: labels,
Zone: zone,
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
IpConfiguration: d.Get("ip_configuration").(string),
AdditionalUserLabels: labels,
Zone: zone,
AdditionalExperiments: additionalExperiments,
}

request := dataflow.CreateJobFromTemplateRequest{
Expand Down
81 changes: 81 additions & 0 deletions google/resource_dataflow_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,30 @@ func TestAccDataflowJob_withIpConfig(t *testing.T) {
})
}

// TestAccDataflowJobWithAdditionalExperiments provisions a Dataflow job with
// the additional_experiments argument set and then verifies, against the live
// API, that each requested experiment is present on the job's environment.
func TestAccDataflowJobWithAdditionalExperiments(t *testing.T) {
	t.Parallel()

	suffix := randString(t, 10)
	bucketName := "tf-test-dataflow-gcs-" + suffix
	jobName := "tf-test-dataflow-job-" + suffix
	experiments := []string{"enable_stackdriver_agent_metrics", "shuffle_mode=service"}

	vcrTest(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
		Steps: []resource.TestStep{
			{
				Config: testAccDataflowJob_additionalExperiments(bucketName, jobName, experiments),
				Check: resource.ComposeTestCheckFunc(
					testAccDataflowJobExists(t, "google_dataflow_job.with_additional_experiments"),
					testAccDataflowJobHasExperiments(t, "google_dataflow_job.with_additional_experiments", experiments),
				),
			},
		},
	})
}

func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for _, rs := range s.RootModule().Resources {
Expand Down Expand Up @@ -384,6 +408,39 @@ func testAccDataflowJobHasLabels(t *testing.T, res, key string) resource.TestChe
}
}

// testAccDataflowJobHasExperiments returns a TestCheckFunc asserting that the
// live Dataflow job backing the state resource named res has every entry of
// experiments enabled on its runtime environment.
//
// res is the fully-qualified resource address (e.g.
// "google_dataflow_job.with_additional_experiments"); experiments is the set
// of experiment strings expected on the job.
func testAccDataflowJobHasExperiments(t *testing.T, res string, experiments []string) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		rs, ok := s.RootModule().Resources[res]
		if !ok {
			return fmt.Errorf("resource %q not found in state", res)
		}

		if rs.Primary.ID == "" {
			return fmt.Errorf("No ID is set")
		}
		config := googleProviderConfig(t)

		// JOB_VIEW_ALL is required here: the default job view does not
		// populate the Environment field we inspect below.
		job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).View("JOB_VIEW_ALL").Do()
		if err != nil {
			// Include the underlying error instead of discarding it, so
			// transient API failures are distinguishable from a missing job.
			return fmt.Errorf("dataflow job does not exist: %s", err)
		}

		for _, expectedExperiment := range experiments {
			found := false
			for _, actualExperiment := range job.Environment.Experiments {
				if actualExperiment == expectedExperiment {
					found = true
					break // matched; no need to scan the remaining experiments
				}
			}
			if !found {
				return fmt.Errorf("Expected experiment '%s' not found in experiments", expectedExperiment)
			}
		}

		return nil
	}
}

func testAccDataflowJob_zone(bucket, job, zone string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
Expand Down Expand Up @@ -583,3 +640,27 @@ resource "google_dataflow_job" "with_labels" {
`, bucket, job, labelKey, labelVal, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)

}

// testAccDataflowJob_additionalExperiments renders the Terraform configuration
// for a Dataflow job whose additional_experiments argument contains the given
// experiments, backed by a temp GCS bucket named bucket and a job named job.
func testAccDataflowJob_additionalExperiments(bucket string, job string, experiments []string) string {
	// Join the slice into the interior of an HCL string list: for
	// ["a", "b"] this yields `a", "b`, which the surrounding ["%s"]
	// in the template completes to ["a", "b"].
	experimentList := strings.Join(experiments, `", "`)

	return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "%s"
force_destroy = true
}
resource "google_dataflow_job" "with_additional_experiments" {
name = "%s"
additional_experiments = ["%s"]
template_gcs_path = "%s"
temp_gcs_location = google_storage_bucket.temp.url
parameters = {
inputFile = "%s"
output = "${google_storage_bucket.temp.url}/output"
}
on_delete = "cancel"
}
`, bucket, job, experimentList, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
}
2 changes: 1 addition & 1 deletion website/docs/r/dataflow_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ The following arguments are supported:
* `subnetwork` - (Optional) The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK".
* `machine_type` - (Optional) The machine type to use for the job.
* `ip_configuration` - (Optional) The configuration for VM IPs. Options are `"WORKER_IP_PUBLIC"` or `"WORKER_IP_PRIVATE"`.

* `additional_experiments` - (Optional) List of experiments that should be used by the job. An example value is `["enable_stackdriver_agent_metrics"]`.

## Attributes Reference

Expand Down

0 comments on commit a0b40cc

Please sign in to comment.