Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dataproc oss metric collection #13480

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/7087.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
dataproc: added support for `dataproc_metric_config` to resource `google_dataproc_cluster`
```
92 changes: 92 additions & 0 deletions google/resource_dataproc_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ var (
"cluster_config.0.software_config.0.optional_components",
}

dataprocMetricConfigKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics",
}

metricKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_source",
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_overrides",
}

clusterConfigKeys = []string{
"cluster_config.0.staging_bucket",
"cluster_config.0.temp_bucket",
Expand All @@ -99,6 +108,7 @@ var (
"cluster_config.0.metastore_config",
"cluster_config.0.lifecycle_config",
"cluster_config.0.endpoint_config",
"cluster_config.0.dataproc_metric_config",
}
)

Expand Down Expand Up @@ -1086,6 +1096,24 @@ by Dataproc`,
},
},
},

"dataproc_metric_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: `The config for Dataproc metrics.`,
AtLeastOneOf: clusterConfigKeys,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"metrics": {
Type: schema.TypeList,
Required: true,
Description: `Metrics sources to enable.`,
Elem: metricsSchema(),
},
},
},
},
},
},
},
Expand All @@ -1094,6 +1122,28 @@ by Dataproc`,
}
}

// metricsSchema returns the schema for one entry of
// `cluster_config.0.dataproc_metric_config.0.metrics`.
//
// It is pulled out into its own function so the same *schema.Resource can be
// reused to build a set hash function (schema.HashResource) when flattening
// API responses back into state.
func metricsSchema() *schema.Resource {
	return &schema.Resource{
		Schema: map[string]*schema.Schema{
			"metric_source": {
				Type:     schema.TypeString,
				ForceNew: true,
				Required: true,
				// Only the metric sources the Dataproc API accepts.
				ValidateFunc: validation.StringInSlice([]string{"MONITORING_AGENT_DEFAULTS", "HDFS", "SPARK", "YARN", "SPARK_HISTORY_SERVER", "HIVESERVER2"}, false),
				// Fixed: markdown link had a space between "]" and "(" and a
				// doubled slash in the URL, which broke link rendering.
				Description: `A source for the collection of Dataproc OSS metrics (see [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics)).`,
			},
			"metric_overrides": {
				Type:     schema.TypeSet,
				Elem:     &schema.Schema{Type: schema.TypeString},
				Optional: true,
				ForceNew: true,
				// Fixed: markdown link had a space between "]" and "(".
				Description: `Specify one or more [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics) to collect.`,
			},
		},
	}
}

func instanceConfigSchema(parent string) *schema.Schema {
var instanceConfigKeys = []string{
"cluster_config.0." + parent + ".0.num_instances",
Expand Down Expand Up @@ -1544,6 +1594,10 @@ func expandClusterConfig(d *schema.ResourceData, config *Config) (*dataproc.Clus
conf.EndpointConfig = expandEndpointConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.dataproc_metric_config"); ok {
conf.DataprocMetricConfig = expandDataprocMetricConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.master_config"); ok {
log.Println("[INFO] got master_config")
conf.MasterConfig = expandInstanceGroupConfig(cfg)
Expand Down Expand Up @@ -1762,6 +1816,23 @@ func expandEndpointConfig(cfg map[string]interface{}) *dataproc.EndpointConfig {
return conf
}

// expandDataprocMetricConfig builds the DataprocMetricConfig API object from
// the `dataproc_metric_config` block of the resource configuration.
func expandDataprocMetricConfig(cfg map[string]interface{}) *dataproc.DataprocMetricConfig {
	rawMetrics := cfg["metrics"].([]interface{})
	metrics := make([]*dataproc.Metric, 0, len(rawMetrics))

	for _, raw := range rawMetrics {
		m := raw.(map[string]interface{})
		metrics = append(metrics, &dataproc.Metric{
			MetricSource:    m["metric_source"].(string),
			MetricOverrides: convertStringSet(m["metric_overrides"].(*schema.Set)),
		})
	}

	return &dataproc.DataprocMetricConfig{Metrics: metrics}
}

func expandMetastoreConfig(cfg map[string]interface{}) *dataproc.MetastoreConfig {
conf := &dataproc.MetastoreConfig{}
if v, ok := cfg["dataproc_metastore_service"]; ok {
Expand Down Expand Up @@ -2171,6 +2242,7 @@ func flattenClusterConfig(d *schema.ResourceData, cfg *dataproc.ClusterConfig) (
"metastore_config": flattenMetastoreConfig(d, cfg.MetastoreConfig),
"lifecycle_config": flattenLifecycleConfig(d, cfg.LifecycleConfig),
"endpoint_config": flattenEndpointConfig(d, cfg.EndpointConfig),
"dataproc_metric_config": flattenDataprocMetricConfig(d, cfg.DataprocMetricConfig),
}

if len(cfg.InitializationActions) > 0 {
Expand Down Expand Up @@ -2277,6 +2349,26 @@ func flattenEndpointConfig(d *schema.ResourceData, ec *dataproc.EndpointConfig)
return []map[string]interface{}{data}
}

// flattenDataprocMetricConfig converts a DataprocMetricConfig API object back
// into the nested-block representation stored in Terraform state. Returns nil
// when the API did not report a metric config.
func flattenDataprocMetricConfig(d *schema.ResourceData, dmc *dataproc.DataprocMetricConfig) []map[string]interface{} {
	if dmc == nil {
		return nil
	}

	// Seed the list from an empty set built with the same hash func the
	// schema uses, then append each metric as a nested map.
	metricList := schema.NewSet(schema.HashResource(metricsSchema()), []interface{}{}).List()
	for _, m := range dmc.Metrics {
		entry := map[string]interface{}{
			"metric_source":    m.MetricSource,
			"metric_overrides": m.MetricOverrides,
		}
		// NOTE(review): a *pointer* to the map is appended here (preserved
		// from the original) — confirm the schema writer accepts
		// *map[string]interface{} rather than map[string]interface{}.
		metricList = append(metricList, &entry)
	}

	return []map[string]interface{}{{"metrics": metricList}}
}

func flattenMetastoreConfig(d *schema.ResourceData, ec *dataproc.MetastoreConfig) []map[string]interface{} {
if ec == nil {
return nil
Expand Down
48 changes: 48 additions & 0 deletions google/resource_dataproc_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,31 @@ func TestAccDataprocCluster_withReservationAffinity(t *testing.T) {
})
}

// TestAccDataprocCluster_withDataprocMetricConfig verifies that a cluster
// created with two `metrics` blocks stores both in state with the expected
// source and override count.
func TestAccDataprocCluster_withDataprocMetricConfig(t *testing.T) {
	t.Parallel()

	var cluster dataproc.Cluster
	suffix := randString(t, 10)
	resName := "google_dataproc_cluster.basic"

	vcrTest(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocClusterDestroy(t),
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocCluster_withDataprocMetricConfig(suffix),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocClusterExists(t, resName, &cluster),
					// Both configured metrics blocks must round-trip into state...
					resource.TestCheckResourceAttr(resName, "cluster_config.0.dataproc_metric_config.0.metrics.#", "2"),
					// ...and the first keeps its source and single override.
					resource.TestCheckResourceAttr(resName, "cluster_config.0.dataproc_metric_config.0.metrics.0.metric_source", "HDFS"),
					resource.TestCheckResourceAttr(resName, "cluster_config.0.dataproc_metric_config.0.metrics.0.metric_overrides.#", "1"),
				),
			},
		},
	})
}

func TestAccDataprocCluster_withNodeGroupAffinity(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1430,6 +1455,29 @@ resource "google_dataproc_cluster" "basic" {
`, rnd, rnd)
}

// testAccDataprocCluster_withDataprocMetricConfig renders an acceptance-test
// config declaring two OSS metric sources (HDFS and SPARK), each with one
// metric override, on a Dataproc cluster named with the given suffix.
func testAccDataprocCluster_withDataprocMetricConfig(rnd string) string {
	const tmpl = `
resource "google_dataproc_cluster" "basic" {
  name   = "tf-test-dproc-%s"
  region = "us-central1"

  cluster_config {
    dataproc_metric_config {
      metrics {
        metric_source = "HDFS"
        metric_overrides = ["yarn:ResourceManager:QueueMetrics:AppsCompleted"]
      }

      metrics {
        metric_source = "SPARK"
        metric_overrides = ["spark:driver:DAGScheduler:job.allJobs"]
      }
    }
  }
}
`
	return fmt.Sprintf(tmpl, rnd)
}

func testAccDataprocCluster_withNodeGroupAffinity(rnd string) string {
return fmt.Sprintf(`

Expand Down
23 changes: 23 additions & 0 deletions website/docs/r/dataproc_cluster.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ resource "google_dataproc_cluster" "accelerated_cluster" {
* `endpoint_config` (Optional) The config settings for port access on the cluster.
Structure [defined below](#nested_endpoint_config).

* `dataproc_metric_config` (Optional) The config for Dataproc metrics.
Structure [defined below](#nested_dataproc_metric_config).

* `metastore_config` (Optional) The config setting for metastore service with the cluster.
Structure [defined below](#nested_metastore_config).
- - -
Expand Down Expand Up @@ -792,6 +795,26 @@ cluster_config {

- - -

<a name="nested_dataproc_metric_config"></a>The `dataproc_metric_config` block supports:

```hcl
dataproc_metric_config {
metrics {
metric_source = "HDFS"
metric_overrides = ["yarn:ResourceManager:QueueMetrics:AppsCompleted"]
}
}
```


* `metrics` - (Required) Metrics sources to enable.

* `metric_source` - (Required) A source for the collection of Dataproc OSS metrics (see [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics)).

* `metric_overrides` - (Optional) One or more [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics) to collect for the metric source.

- - -

<a name="nested_lifecycle_config"></a>The `lifecycle_config` block supports:

```hcl
Expand Down