From 1392e73125a3d08869ab16379f0ebb4bc12ca365 Mon Sep 17 00:00:00 2001 From: Matt O'Hara Date: Tue, 20 Dec 2016 10:30:03 -0600 Subject: [PATCH] Add clusterstats to elasticsearch plugin (#1979) * add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * responses to requested changes * remove unnecessary recommendation --- etc/telegraf.conf | 11 +- plugins/inputs/elasticsearch/README.md | 14 +- plugins/inputs/elasticsearch/elasticsearch.go | 167 ++++++++-- .../elasticsearch/elasticsearch_test.go | 140 +++++++-- plugins/inputs/elasticsearch/testdata_test.go | 292 +++++++++++++++++- plugins/parsers/json/parser.go | 33 +- 6 files changed, 582 insertions(+), 75 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index a7b903388f69b..a6058434cf648 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -784,13 +784,18 @@ # ## Timeout for HTTP requests to the elastic search server(s) # http_timeout = "5s" # -# ## set local to false when you want to read the indices stats from all nodes -# ## within the cluster +# ## When local is true (the default), the node will read only its own stats. +# ## Set local to false when you want to read the node stats from all nodes +# ## of the cluster. # local = true # -# ## set cluster_health to true when you want to also obtain cluster level stats +# ## set cluster_health to true when you want to also obtain cluster health stats # cluster_health = false # +# ## Set cluster_stats to true when you want to obtain cluster stats from the +# ## Master node. +# cluster_stats = false + # ## Optional SSL Config # # ssl_ca = "/etc/telegraf/ca.pem" # # ssl_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/inputs/elasticsearch/README.md b/plugins/inputs/elasticsearch/README.md index 2cf6f4d7734f6..9cf9b9b09e98e 100644 --- a/plugins/inputs/elasticsearch/README.md +++ b/plugins/inputs/elasticsearch/README.md @@ -2,7 +2,8 @@ The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain [node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html) -and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) stats. +and optionally [cluster-health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) +or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html) metrics. ### Configuration: @@ -14,13 +15,18 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference ## Timeout for HTTP requests to the elastic search server(s) http_timeout = "5s" - ## set local to false when you want to read the indices stats from all nodes - ## within the cluster + ## When local is true (the default), the node will read only its own stats. + ## Set local to false when you want to read the node stats from all nodes + ## of the cluster. local = true - ## set cluster_health to true when you want to also obtain cluster level stats + ## Set cluster_health to true when you want to also obtain cluster health stats cluster_health = false + ## Set cluster_stats to true when you want to obtain cluster stats from the + ## Master node. + cluster_stats = false + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index cce3d94ff49ad..5d5d64909a0ef 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -12,13 +12,15 @@ import ( "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" + "io/ioutil" + "strings" ) +// Nodestats are always generated, so simply define a constant for these endpoints const statsPath = "/_nodes/stats" const statsPathLocal = "/_nodes/_local/stats" -const healthPath = "/_cluster/health" -type node struct { +type nodeStat struct { Host string `json:"host"` Name string `json:"name"` Attributes map[string]string `json:"attributes"` @@ -58,6 +60,20 @@ type indexHealth struct { UnassignedShards int `json:"unassigned_shards"` } +type clusterStats struct { + NodeName string `json:"node_name"` + ClusterName string `json:"cluster_name"` + Status string `json:"status"` + Indices interface{} `json:"indices"` + Nodes interface{} `json:"nodes"` +} + +type catMaster struct { + NodeID string `json:"id"` + NodeIP string `json:"ip"` + NodeName string `json:"node"` +} + const sampleConfig = ` ## specify a list of one or more Elasticsearch servers # you can add username and password to your url to use basic authentication: @@ -67,13 +83,18 @@ const sampleConfig = ` ## Timeout for HTTP requests to the elastic search server(s) http_timeout = "5s" - ## set local to false when you want to read the indices stats from all nodes - ## within the cluster + ## When local is true (the default), the node will read only its own stats. + ## Set local to false when you want to read the node stats from all nodes + ## of the cluster. local = true - ## set cluster_health to true when you want to also obtain cluster level stats + ## Set cluster_health to true when you want to also obtain cluster health stats cluster_health = false + ## Set cluster_stats to true when you want to also obtain cluster stats from the + ## Master node. + cluster_stats = false + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -85,15 +106,18 @@ const sampleConfig = ` // Elasticsearch is a plugin to read stats from one or many Elasticsearch // servers. type Elasticsearch struct { - Local bool - Servers []string - HttpTimeout internal.Duration - ClusterHealth bool - SSLCA string `toml:"ssl_ca"` // Path to CA file - SSLCert string `toml:"ssl_cert"` // Path to host cert file - SSLKey string `toml:"ssl_key"` // Path to cert key file - InsecureSkipVerify bool // Use SSL but skip chain & host verification - client *http.Client + Local bool + Servers []string + HttpTimeout internal.Duration + ClusterHealth bool + ClusterStats bool + SSLCA string `toml:"ssl_ca"` // Path to CA file + SSLCert string `toml:"ssl_cert"` // Path to host cert file + SSLKey string `toml:"ssl_key"` // Path to cert key file + InsecureSkipVerify bool // Use SSL but skip chain & host verification + client *http.Client + catMasterResponseTokens []string + isMaster bool } // NewElasticsearch return a new instance of Elasticsearch @@ -138,12 +162,27 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } else { url = s + statsPath } + e.isMaster = false + + if e.ClusterStats { + // get cat/master information here so NodeStats can determine + // whether this node is the Master + e.setCatMaster(s + "/_cat/master") + } + + // Always gather node states if err := e.gatherNodeStats(url, acc); err != nil { errChan.C <- err return } + if e.ClusterHealth { - e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc) + url = s + "/_cluster/health?level=indices" + e.gatherClusterHealth(url, acc) + } + + if e.ClusterStats && e.isMaster { + e.gatherClusterStats(s+"/_cluster/stats", acc) } }(serv, acc) } @@ -171,12 +210,13 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) { func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { nodeStats := &struct { - ClusterName string `json:"cluster_name"` - Nodes map[string]*node `json:"nodes"` + ClusterName string `json:"cluster_name"` + Nodes map[string]*nodeStat `json:"nodes"` }{} - if err := e.gatherData(url, nodeStats); err != nil { + if err := e.gatherJsonData(url, nodeStats); err != nil { return err } + for id, n := range nodeStats.Nodes { tags := map[string]string{ "node_id": id, @@ -185,6 +225,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er "cluster_name": nodeStats.ClusterName, } + if e.ClusterStats { + // check for master + e.isMaster = (id == e.catMasterResponseTokens[0]) + } + for k, v := range n.Attributes { tags["node_attribute_"+k] = v } @@ -204,6 +249,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er now := time.Now() for p, s := range stats { f := jsonparser.JSONFlattener{} + // parse Json, ignoring strings and bools err := f.FlattenJSON("", s) if err != nil { return err @@ -214,31 +260,31 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er return nil } -func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error { - clusterStats := &clusterHealth{} - if err := e.gatherData(url, clusterStats); err != nil { +func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator) error { + healthStats := &clusterHealth{} + if err := e.gatherJsonData(url, healthStats); err != nil { return err } measurementTime := time.Now() clusterFields := map[string]interface{}{ - "status": clusterStats.Status, - "timed_out": clusterStats.TimedOut, - "number_of_nodes": clusterStats.NumberOfNodes, - "number_of_data_nodes": clusterStats.NumberOfDataNodes, - "active_primary_shards": clusterStats.ActivePrimaryShards, - "active_shards": clusterStats.ActiveShards, - "relocating_shards": clusterStats.RelocatingShards, - "initializing_shards": clusterStats.InitializingShards, - "unassigned_shards": clusterStats.UnassignedShards, + "status": healthStats.Status, + "timed_out": healthStats.TimedOut, + "number_of_nodes": healthStats.NumberOfNodes, + "number_of_data_nodes": healthStats.NumberOfDataNodes, + "active_primary_shards": healthStats.ActivePrimaryShards, + "active_shards": healthStats.ActiveShards, + "relocating_shards": healthStats.RelocatingShards, + "initializing_shards": healthStats.InitializingShards, + "unassigned_shards": healthStats.UnassignedShards, } acc.AddFields( "elasticsearch_cluster_health", clusterFields, - map[string]string{"name": clusterStats.ClusterName}, + map[string]string{"name": healthStats.ClusterName}, measurementTime, ) - for name, health := range clusterStats.Indices { + for name, health := range healthStats.Indices { indexFields := map[string]interface{}{ "status": health.Status, "number_of_shards": health.NumberOfShards, @@ -259,7 +305,60 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) return nil } -func (e *Elasticsearch) gatherData(url string, v interface{}) error { +func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error { + clusterStats := &clusterStats{} + if err := e.gatherJsonData(url, clusterStats); err != nil { + return err + } + now := time.Now() + tags := map[string]string{ + "node_name": clusterStats.NodeName, + "cluster_name": clusterStats.ClusterName, + "status": clusterStats.Status, + } + + stats := map[string]interface{}{ + "nodes": clusterStats.Nodes, + "indices": clusterStats.Indices, + } + + for p, s := range stats { + f := jsonparser.JSONFlattener{} + // parse json, including bools and strings + err := f.FullFlattenJSON("", s, true, true) + if err != nil { + return err + } + acc.AddFields("elasticsearch_clusterstats_"+p, f.Fields, tags, now) + } + + return nil +} + +func (e *Elasticsearch) setCatMaster(url string) error { + r, err := e.client.Get(url) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + // NOTE: we are not going to read/discard r.Body under the assumption we'd prefer + // to let the underlying transport close the connection and re-establish a new one for + // future calls. + return fmt.Errorf("status-code %d, expected %d", r.StatusCode, http.StatusOK) + } + response, err := ioutil.ReadAll(r.Body) + + if err != nil { + return err + } + + e.catMasterResponseTokens = strings.Split(string(response), " ") + + return nil +} + +func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error { r, err := e.client.Get(url) if err != nil { return err @@ -272,9 +371,11 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error { return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) } + if err = json.NewDecoder(r.Body).Decode(v); err != nil { return err } + return nil } diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index 760ac921beecc..59caa4306b1e9 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -8,6 +8,8 @@ import ( "github.com/influxdata/telegraf/testutil" + "fmt" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -37,43 +39,70 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) { func (t *transportMock) CancelRequest(_ *http.Request) { } -func TestElasticsearch(t *testing.T) { +func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) { + if es.isMaster != expected { + msg := fmt.Sprintf("IsMaster set incorrectly") + assert.Fail(t, msg) + } +} +func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) { + tags := map[string]string{ + "cluster_name": "es-testcluster", + "node_attribute_master": "true", + "node_id": "SDFsfSDFsdfFSDSDfSFDSDF", + "node_name": "test.host.com", + "node_host": "test", + } + + acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags) +} + +func TestGather(t *testing.T) { es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} - es.client.Transport = newTransportMock(http.StatusOK, statsResponse) + es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse) var acc testutil.Accumulator if err := es.Gather(&acc); err != nil { t.Fatal(err) } - tags := map[string]string{ - "cluster_name": "es-testcluster", - "node_attribute_master": "true", - "node_id": "SDFsfSDFsdfFSDSDfSFDSDF", - "node_name": "test.host.com", - "node_host": "test", + checkIsMaster(es, false, t) + checkNodeStatsResult(t, &acc) +} + +func TestGatherNodeStats(t *testing.T) { + es := newElasticsearchWithClient() + es.Servers = []string{"http://example.com:9200"} + es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse) + + var acc testutil.Accumulator + if err := es.gatherNodeStats("junk", &acc); err != nil { + t.Fatal(err) } - acc.AssertContainsTaggedFields(t, "elasticsearch_indices", indicesExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_os", osExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_process", processExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", jvmExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", threadPoolExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_fs", fsExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_transport", transportExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_http", httpExpected, tags) - acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", breakersExpected, tags) + checkIsMaster(es, false, t) + checkNodeStatsResult(t, &acc) } -func TestGatherClusterStats(t *testing.T) { +func TestGatherClusterHealth(t *testing.T) { es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} es.ClusterHealth = true - es.client.Transport = newTransportMock(http.StatusOK, clusterResponse) + es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse) var acc testutil.Accumulator - require.NoError(t, es.Gather(&acc)) + require.NoError(t, es.gatherClusterHealth("junk", &acc)) + + checkIsMaster(es, false, t) acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, @@ -88,6 +117,77 @@ func TestGatherClusterStats(t *testing.T) { map[string]string{"index": "v2"}) } +func TestGatherClusterStatsMaster(t *testing.T) { + // This needs multiple steps to replicate the multiple calls internally. + es := newElasticsearchWithClient() + es.ClusterStats = true + es.Servers = []string{"http://example.com:9200"} + + // first get catMaster + es.client.Transport = newTransportMock(http.StatusOK, IsMasterResult) + require.NoError(t, es.setCatMaster("junk")) + + IsMasterResultTokens := strings.Split(string(IsMasterResult), " ") + if es.catMasterResponseTokens[0] != IsMasterResultTokens[0] { + msg := fmt.Sprintf("catmaster is incorrect") + assert.Fail(t, msg) + } + + // now get node status, which determines whether we're master + var acc testutil.Accumulator + es.Local = true + es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse) + if err := es.gatherNodeStats("junk", &acc); err != nil { + t.Fatal(err) + } + + checkIsMaster(es, true, t) + checkNodeStatsResult(t, &acc) + + // now test the clusterstats method + es.client.Transport = newTransportMock(http.StatusOK, clusterStatsResponse) + require.NoError(t, es.gatherClusterStats("junk", &acc)) + + tags := map[string]string{ + "cluster_name": "es-testcluster", + "node_name": "test.host.com", + "status": "red", + } + + acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_nodes", clusterstatsNodesExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_indices", clusterstatsIndicesExpected, tags) +} + +func TestGatherClusterStatsNonMaster(t *testing.T) { + // This needs multiple steps to replicate the multiple calls internally. + es := newElasticsearchWithClient() + es.ClusterStats = true + es.Servers = []string{"http://example.com:9200"} + + // first get catMaster + es.client.Transport = newTransportMock(http.StatusOK, IsNotMasterResult) + require.NoError(t, es.setCatMaster("junk")) + + IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ") + if es.catMasterResponseTokens[0] != IsNotMasterResultTokens[0] { + msg := fmt.Sprintf("catmaster is incorrect") + assert.Fail(t, msg) + } + + // now get node status, which determines whether we're master + var acc testutil.Accumulator + es.Local = true + es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse) + if err := es.gatherNodeStats("junk", &acc); err != nil { + t.Fatal(err) + } + + // ensure flag is clear so Cluster Stats would not be done + checkIsMaster(es, false, t) + checkNodeStatsResult(t, &acc) + +} + func newElasticsearchWithClient() *Elasticsearch { es := NewElasticsearch() es.client = &http.Client{} diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go index bca1f9e45beff..19ebb3bfb48e8 100644 --- a/plugins/inputs/elasticsearch/testdata_test.go +++ b/plugins/inputs/elasticsearch/testdata_test.go @@ -1,6 +1,6 @@ package elasticsearch -const clusterResponse = ` +const clusterHealthResponse = ` { "cluster_name": "elasticsearch_telegraf", "status": "green", @@ -71,7 +71,7 @@ var v2IndexExpected = map[string]interface{}{ "unassigned_shards": 20, } -const statsResponse = ` +const nodeStatsResponse = ` { "cluster_name": "es-testcluster", "nodes": { @@ -489,7 +489,7 @@ const statsResponse = ` } ` -var indicesExpected = map[string]interface{}{ +var nodestatsIndicesExpected = map[string]interface{}{ "id_cache_memory_size_in_bytes": float64(0), "completion_size_in_bytes": float64(0), "suggest_total": float64(0), @@ -561,7 +561,7 @@ var indicesExpected = map[string]interface{}{ "segments_fixed_bit_set_memory_in_bytes": float64(0), } -var osExpected = map[string]interface{}{ +var nodestatsOsExpected = map[string]interface{}{ "load_average_0": float64(0.01), "load_average_1": float64(0.04), "load_average_2": float64(0.05), @@ -576,7 +576,7 @@ var osExpected = map[string]interface{}{ "mem_used_in_bytes": float64(1621868544), } -var processExpected = map[string]interface{}{ +var nodestatsProcessExpected = map[string]interface{}{ "mem_total_virtual_in_bytes": float64(4747890688), "timestamp": float64(1436460392945), "open_file_descriptors": float64(160), @@ -586,7 +586,7 @@ var processExpected = map[string]interface{}{ "cpu_user_in_millis": float64(13610), } -var jvmExpected = map[string]interface{}{ +var nodestatsJvmExpected = map[string]interface{}{ "timestamp": float64(1436460392945), "uptime_in_millis": float64(202245), "mem_non_heap_used_in_bytes": float64(39634576), @@ -621,7 +621,7 @@ var jvmExpected = map[string]interface{}{ "buffer_pools_mapped_total_capacity_in_bytes": float64(0), } -var threadPoolExpected = map[string]interface{}{ +var nodestatsThreadPoolExpected = map[string]interface{}{ "merge_threads": float64(6), "merge_queue": float64(4), "merge_active": float64(5), @@ -726,7 +726,7 @@ var threadPoolExpected = map[string]interface{}{ "flush_completed": float64(3), } -var fsExpected = map[string]interface{}{ +var nodestatsFsExpected = map[string]interface{}{ "data_0_total_in_bytes": float64(19507089408), "data_0_free_in_bytes": float64(16909316096), "data_0_available_in_bytes": float64(15894814720), @@ -736,7 +736,7 @@ var fsExpected = map[string]interface{}{ "total_total_in_bytes": float64(19507089408), } -var transportExpected = map[string]interface{}{ +var nodestatsTransportExpected = map[string]interface{}{ "server_open": float64(13), "rx_count": float64(6), "rx_size_in_bytes": float64(1380), @@ -744,12 +744,12 @@ var transportExpected = map[string]interface{}{ "tx_size_in_bytes": float64(1380), } -var httpExpected = map[string]interface{}{ +var nodestatsHttpExpected = map[string]interface{}{ "current_open": float64(3), "total_opened": float64(3), } -var breakersExpected = map[string]interface{}{ +var nodestatsBreakersExpected = map[string]interface{}{ "fielddata_estimated_size_in_bytes": float64(0), "fielddata_overhead": float64(1.03), "fielddata_tripped": float64(0), @@ -763,3 +763,273 @@ var breakersExpected = map[string]interface{}{ "parent_limit_size_in_bytes": float64(727213670), "parent_estimated_size_in_bytes": float64(0), } + +const clusterStatsResponse = ` +{ + "host":"ip-10-0-1-214", + "log_type":"metrics", + "timestamp":1475767451229, + "log_level":"INFO", + "node_name":"test.host.com", + "cluster_name":"es-testcluster", + "status":"red", + "indices":{ + "count":1, + "shards":{ + "total":4, + "primaries":4, + "replication":0.0, + "index":{ + "shards":{ + "min":4, + "max":4, + "avg":4.0 + }, + "primaries":{ + "min":4, + "max":4, + "avg":4.0 + }, + "replication":{ + "min":0.0, + "max":0.0, + "avg":0.0 + } + } + }, + "docs":{ + "count":4, + "deleted":0 + }, + "store":{ + "size_in_bytes":17084, + "throttle_time_in_millis":0 + }, + "fielddata":{ + "memory_size_in_bytes":0, + "evictions":0 + }, + "query_cache":{ + "memory_size_in_bytes":0, + "total_count":0, + "hit_count":0, + "miss_count":0, + "cache_size":0, + "cache_count":0, + "evictions":0 + }, + "completion":{ + "size_in_bytes":0 + }, + "segments":{ + "count":4, + "memory_in_bytes":11828, + "terms_memory_in_bytes":8932, + "stored_fields_memory_in_bytes":1248, + "term_vectors_memory_in_bytes":0, + "norms_memory_in_bytes":1280, + "doc_values_memory_in_bytes":368, + "index_writer_memory_in_bytes":0, + "index_writer_max_memory_in_bytes":2048000, + "version_map_memory_in_bytes":0, + "fixed_bit_set_memory_in_bytes":0 + }, + "percolate":{ + "total":0, + "time_in_millis":0, + "current":0, + "memory_size_in_bytes":-1, + "memory_size":"-1b", + "queries":0 + } + }, + "nodes":{ + "count":{ + "total":1, + "master_only":0, + "data_only":0, + "master_data":1, + "client":0 + }, + "versions":[ + { + "version": "2.3.3" + } + ], + "os":{ + "available_processors":1, + "allocated_processors":1, + "mem":{ + "total_in_bytes":593301504 + }, + "names":[ + { + "name":"Linux", + "count":1 + } + ] + }, + "process":{ + "cpu":{ + "percent":0 + }, + "open_file_descriptors":{ + "min":145, + "max":145, + "avg":145 + } + }, + "jvm":{ + "max_uptime_in_millis":11580527, + "versions":[ + { + "version":"1.8.0_101", + "vm_name":"OpenJDK 64-Bit Server VM", + "vm_version":"25.101-b13", + "vm_vendor":"Oracle Corporation", + "count":1 + } + ], + "mem":{ + "heap_used_in_bytes":70550288, + "heap_max_in_bytes":1065025536 + }, + "threads":30 + }, + "fs":{ + "total_in_bytes":8318783488, + "free_in_bytes":6447439872, + "available_in_bytes":6344785920 + }, + "plugins":[ + { + "name":"cloud-aws", + "version":"2.3.3", + "description":"The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.", + "jvm":true, + "classname":"org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin", + "isolated":true, + "site":false + }, + { + "name":"kopf", + "version":"2.0.1", + "description":"kopf - simple web administration tool for Elasticsearch", + "url":"/_plugin/kopf/", + "jvm":false, + "site":true + }, + { + "name":"tr-metrics", + "version":"7bd5b4b", + "description":"Logs cluster and node stats for performance monitoring.", + "jvm":true, + "classname":"com.trgr.elasticsearch.plugin.metrics.MetricsPlugin", + "isolated":true, + "site":false + } + ] + } +} +` + +var clusterstatsIndicesExpected = map[string]interface{}{ + "completion_size_in_bytes": float64(0), + "count": float64(1), + "docs_count": float64(4), + "docs_deleted": float64(0), + "fielddata_evictions": float64(0), + "fielddata_memory_size_in_bytes": float64(0), + "percolate_current": float64(0), + "percolate_memory_size_in_bytes": float64(-1), + "percolate_queries": float64(0), + "percolate_time_in_millis": float64(0), + "percolate_total": float64(0), + "percolate_memory_size": "-1b", + "query_cache_cache_count": float64(0), + "query_cache_cache_size": float64(0), + "query_cache_evictions": float64(0), + "query_cache_hit_count": float64(0), + "query_cache_memory_size_in_bytes": float64(0), + "query_cache_miss_count": float64(0), + "query_cache_total_count": float64(0), + "segments_count": float64(4), + "segments_doc_values_memory_in_bytes": float64(368), + "segments_fixed_bit_set_memory_in_bytes": float64(0), + "segments_index_writer_max_memory_in_bytes": float64(2.048e+06), + "segments_index_writer_memory_in_bytes": float64(0), + "segments_memory_in_bytes": float64(11828), + "segments_norms_memory_in_bytes": float64(1280), + "segments_stored_fields_memory_in_bytes": float64(1248), + "segments_term_vectors_memory_in_bytes": float64(0), + "segments_terms_memory_in_bytes": float64(8932), + "segments_version_map_memory_in_bytes": float64(0), + "shards_index_primaries_avg": float64(4), + "shards_index_primaries_max": float64(4), + "shards_index_primaries_min": float64(4), + "shards_index_replication_avg": float64(0), + "shards_index_replication_max": float64(0), + "shards_index_replication_min": float64(0), + "shards_index_shards_avg": float64(4), + "shards_index_shards_max": float64(4), + "shards_index_shards_min": float64(4), + "shards_primaries": float64(4), + "shards_replication": float64(0), + "shards_total": float64(4), + "store_size_in_bytes": float64(17084), + "store_throttle_time_in_millis": float64(0), +} + +var clusterstatsNodesExpected = map[string]interface{}{ + "count_client": float64(0), + "count_data_only": float64(0), + "count_master_data": float64(1), + "count_master_only": float64(0), + "count_total": float64(1), + "fs_available_in_bytes": float64(6.34478592e+09), + "fs_free_in_bytes": float64(6.447439872e+09), + "fs_total_in_bytes": float64(8.318783488e+09), + "jvm_max_uptime_in_millis": float64(1.1580527e+07), + "jvm_mem_heap_max_in_bytes": float64(1.065025536e+09), + "jvm_mem_heap_used_in_bytes": float64(7.0550288e+07), + "jvm_threads": float64(30), + "jvm_versions_0_count": float64(1), + "jvm_versions_0_version": "1.8.0_101", + "jvm_versions_0_vm_name": "OpenJDK 64-Bit Server VM", + "jvm_versions_0_vm_vendor": "Oracle Corporation", + "jvm_versions_0_vm_version": "25.101-b13", + "os_allocated_processors": float64(1), + "os_available_processors": float64(1), + "os_mem_total_in_bytes": float64(5.93301504e+08), + "os_names_0_count": float64(1), + "os_names_0_name": "Linux", + "process_cpu_percent": float64(0), + "process_open_file_descriptors_avg": float64(145), + "process_open_file_descriptors_max": float64(145), + "process_open_file_descriptors_min": float64(145), + "versions_0_version": "2.3.3", + "plugins_0_classname": "org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin", + "plugins_0_description": "The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.", + "plugins_0_isolated": true, + "plugins_0_jvm": true, + "plugins_0_name": "cloud-aws", + "plugins_0_site": false, + "plugins_0_version": "2.3.3", + "plugins_1_description": "kopf - simple web administration tool for Elasticsearch", + "plugins_1_jvm": false, + "plugins_1_name": "kopf", + "plugins_1_site": true, + "plugins_1_url": "/_plugin/kopf/", + "plugins_1_version": "2.0.1", + "plugins_2_classname": "com.trgr.elasticsearch.plugin.metrics.MetricsPlugin", + "plugins_2_description": "Logs cluster and node stats for performance monitoring.", + "plugins_2_isolated": true, + "plugins_2_jvm": true, + "plugins_2_name": "tr-metrics", + "plugins_2_site": false, + "plugins_2_version": "7bd5b4b", +} + +const IsMasterResult = "SDFsfSDFsdfFSDSDfSFDSDF 10.206.124.66 10.206.124.66 test.host.com " + +const IsNotMasterResult = "junk 10.206.124.66 10.206.124.66 test.junk.com " diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index edd5afc546a25..a8743558d43c6 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -103,10 +103,22 @@ type JSONFlattener struct { Fields map[string]interface{} } -// FlattenJSON flattens nested maps/interfaces into a fields map +// FlattenJSON flattens nested maps/interfaces into a fields map (ignoring bools and string) func (f *JSONFlattener) FlattenJSON( + fieldname string, + v interface{}) error { + if f.Fields == nil { + f.Fields = make(map[string]interface{}) + } + return f.FullFlattenJSON(fieldname, v, false, false) +} + +// FullFlattenJSON flattens nested maps/interfaces into a fields map (including bools and string) +func (f *JSONFlattener) FullFlattenJSON( fieldname string, v interface{}, + convertString bool, + convertBool bool, ) error { if f.Fields == nil { f.Fields = make(map[string]interface{}) @@ -115,7 +127,7 @@ func (f *JSONFlattener) FlattenJSON( switch t := v.(type) { case map[string]interface{}: for k, v := range t { - err := f.FlattenJSON(fieldname+"_"+k+"_", v) + err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool) if err != nil { return err } @@ -123,15 +135,28 @@ func (f *JSONFlattener) FlattenJSON( case []interface{}: for i, v := range t { k := strconv.Itoa(i) - err := f.FlattenJSON(fieldname+"_"+k+"_", v) + err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool) if err != nil { return nil } } case float64: f.Fields[fieldname] = t - case bool, string, nil: + case string: + if convertString { + f.Fields[fieldname] = v.(string) + } else { + return nil + } + case bool: + if convertBool { + f.Fields[fieldname] = v.(bool) + } else { + return nil + } + case nil: // ignored types + fmt.Println("json parser ignoring " + fieldname) return nil default: return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",