Skip to content

Commit

Permalink
Add support for dataproc oss metric collection (#7087)
Browse files Browse the repository at this point in the history
  • Loading branch information
duqDarren authored Jan 12, 2023
1 parent b5a2fe0 commit 1752ead
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ var (
"cluster_config.0.software_config.0.optional_components",
}

dataprocMetricConfigKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics",
}

metricKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_source",
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_overrides",
}

clusterConfigKeys = []string{
"cluster_config.0.staging_bucket",
"cluster_config.0.temp_bucket",
Expand All @@ -100,6 +109,7 @@ var (
"cluster_config.0.metastore_config",
"cluster_config.0.lifecycle_config",
"cluster_config.0.endpoint_config",
"cluster_config.0.dataproc_metric_config",
}
)

Expand Down Expand Up @@ -1087,6 +1097,24 @@ by Dataproc`,
},
},
},

"dataproc_metric_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: `The config for Dataproc metrics.`,
AtLeastOneOf: clusterConfigKeys,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"metrics": {
Type: schema.TypeList,
Required: true,
Description: `Metrics sources to enable.`,
Elem: metricsSchema(),
},
},
},
},
},
},
},
Expand All @@ -1095,6 +1123,28 @@ by Dataproc`,
}
}

// We need to pull metrics' schema out so we can use it to make a set hash func
func metricsSchema() *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{
"metric_source": {
Type: schema.TypeString,
ForceNew: true,
Required: true,
ValidateFunc: validation.StringInSlice([]string{"MONITORING_AGENT_DEFAULTS", "HDFS", "SPARK", "YARN", "SPARK_HISTORY_SERVER", "HIVESERVER2"}, false),
Description: `A source for the collection of Dataproc OSS metrics (see [available OSS metrics] (https://cloud.google.com//dataproc/docs/guides/monitoring#available_oss_metrics)).`,
},
"metric_overrides": {
Type: schema.TypeSet,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
ForceNew: true,
Description: `Specify one or more [available OSS metrics] (https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics) to collect.`,
},
},
}
}

func instanceConfigSchema(parent string) *schema.Schema {
var instanceConfigKeys = []string{
"cluster_config.0." + parent + ".0.num_instances",
Expand Down Expand Up @@ -1545,6 +1595,10 @@ func expandClusterConfig(d *schema.ResourceData, config *Config) (*dataproc.Clus
conf.EndpointConfig = expandEndpointConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.dataproc_metric_config"); ok {
conf.DataprocMetricConfig = expandDataprocMetricConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.master_config"); ok {
log.Println("[INFO] got master_config")
conf.MasterConfig = expandInstanceGroupConfig(cfg)
Expand Down Expand Up @@ -1763,6 +1817,23 @@ func expandEndpointConfig(cfg map[string]interface{}) *dataproc.EndpointConfig {
return conf
}

// expandDataprocMetricConfig converts the "dataproc_metric_config" block of
// resource data into the API's DataprocMetricConfig representation.
func expandDataprocMetricConfig(cfg map[string]interface{}) *dataproc.DataprocMetricConfig {
	rawMetrics := cfg["metrics"].([]interface{})
	metrics := make([]*dataproc.Metric, 0, len(rawMetrics))

	for _, raw := range rawMetrics {
		entry := raw.(map[string]interface{})
		metrics = append(metrics, &dataproc.Metric{
			MetricSource:    entry["metric_source"].(string),
			MetricOverrides: convertStringSet(entry["metric_overrides"].(*schema.Set)),
		})
	}

	return &dataproc.DataprocMetricConfig{Metrics: metrics}
}

func expandMetastoreConfig(cfg map[string]interface{}) *dataproc.MetastoreConfig {
conf := &dataproc.MetastoreConfig{}
if v, ok := cfg["dataproc_metastore_service"]; ok {
Expand Down Expand Up @@ -2172,6 +2243,7 @@ func flattenClusterConfig(d *schema.ResourceData, cfg *dataproc.ClusterConfig) (
"metastore_config": flattenMetastoreConfig(d, cfg.MetastoreConfig),
"lifecycle_config": flattenLifecycleConfig(d, cfg.LifecycleConfig),
"endpoint_config": flattenEndpointConfig(d, cfg.EndpointConfig),
"dataproc_metric_config": flattenDataprocMetricConfig(d, cfg.DataprocMetricConfig),
}

if len(cfg.InitializationActions) > 0 {
Expand Down Expand Up @@ -2278,6 +2350,26 @@ func flattenEndpointConfig(d *schema.ResourceData, ec *dataproc.EndpointConfig)
return []map[string]interface{}{data}
}

// flattenDataprocMetricConfig converts the API's DataprocMetricConfig into
// the []map[string]interface{} form Terraform stores in state. Returns nil
// when the config is unset so the block is omitted from state entirely.
func flattenDataprocMetricConfig(d *schema.ResourceData, dmc *dataproc.DataprocMetricConfig) []map[string]interface{} {
	if dmc == nil {
		return nil
	}

	// Build the metrics list directly; the previous
	// schema.NewSet(...).List() detour only ever produced an empty
	// []interface{} and added no hashing behavior.
	metricsTypeList := make([]interface{}, 0, len(dmc.Metrics))
	for _, metric := range dmc.Metrics {
		// Append the map itself rather than a pointer to it, matching the
		// shape every other flattener in this file produces.
		metricsTypeList = append(metricsTypeList, map[string]interface{}{
			"metric_source":    metric.MetricSource,
			"metric_overrides": metric.MetricOverrides,
		})
	}

	return []map[string]interface{}{
		{"metrics": metricsTypeList},
	}
}

func flattenMetastoreConfig(d *schema.ResourceData, ec *dataproc.MetastoreConfig) []map[string]interface{} {
if ec == nil {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,31 @@ func TestAccDataprocCluster_withReservationAffinity(t *testing.T) {
})
}

// TestAccDataprocCluster_withDataprocMetricConfig verifies that a cluster
// can be created with OSS metric collection enabled and that both configured
// metrics blocks round-trip into state.
func TestAccDataprocCluster_withDataprocMetricConfig(t *testing.T) {
	t.Parallel()

	var cluster dataproc.Cluster
	suffix := randString(t, 10)

	vcrTest(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocClusterDestroy(t),
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocCluster_withDataprocMetricConfig(suffix),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocClusterExists(t, "google_dataproc_cluster.basic", &cluster),
					// Both configured metrics blocks must be present in state.
					resource.TestCheckResourceAttr("google_dataproc_cluster.basic", "cluster_config.0.dataproc_metric_config.0.metrics.#", "2"),
					// Spot-check the first entry's fields.
					resource.TestCheckResourceAttr("google_dataproc_cluster.basic", "cluster_config.0.dataproc_metric_config.0.metrics.0.metric_source", "HDFS"),
					resource.TestCheckResourceAttr("google_dataproc_cluster.basic", "cluster_config.0.dataproc_metric_config.0.metrics.0.metric_overrides.#", "1"),
				),
			},
		},
	})
}

func TestAccDataprocCluster_withNodeGroupAffinity(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1437,6 +1462,29 @@ resource "google_dataproc_cluster" "basic" {
`, rnd, rnd)
}

// testAccDataprocCluster_withDataprocMetricConfig returns an HCL config for
// a cluster with two OSS metric sources enabled, each with one override.
// rnd is appended to the cluster name to keep test resources unique.
func testAccDataprocCluster_withDataprocMetricConfig(rnd string) string {
	tmpl := `
resource "google_dataproc_cluster" "basic" {
  name   = "tf-test-dproc-%s"
  region = "us-central1"

  cluster_config {
    dataproc_metric_config {
      metrics {
        metric_source = "HDFS"
        metric_overrides = ["yarn:ResourceManager:QueueMetrics:AppsCompleted"]
      }

      metrics {
        metric_source = "SPARK"
        metric_overrides = ["spark:driver:DAGScheduler:job.allJobs"]
      }
    }
  }
}
`
	return fmt.Sprintf(tmpl, rnd)
}

func testAccDataprocCluster_withNodeGroupAffinity(rnd string) string {
return fmt.Sprintf(`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ resource "google_dataproc_cluster" "accelerated_cluster" {
* `endpoint_config` (Optional) The config settings for port access on the cluster.
Structure [defined below](#nested_endpoint_config).

* `dataproc_metric_config` (Optional) The config for Dataproc metric collection. Structure [defined below](#nested_dataproc_metric_config).

* `metastore_config` (Optional) The config setting for metastore service with the cluster.
Structure [defined below](#nested_metastore_config).
- - -
Expand Down Expand Up @@ -792,6 +795,26 @@ cluster_config {

- - -

<a name="nested_dataproc_metric_config"></a>The `dataproc_metric_config` block supports:

```hcl
dataproc_metric_config {
metrics {
metric_source = "HDFS"
metric_overrides = ["yarn:ResourceManager:QueueMetrics:AppsCompleted"]
}
}
```


* `metrics` - (Required) Metrics sources to enable.

* `metric_source` - (Required) A source for the collection of Dataproc OSS metrics (see [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics)).

* `metric_overrides` - (Optional) One or more [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics) to collect for the metric source.

- - -

<a name="nested_lifecycle_config"></a>The `lifecycle_config` block supports:

```hcl
Expand Down

0 comments on commit 1752ead

Please sign in to comment.