Skip to content

Commit

Permalink
Add clusterstats to elasticsearch plugin (#1979)
Browse files Browse the repository at this point in the history
* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* responses to requested changes

* remove unnecessary recommendation
  • Loading branch information
Matt O'Hara authored and sparrc committed Dec 20, 2016
1 parent a90afd9 commit 1392e73
Show file tree
Hide file tree
Showing 6 changed files with 582 additions and 75 deletions.
11 changes: 8 additions & 3 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -784,13 +784,18 @@
# ## Timeout for HTTP requests to the Elasticsearch server(s)
# http_timeout = "5s"
#
# ## set local to false when you want to read the indices stats from all nodes
# ## within the cluster
# ## When local is true (the default), the node will read only its own stats.
# ## Set local to false when you want to read the node stats from all nodes
# ## of the cluster.
# local = true
#
# ## set cluster_health to true when you want to also obtain cluster level stats
# ## set cluster_health to true when you want to also obtain cluster health stats
# cluster_health = false
#
# ## Set cluster_stats to true when you want to also obtain cluster stats.
# ## They are only gathered from a server that is the elected master node.
# cluster_stats = false

# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
Expand Down
14 changes: 10 additions & 4 deletions plugins/inputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain
[node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) stats.
and optionally [cluster-health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html)
or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html) metrics.

### Configuration:

Expand All @@ -14,13 +15,18 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference
## Timeout for HTTP requests to the Elasticsearch server(s)
http_timeout = "5s"
## set local to false when you want to read the indices stats from all nodes
## within the cluster
## When local is true (the default), the node will read only its own stats.
## Set local to false when you want to read the node stats from all nodes
## of the cluster.
local = true
## set cluster_health to true when you want to also obtain cluster level stats
## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false
## Set cluster_stats to true when you want to also obtain cluster stats.
## They are only gathered from a server that is the elected master node.
cluster_stats = false
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand Down
167 changes: 134 additions & 33 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"io/ioutil"
"strings"
)

// Node stats are gathered on every run, so their endpoints are fixed
// constants rather than being assembled at call time.
const (
	// statsPath is the node stats endpoint covering all nodes.
	statsPath = "/_nodes/stats"
	// statsPathLocal is the node stats endpoint restricted to the queried node.
	statsPathLocal = "/_nodes/_local/stats"
	// healthPath is the cluster health endpoint.
	healthPath = "/_cluster/health"
)

type node struct {
type nodeStat struct {
Host string `json:"host"`
Name string `json:"name"`
Attributes map[string]string `json:"attributes"`
Expand Down Expand Up @@ -58,6 +60,20 @@ type indexHealth struct {
UnassignedShards int `json:"unassigned_shards"`
}

// clusterStats is the decoded form of a cluster stats response. The three
// string fields are used as tags; Indices and Nodes are kept as untyped JSON
// trees so they can be flattened into fields without a fixed schema.
type clusterStats struct {
	// NodeName is the "node_name" value of the response.
	NodeName string `json:"node_name"`
	// ClusterName is the "cluster_name" value of the response.
	ClusterName string `json:"cluster_name"`
	// Status is the "status" value of the response.
	Status string `json:"status"`
	// Indices holds the raw "indices" subtree.
	Indices interface{} `json:"indices"`
	// Nodes holds the raw "nodes" subtree.
	Nodes interface{} `json:"nodes"`
}

// catMaster describes the master node as identified by its ID, IP address
// and node name, keyed by the JSON names "id", "ip" and "node".
type catMaster struct {
	// NodeID is decoded from the "id" key.
	NodeID string `json:"id"`
	// NodeIP is decoded from the "ip" key.
	NodeIP string `json:"ip"`
	// NodeName is decoded from the "node" key.
	NodeName string `json:"node"`
}

const sampleConfig = `
## specify a list of one or more Elasticsearch servers
# you can add username and password to your url to use basic authentication:
Expand All @@ -67,13 +83,18 @@ const sampleConfig = `
## Timeout for HTTP requests to the elastic search server(s)
http_timeout = "5s"
## set local to false when you want to read the indices stats from all nodes
## within the cluster
## When local is true (the default), the node will read only its own stats.
## Set local to false when you want to read the node stats from all nodes
## of the cluster.
local = true
## set cluster_health to true when you want to also obtain cluster level stats
## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false
## Set cluster_stats to true when you want to also obtain cluster stats from the
## Master node.
cluster_stats = false
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand All @@ -85,15 +106,18 @@ const sampleConfig = `
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
// servers.
type Elasticsearch struct {
Local bool
Servers []string
HttpTimeout internal.Duration
ClusterHealth bool
SSLCA string `toml:"ssl_ca"` // Path to CA file
SSLCert string `toml:"ssl_cert"` // Path to host cert file
SSLKey string `toml:"ssl_key"` // Path to cert key file
InsecureSkipVerify bool // Use SSL but skip chain & host verification
client *http.Client
Local bool
Servers []string
HttpTimeout internal.Duration
ClusterHealth bool
ClusterStats bool
SSLCA string `toml:"ssl_ca"` // Path to CA file
SSLCert string `toml:"ssl_cert"` // Path to host cert file
SSLKey string `toml:"ssl_key"` // Path to cert key file
InsecureSkipVerify bool // Use SSL but skip chain & host verification
client *http.Client
catMasterResponseTokens []string
isMaster bool
}

// NewElasticsearch return a new instance of Elasticsearch
Expand Down Expand Up @@ -138,12 +162,27 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
} else {
url = s + statsPath
}
e.isMaster = false

if e.ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
e.setCatMaster(s + "/_cat/master")
}

// Always gather node states
if err := e.gatherNodeStats(url, acc); err != nil {
errChan.C <- err
return
}

if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc)
url = s + "/_cluster/health?level=indices"
e.gatherClusterHealth(url, acc)
}

if e.ClusterStats && e.isMaster {
e.gatherClusterStats(s+"/_cluster/stats", acc)
}
}(serv, acc)
}
Expand Down Expand Up @@ -171,12 +210,13 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {

func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*node `json:"nodes"`
ClusterName string `json:"cluster_name"`
Nodes map[string]*nodeStat `json:"nodes"`
}{}
if err := e.gatherData(url, nodeStats); err != nil {
if err := e.gatherJsonData(url, nodeStats); err != nil {
return err
}

for id, n := range nodeStats.Nodes {
tags := map[string]string{
"node_id": id,
Expand All @@ -185,6 +225,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
"cluster_name": nodeStats.ClusterName,
}

if e.ClusterStats {
// check for master
e.isMaster = (id == e.catMasterResponseTokens[0])
}

for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
Expand All @@ -204,6 +249,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
now := time.Now()
for p, s := range stats {
f := jsonparser.JSONFlattener{}
// parse Json, ignoring strings and bools
err := f.FlattenJSON("", s)
if err != nil {
return err
Expand All @@ -214,31 +260,31 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
return nil
}

func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
clusterStats := &clusterHealth{}
if err := e.gatherData(url, clusterStats); err != nil {
func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator) error {
healthStats := &clusterHealth{}
if err := e.gatherJsonData(url, healthStats); err != nil {
return err
}
measurementTime := time.Now()
clusterFields := map[string]interface{}{
"status": clusterStats.Status,
"timed_out": clusterStats.TimedOut,
"number_of_nodes": clusterStats.NumberOfNodes,
"number_of_data_nodes": clusterStats.NumberOfDataNodes,
"active_primary_shards": clusterStats.ActivePrimaryShards,
"active_shards": clusterStats.ActiveShards,
"relocating_shards": clusterStats.RelocatingShards,
"initializing_shards": clusterStats.InitializingShards,
"unassigned_shards": clusterStats.UnassignedShards,
"status": healthStats.Status,
"timed_out": healthStats.TimedOut,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"active_primary_shards": healthStats.ActivePrimaryShards,
"active_shards": healthStats.ActiveShards,
"relocating_shards": healthStats.RelocatingShards,
"initializing_shards": healthStats.InitializingShards,
"unassigned_shards": healthStats.UnassignedShards,
}
acc.AddFields(
"elasticsearch_cluster_health",
clusterFields,
map[string]string{"name": clusterStats.ClusterName},
map[string]string{"name": healthStats.ClusterName},
measurementTime,
)

for name, health := range clusterStats.Indices {
for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
"status": health.Status,
"number_of_shards": health.NumberOfShards,
Expand All @@ -259,7 +305,60 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
return nil
}

func (e *Elasticsearch) gatherData(url string, v interface{}) error {
// gatherClusterStats fetches the cluster stats document from url and reports
// its "nodes" and "indices" sections as two measurements named
// elasticsearch_clusterstats_nodes and elasticsearch_clusterstats_indices.
// Both measurements share the same timestamp and the node_name, cluster_name
// and status tags. It returns the first fetch or flattening error.
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
	var cs clusterStats
	if err := e.gatherJsonData(url, &cs); err != nil {
		return err
	}

	// One timestamp for both measurements so they line up downstream.
	timestamp := time.Now()
	tags := map[string]string{
		"node_name":    cs.NodeName,
		"cluster_name": cs.ClusterName,
		"status":       cs.Status,
	}

	sections := map[string]interface{}{
		"nodes":   cs.Nodes,
		"indices": cs.Indices,
	}
	for section, payload := range sections {
		flattener := jsonparser.JSONFlattener{}
		// Unlike node stats, string and bool values are kept as fields here.
		if err := flattener.FullFlattenJSON("", payload, true, true); err != nil {
			return err
		}
		acc.AddFields("elasticsearch_clusterstats_"+section, flattener.Fields, tags, timestamp)
	}

	return nil
}

// setCatMaster requests url (the _cat/master endpoint of a server) and stores
// the response body, split on single spaces, in e.catMasterResponseTokens.
// The first token is what gets compared against node IDs to decide whether a
// node is the master.
//
// The stored tokens are reset to a single empty string before the request is
// made, so that when this function returns an error the slice is still safe
// to index at 0 and cannot match any node ID.
func (e *Elasticsearch) setCatMaster(url string) error {
	// Callers may ignore the returned error and still read element 0. Without
	// this reset, a failed first request would leave the slice nil (causing an
	// index-out-of-range panic) and a later failure would leave the master ID
	// of a previous run in place.
	e.catMasterResponseTokens = []string{""}

	r, err := e.client.Get(url)
	if err != nil {
		return err
	}
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
		// to let the underlying transport close the connection and re-establish a new one for
		// future calls.
		return fmt.Errorf("status-code %d, expected %d", r.StatusCode, http.StatusOK)
	}

	response, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return err
	}

	// strings.Split with a non-empty separator always yields at least one
	// element, so element 0 exists even for an empty body.
	e.catMasterResponseTokens = strings.Split(string(response), " ")

	return nil
}

func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
r, err := e.client.Get(url)
if err != nil {
return err
Expand All @@ -272,9 +371,11 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
r.StatusCode, http.StatusOK)
}

if err = json.NewDecoder(r.Body).Decode(v); err != nil {
return err
}

return nil
}

Expand Down
Loading

0 comments on commit 1392e73

Please sign in to comment.