diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 88aa5be4941f2..c826fd0e015ab 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -58,6 +58,19 @@ in Prometheus format. # field selector to target pods # eg. To scrape pods on a specific node # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" + + ## Scrape Services available in Consul Catalog + # [inputs.prometheus.consul] + # enabled = true + # agent = "http://localhost:8500" + # query_interval = "5m" + + # [[inputs.prometheus.consul.query]] + # name = "a service name" + # tag = "a service tag" + # url = 'http://{{if ne .ServiceAddress ""}}{{.ServiceAddress}}{{else}}{{.Address}}{{end}}:{{.ServicePort}}/{{with .ServiceMeta.metrics_path}}{{.}}{{else}}metrics{{end}}' + # [inputs.prometheus.consul.query.tags] + # host = "{{.Node}}" ## Use bearer token for authorization. ('bearer_token' takes priority) # bearer_token = "/path/to/bearer/token" @@ -117,6 +130,26 @@ env: If using node level scrape scope, `pod_scrape_interval` specifies how often (in seconds) the pod list for scraping should updated. If not specified, the default is 60 seconds. +#### Consul Service Discovery + +Enabling this option and configuring consul `agent` url will allow the plugin to query +consul catalog for available services. Using `query_interval` the plugin will periodically +query the consul catalog for services with `name` and `tag` and refresh the list of scraped urls. +It can use the information from the catalog to build the scraped url and additional tags from a template. + +Multiple consul queries can be configured, each for different service. +The following example fields can be used in url or tag templates: +* Node +* Address +* NodeMeta +* ServicePort +* ServiceAddress +* ServiceTags +* ServiceMeta + +For full list of available fields and their type see struct CatalogService in +https://github.com/hashicorp/consul/blob/master/api/catalog.go + #### Bearer Token If set, the file specified by the `bearer_token` parameter will be read on diff --git a/plugins/inputs/prometheus/consul.go b/plugins/inputs/prometheus/consul.go new file mode 100644 index 0000000000000..2f008a495c09b --- /dev/null +++ b/plugins/inputs/prometheus/consul.go @@ -0,0 +1,208 @@ +package prometheus + +import ( + "bytes" + "context" + "fmt" + "net/url" + "strings" + "text/template" + "time" + + "github.com/hashicorp/consul/api" + "github.com/influxdata/telegraf/config" +) + +type ConsulConfig struct { + // Address of the Consul agent. The address must contain a hostname or an IP address + // and optionally a port (format: "host:port"). + Enabled bool `toml:"enabled"` + Agent string `toml:"agent"` + QueryInterval config.Duration `toml:"query_interval"` + Queries []*ConsulQuery `toml:"query"` +} + +// One Consul service discovery query +type ConsulQuery struct { + // A name of the searched services (not ID) + ServiceName string `toml:"name"` + + // A tag of the searched services + ServiceTag string `toml:"tag"` + + // A DC of the searched services + ServiceDc string `toml:"dc"` + + // A template URL of the Prometheus gathering interface. The hostname part + // of the URL will be replaced by discovered address and port. + ServiceURL string `toml:"url"` + + // Extra tags to add to metrics found in Consul + ServiceExtraTags map[string]string `toml:"tags"` + + serviceURLTemplate *template.Template + serviceExtraTagsTemplate map[string]*template.Template + + // Store last error status and change log level depending on repeated occurence + lastQueryFailed bool +} + +func (p *Prometheus) startConsul(ctx context.Context) error { + consulAPIConfig := api.DefaultConfig() + if p.ConsulConfig.Agent != "" { + consulAPIConfig.Address = p.ConsulConfig.Agent + } + + consul, err := api.NewClient(consulAPIConfig) + if err != nil { + return fmt.Errorf("cannot connect to the Consul agent: %v", err) + } + + // Parse the template for metrics URL, drop queries with template parse errors + i := 0 + for _, q := range p.ConsulConfig.Queries { + serviceURLTemplate, err := template.New("URL").Parse(q.ServiceURL) + if err != nil { + p.Log.Errorf("Could not parse the Consul query URL template (%s), skipping it. Error: %s", q.ServiceURL, err) + continue + } + q.serviceURLTemplate = serviceURLTemplate + + // Allow to use join function in tags + templateFunctions := template.FuncMap{"join": strings.Join} + // Parse the tag value templates + q.serviceExtraTagsTemplate = make(map[string]*template.Template) + for tagName, tagTemplateString := range q.ServiceExtraTags { + tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString) + if err != nil { + p.Log.Errorf("Could not parse the Consul query Extra Tag template (%s), skipping it. Error: %s", tagTemplateString, err) + continue + } + q.serviceExtraTagsTemplate[tagName] = tagTemplate + } + p.ConsulConfig.Queries[i] = q + i++ + } + // Prevent memory leak by erasing truncated values + for j := i; j < len(p.ConsulConfig.Queries); j++ { + p.ConsulConfig.Queries[j] = nil + } + p.ConsulConfig.Queries = p.ConsulConfig.Queries[:i] + + catalog := consul.Catalog() + + p.wg.Add(1) + go func() { + // Store last error status and change log level depending on repeated occurence + var refreshFailed = false + defer p.wg.Done() + err := p.refreshConsulServices(catalog) + if err != nil { + refreshFailed = true + p.Log.Errorf("Unable to refreh Consul services: %v", err) + } + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(p.ConsulConfig.QueryInterval)): + err := p.refreshConsulServices(catalog) + if err != nil { + message := fmt.Sprintf("Unable to refreh Consul services: %v", err) + if refreshFailed { + p.Log.Debug(message) + } else { + p.Log.Warn(message) + } + refreshFailed = true + } else if refreshFailed { + refreshFailed = false + p.Log.Info("Successfully refreshed Consul services after previous errors") + } + } + } + }() + + return nil +} + +func (p *Prometheus) refreshConsulServices(c *api.Catalog) error { + consulServiceURLs := make(map[string]URLAndAddress) + + p.Log.Debugf("Refreshing Consul services") + + for _, q := range p.ConsulConfig.Queries { + queryOptions := api.QueryOptions{} + if q.ServiceDc != "" { + queryOptions.Datacenter = q.ServiceDc + } + + // Request services from Consul + consulServices, _, err := c.Service(q.ServiceName, q.ServiceTag, &queryOptions) + if err != nil { + return err + } + if len(consulServices) == 0 { + p.Log.Debugf("Queried Consul for Service (%s, %s) but did not find any instances", q.ServiceName, q.ServiceTag) + continue + } + p.Log.Debugf("Queried Consul for Service (%s, %s) and found %d instances", q.ServiceName, q.ServiceTag, len(consulServices)) + + for _, consulService := range consulServices { + uaa, err := p.getConsulServiceURL(q, consulService) + if err != nil { + message := fmt.Sprintf("Unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err) + if q.lastQueryFailed { + p.Log.Debug(message) + } else { + p.Log.Warn(message) + } + q.lastQueryFailed = true + break + } + if q.lastQueryFailed { + p.Log.Infof("Created scrape URLs from Consul for Service (%s, %s)", q.ServiceName, q.ServiceTag) + } + q.lastQueryFailed = false + p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.URL.String()) + consulServiceURLs[uaa.URL.String()] = *uaa + } + } + + p.lock.Lock() + p.consulServices = consulServiceURLs + p.lock.Unlock() + + return nil +} + +func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*URLAndAddress, error) { + var buffer bytes.Buffer + buffer.Reset() + err := q.serviceURLTemplate.Execute(&buffer, s) + if err != nil { + return nil, err + } + serviceURL, err := url.Parse(buffer.String()) + if err != nil { + return nil, err + } + + extraTags := make(map[string]string) + for tagName, tagTemplate := range q.serviceExtraTagsTemplate { + buffer.Reset() + err = tagTemplate.Execute(&buffer, s) + if err != nil { + return nil, err + } + extraTags[tagName] = buffer.String() + } + + p.Log.Debugf("Will scrape metrics from Consul Service %s", serviceURL.String()) + + return &URLAndAddress{ + URL: serviceURL, + OriginalURL: serviceURL, + Tags: extraTags, + }, nil +} diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 7a85d88e2c59b..2e87939e372c1 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -12,7 +12,6 @@ import ( "net/url" "os/user" "path/filepath" - "sync" "time" "github.com/ghodss/yaml" @@ -55,7 +54,7 @@ func loadClient(kubeconfigPath string) (*kubernetes.Clientset, error) { return kubernetes.NewForConfig(&config) } -func (p *Prometheus) start(ctx context.Context) error { +func (p *Prometheus) startK8s(ctx context.Context) error { config, err := rest.InClusterConfig() if err != nil { return fmt.Errorf("failed to get InClusterConfig - %v", err) @@ -77,8 +76,6 @@ func (p *Prometheus) start(ctx context.Context) error { } } - p.wg = sync.WaitGroup{} - p.wg.Add(1) go func() { defer p.wg.Done() diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 4a3b71408c552..adeb452253a37 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -41,6 +41,9 @@ type Prometheus struct { // Field Selector/s for Kubernetes KubernetesFieldSelector string `toml:"kubernetes_field_selector"` + // Consul SD configuration + ConsulConfig ConsulConfig `toml:"consul"` + // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` BearerTokenString string `toml:"bearer_token_string"` @@ -77,6 +80,9 @@ type Prometheus struct { podLabelSelector labels.Selector podFieldSelector fields.Selector isNodeScrapeScope bool + + // List of consul services to scrape + consulServices map[string]URLAndAddress } var sampleConfig = ` @@ -127,6 +133,19 @@ var sampleConfig = ` # eg. To scrape pods on a specific node # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" + ## Scrape Services available in Consul Catalog + # [inputs.prometheus.consul] + # enabled = true + # agent = "http://localhost:8500" + # query_interval = "5m" + + # [[inputs.prometheus.consul.query]] + # name = "a service name" + # tag = "a service tag" + # url = 'http://{{if ne .ServiceAddress ""}}{{.ServiceAddress}}{{else}}{{.Address}}{{end}}:{{.ServicePort}}/{{with .ServiceMeta.metrics_path}}{{.}}{{else}}metrics{{end}}' + # [inputs.prometheus.consul.query.tags] + # host = "{{.Node}}" + ## Use bearer token for authorization. ('bearer_token' takes priority) # bearer_token = "/path/to/bearer/token" ## OR @@ -238,6 +257,10 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { p.lock.Lock() defer p.lock.Unlock() + // add all services collected from consul + for k, v := range p.consulServices { + allURLs[k] = v + } // loop through all pods scraped via the prometheus annotation on the pods for k, v := range p.kubernetesPods { allURLs[k] = v @@ -463,20 +486,27 @@ func fieldSelectorIsSupported(fieldSelector fields.Selector) (bool, string) { return true, "" } -// Start will start the Kubernetes scraping if enabled in the configuration +// Start will start the Kubernetes and/or Consul scraping if enabled in the configuration func (p *Prometheus) Start(_ telegraf.Accumulator) error { + var ctx context.Context + p.wg = sync.WaitGroup{} + ctx, p.cancel = context.WithCancel(context.Background()) + + if p.ConsulConfig.Enabled && len(p.ConsulConfig.Queries) > 0 { + if err := p.startConsul(ctx); err != nil { + return err + } + } if p.MonitorPods { - var ctx context.Context - ctx, p.cancel = context.WithCancel(context.Background()) - return p.start(ctx) + if err := p.startK8s(ctx); err != nil { + return err + } } return nil } func (p *Prometheus) Stop() { - if p.MonitorPods { - p.cancel() - } + p.cancel() p.wg.Wait() } @@ -485,6 +515,7 @@ func init() { return &Prometheus{ ResponseTimeout: config.Duration(time.Second * 3), kubernetesPods: map[string]URLAndAddress{}, + consulServices: map[string]URLAndAddress{}, URLTag: "url", } })