Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dataproc oss metric collection #7087

Merged
merged 12 commits into from
Jan 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ var (
"cluster_config.0.software_config.0.optional_components",
}

dataprocMetricConfigKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics",
}

metricKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_source",
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_overrides",
}

clusterConfigKeys = []string{
"cluster_config.0.staging_bucket",
"cluster_config.0.temp_bucket",
Expand All @@ -100,6 +109,7 @@ var (
"cluster_config.0.metastore_config",
"cluster_config.0.lifecycle_config",
"cluster_config.0.endpoint_config",
"cluster_config.0.dataproc_metric_config",
}
)

Expand Down Expand Up @@ -1087,6 +1097,24 @@ by Dataproc`,
},
},
},

"dataproc_metric_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: `The config for Dataproc metrics.`,
AtLeastOneOf: clusterConfigKeys,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"metrics": {
Type: schema.TypeList,
Required: true,
Description: `Metrics sources to enable.`,
Elem: metricsSchema(),
},
},
},
},
},
},
},
Expand All @@ -1095,6 +1123,28 @@ by Dataproc`,
}
}

// metricsSchema is pulled out as a standalone *schema.Resource so it can be
// shared between the resource schema (as the "metrics" element type) and
// schema.HashResource when a set hash function over metrics is needed.
func metricsSchema() *schema.Resource {
	return &schema.Resource{
		Schema: map[string]*schema.Schema{
			// Which OSS component's metrics to collect; restricted to the
			// sources accepted by the Dataproc API.
			"metric_source": {
				Type:         schema.TypeString,
				ForceNew:     true,
				Required:     true,
				ValidateFunc: validation.StringInSlice([]string{"MONITORING_AGENT_DEFAULTS", "HDFS", "SPARK", "YARN", "SPARK_HISTORY_SERVER", "HIVESERVER2"}, false),
				// NOTE: link previously had a doubled slash and a space before
				// the parenthesis, which broke markdown rendering in generated docs.
				Description: `A source for the collection of Dataproc OSS metrics (see [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics)).`,
			},
			// Optional set of specific metrics overriding the defaults for the
			// chosen metric source.
			"metric_overrides": {
				Type:        schema.TypeSet,
				Elem:        &schema.Schema{Type: schema.TypeString},
				Optional:    true,
				ForceNew:    true,
				Description: `Specify one or more [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics) to collect.`,
			},
		},
	}
}

func instanceConfigSchema(parent string) *schema.Schema {
var instanceConfigKeys = []string{
"cluster_config.0." + parent + ".0.num_instances",
Expand Down Expand Up @@ -1545,6 +1595,10 @@ func expandClusterConfig(d *schema.ResourceData, config *Config) (*dataproc.Clus
conf.EndpointConfig = expandEndpointConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.dataproc_metric_config"); ok {
conf.DataprocMetricConfig = expandDataprocMetricConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.master_config"); ok {
log.Println("[INFO] got master_config")
conf.MasterConfig = expandInstanceGroupConfig(cfg)
Expand Down Expand Up @@ -1763,6 +1817,23 @@ func expandEndpointConfig(cfg map[string]interface{}) *dataproc.EndpointConfig {
return conf
}

// expandDataprocMetricConfig converts the "dataproc_metric_config" block of
// Terraform config into the corresponding Dataproc API object.
func expandDataprocMetricConfig(cfg map[string]interface{}) *dataproc.DataprocMetricConfig {
	rawMetrics := cfg["metrics"].([]interface{})
	metrics := make([]*dataproc.Metric, 0, len(rawMetrics))

	for _, item := range rawMetrics {
		m := item.(map[string]interface{})
		metrics = append(metrics, &dataproc.Metric{
			MetricSource:    m["metric_source"].(string),
			MetricOverrides: convertStringSet(m["metric_overrides"].(*schema.Set)),
		})
	}

	return &dataproc.DataprocMetricConfig{Metrics: metrics}
}

func expandMetastoreConfig(cfg map[string]interface{}) *dataproc.MetastoreConfig {
conf := &dataproc.MetastoreConfig{}
if v, ok := cfg["dataproc_metastore_service"]; ok {
Expand Down Expand Up @@ -2172,6 +2243,7 @@ func flattenClusterConfig(d *schema.ResourceData, cfg *dataproc.ClusterConfig) (
"metastore_config": flattenMetastoreConfig(d, cfg.MetastoreConfig),
"lifecycle_config": flattenLifecycleConfig(d, cfg.LifecycleConfig),
"endpoint_config": flattenEndpointConfig(d, cfg.EndpointConfig),
"dataproc_metric_config": flattenDataprocMetricConfig(d, cfg.DataprocMetricConfig),
}

if len(cfg.InitializationActions) > 0 {
Expand Down Expand Up @@ -2278,6 +2350,26 @@ func flattenEndpointConfig(d *schema.ResourceData, ec *dataproc.EndpointConfig)
return []map[string]interface{}{data}
}

// flattenDataprocMetricConfig converts the API DataprocMetricConfig into the
// nested-list representation Terraform stores in state. It returns nil when
// the config is absent so the block is omitted from state entirely.
func flattenDataprocMetricConfig(d *schema.ResourceData, dmc *dataproc.DataprocMetricConfig) []map[string]interface{} {
	if dmc == nil {
		return nil
	}

	// Previously this built an empty slice via schema.NewSet(...).List() — a
	// roundabout no-op — and appended *pointers* to maps. The schema SDK reads
	// list elements as plain map[string]interface{}, so append map values.
	metricsTypeList := make([]interface{}, 0, len(dmc.Metrics))
	for _, metric := range dmc.Metrics {
		metricsTypeList = append(metricsTypeList, map[string]interface{}{
			"metric_source":    metric.MetricSource,
			"metric_overrides": metric.MetricOverrides,
		})
	}

	return []map[string]interface{}{{"metrics": metricsTypeList}}
}

func flattenMetastoreConfig(d *schema.ResourceData, ec *dataproc.MetastoreConfig) []map[string]interface{} {
if ec == nil {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,31 @@ func TestAccDataprocCluster_withReservationAffinity(t *testing.T) {
})
}

// TestAccDataprocCluster_withDataprocMetricConfig provisions a cluster with a
// dataproc_metric_config block and verifies both metric entries round-trip
// into state, including the first entry's source and override count.
func TestAccDataprocCluster_withDataprocMetricConfig(t *testing.T) {
	t.Parallel()

	var cluster dataproc.Cluster
	suffix := randString(t, 10)

	checks := resource.ComposeTestCheckFunc(
		testAccCheckDataprocClusterExists(t, "google_dataproc_cluster.basic", &cluster),
		resource.TestCheckResourceAttr("google_dataproc_cluster.basic", "cluster_config.0.dataproc_metric_config.0.metrics.#", "2"),
		resource.TestCheckResourceAttr("google_dataproc_cluster.basic", "cluster_config.0.dataproc_metric_config.0.metrics.0.metric_source", "HDFS"),
		resource.TestCheckResourceAttr("google_dataproc_cluster.basic", "cluster_config.0.dataproc_metric_config.0.metrics.0.metric_overrides.#", "1"),
	)

	vcrTest(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocClusterDestroy(t),
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocCluster_withDataprocMetricConfig(suffix),
				Check:  checks,
			},
		},
	})
}

func TestAccDataprocCluster_withNodeGroupAffinity(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1437,6 +1462,29 @@ resource "google_dataproc_cluster" "basic" {
`, rnd, rnd)
}

// testAccDataprocCluster_withDataprocMetricConfig returns HCL for a minimal
// cluster enabling OSS metric collection for two sources (HDFS and SPARK),
// each with one metric override. rnd is interpolated into the cluster name
// to avoid collisions between concurrent test runs.
func testAccDataprocCluster_withDataprocMetricConfig(rnd string) string {
	return fmt.Sprintf(`
resource "google_dataproc_cluster" "basic" {
	name   = "tf-test-dproc-%s"
	region = "us-central1"

	cluster_config {
		dataproc_metric_config {
			metrics {
				metric_source = "HDFS"
				metric_overrides = ["yarn:ResourceManager:QueueMetrics:AppsCompleted"]
			}

			metrics {
				metric_source = "SPARK"
				metric_overrides = ["spark:driver:DAGScheduler:job.allJobs"]
			}
		}
	}
}
`, rnd)
}

func testAccDataprocCluster_withNodeGroupAffinity(rnd string) string {
return fmt.Sprintf(`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ resource "google_dataproc_cluster" "accelerated_cluster" {
* `endpoint_config` (Optional) The config settings for port access on the cluster.
Structure [defined below](#nested_endpoint_config).

* `dataproc_metric_config` (Optional) The Dataproc OSS metric collection configuration for the cluster.
Structure [defined below](#nested_dataproc_metric_config).

* `metastore_config` (Optional) The config setting for metastore service with the cluster.
Structure [defined below](#nested_metastore_config).
- - -
Expand Down Expand Up @@ -792,6 +795,26 @@ cluster_config {

- - -

<a name="nested_dataproc_metric_config"></a>The `dataproc_metric_config` block supports:

```hcl
dataproc_metric_config {
metrics {
metric_source = "HDFS"
metric_overrides = ["yarn:ResourceManager:QueueMetrics:AppsCompleted"]
}
}
```


* `metrics` - (Required) Metrics sources to enable.

* `metric_source` - (Required) A source for the collection of Dataproc OSS metrics (see [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics)).

* `metric_overrides` - (Optional) One or more [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics) to collect for the metric source.

- - -

<a name="nested_lifecycle_config"></a>The `lifecycle_config` block supports:

```hcl
Expand Down