Skip to content

Commit

Permalink
added support for user-defined labels on resource google_dataflow_job
Browse files Browse the repository at this point in the history
  • Loading branch information
megan07 authored Jul 26, 2019
2 parents cc39318 + 859f83a commit 86cbdc9
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 7 deletions.
23 changes: 16 additions & 7 deletions google/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func resourceDataflowJob() *schema.Resource {
ForceNew: true,
},

"labels": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"on_delete": {
Type: schema.TypeString,
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Expand Down Expand Up @@ -139,15 +145,17 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
}

params := expandStringMap(d, "parameters")
labels := expandStringMap(d, "labels")

env := dataflow.RuntimeEnvironment{
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
Zone: zone,
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
AdditionalUserLabels: labels,
Zone: zone,
}

request := dataflow.CreateJobFromTemplateRequest{
Expand Down Expand Up @@ -189,6 +197,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
d.Set("state", job.CurrentState)
d.Set("name", job.Name)
d.Set("project", project)
d.Set("labels", job.Labels)

if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
Expand Down
76 changes: 76 additions & 0 deletions google/resource_dataflow_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,29 @@ func TestAccDataflowJobCreateWithSubnetwork(t *testing.T) {
})
}

// TestAccDataflowJobCreateWithLabels provisions a Dataflow job with a
// user-defined label and verifies both that the job exists and that the
// label round-trips between Terraform state and the Dataflow API.
func TestAccDataflowJobCreateWithLabels(t *testing.T) {
	t.Parallel()

	const (
		labelKey     = "my-label"
		resourceName = "google_dataflow_job.with_labels"
	)

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataflowJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataflowJobWithLabels(labelKey),
				Check: resource.ComposeTestCheckFunc(
					testAccDataflowJobExists(resourceName),
					testAccDataflowJobHasLabels(resourceName, labelKey),
				),
			},
		},
	})
}

func testAccCheckDataflowJobDestroy(s *terraform.State) error {
for _, rs := range s.RootModule().Resources {
if rs.Type != "google_dataflow_job" {
Expand Down Expand Up @@ -320,6 +343,30 @@ func testAccDataflowJobRegionExists(n string) resource.TestCheckFunc {
}
}

// testAccDataflowJobHasLabels returns a TestCheckFunc that fetches the
// Dataflow job backing resource n from the API and asserts that the label
// stored under key matches the value recorded in Terraform state.
func testAccDataflowJobHasLabels(n, key string) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		rs, ok := s.RootModule().Resources[n]
		if !ok {
			return fmt.Errorf("Not found: %s", n)
		}

		if rs.Primary.ID == "" {
			return fmt.Errorf("No ID is set")
		}

		config := testAccProvider.Meta().(*Config)
		// Compare state against what the live API reports, not just state alone.
		job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
		if err != nil {
			// Surface the underlying API error instead of swallowing it.
			return fmt.Errorf("dataflow job %s could not be retrieved: %v", rs.Primary.ID, err)
		}

		want := rs.Primary.Attributes["labels."+key]
		if got := job.Labels[key]; got != want {
			// Include both values so a failure is diagnosable from the log alone.
			return fmt.Errorf("label %q mismatch: API has %q, state has %q", key, got, want)
		}

		return nil
	}
}

var testAccDataflowJob = fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "dfjob-test-%s-temp"
Expand Down Expand Up @@ -471,3 +518,32 @@ resource "google_dataflow_job" "big_data" {
on_delete = "cancel"
}`, acctest.RandString(10), acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv())

// testAccDataflowJobWithLabels renders a test configuration for a Dataflow
// job carrying a single user-defined label under the given key.
//
// Bug fix: the key parameter was previously ignored — the label key was
// hard-coded as "my-label" in the config, so the test only worked because
// the caller happened to pass that same literal. The key is now substituted
// into the config so the check function and the config always agree.
func testAccDataflowJobWithLabels(key string) string {
	return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
	name = "dfjob-test-%s-temp"
	force_destroy = true
}
resource "google_dataflow_job" "with_labels" {
	name = "dfjob-test-%s"
	template_gcs_path = "gs://dataflow-templates/wordcount/template_file"
	temp_gcs_location = "${google_storage_bucket.temp.url}"
	labels = {
		"%s" = "test"
	}
	parameters = {
		inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt"
		output    = "${google_storage_bucket.temp.url}/output"
	}
	zone = "us-central1-f"
	project = "%s"
	on_delete = "cancel"
}`, acctest.RandString(10), acctest.RandString(10), key, getTestProjectFromEnv())
}
1 change: 1 addition & 0 deletions website/docs/r/dataflow_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The following arguments are supported:
- - -

* `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as used in the template).
* `labels` - (Optional) User labels to be specified for the job. Keys and values should follow the restrictions specified in the [labeling restrictions](https://cloud.google.com/compute/docs/labeling-resources#restrictions) page.
* `max_workers` - (Optional) The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
Expand Down

0 comments on commit 86cbdc9

Please sign in to comment.