From f5c123a442dd411244478de4954655643979f790 Mon Sep 17 00:00:00 2001
From: mannukalra
Date: Wed, 5 Oct 2022 22:37:05 +0530
Subject: [PATCH] feat(outputs.opensearch): opensearch output plugin

---
 plugins/outputs/all/opensearch.go             |   5 +
 plugins/outputs/opensearch/README.md          | 364 +++++++++
 plugins/outputs/opensearch/opensearch.go      | 458 +++++++++++
 plugins/outputs/opensearch/opensearch_test.go | 773 ++++++++++++++++++
 plugins/outputs/opensearch/sample.conf        |  79 ++
 5 files changed, 1679 insertions(+)
 create mode 100644 plugins/outputs/all/opensearch.go
 create mode 100644 plugins/outputs/opensearch/README.md
 create mode 100644 plugins/outputs/opensearch/opensearch.go
 create mode 100644 plugins/outputs/opensearch/opensearch_test.go
 create mode 100644 plugins/outputs/opensearch/sample.conf

diff --git a/plugins/outputs/all/opensearch.go b/plugins/outputs/all/opensearch.go
new file mode 100644
index 0000000000000..3a0f964e5efa1
--- /dev/null
+++ b/plugins/outputs/all/opensearch.go
@@ -0,0 +1,5 @@
+//go:build !custom || outputs || outputs.opensearch
+
+package all
+
+import _ "github.com/influxdata/telegraf/plugins/outputs/opensearch" // register plugin
diff --git a/plugins/outputs/opensearch/README.md b/plugins/outputs/opensearch/README.md
new file mode 100644
index 0000000000000..9364efc037bf1
--- /dev/null
+++ b/plugins/outputs/opensearch/README.md
@@ -0,0 +1,364 @@
+# Opensearch Output Plugin
+
+This plugin writes to [Opensearch](https://opensearch.org/) via HTTP using
+the Elastic Go client API ([olivere/elastic](https://github.com/olivere/elastic)).
+
+It supports Opensearch releases from 1.x up to 2.x.
+
+## Opensearch indexes and templates
+
+### Indexes per time-frame
+
+This plugin can manage indexes per time-frame, as is commonly done with
+Opensearch in other tools.
+
+The timestamp of the collected metric will be used to decide the destination
+index.
+
+For more information about this usage on Opensearch, check [the
+docs][1].
+
+[1]: https://opensearch.org/docs/latest/
+
+### Template management
+
+Index templates are used in Opensearch to define settings and mappings for
+the indexes and how the fields should be analyzed. For more information on how
+this works, see [the docs][2].
+
+This plugin can create a working template for use with telegraf metrics. It
+uses the Opensearch dynamic templates feature to set proper types for the tag
+and metric fields. If the specified template already exists, it will not be
+overwritten unless you configure this plugin to do so. Thus you can customize
+this template after its creation if necessary.
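+
+For reference, the core of the template management is a single "put index
+template" call through the olivere/elastic client that this plugin imports
+(see `manageTemplate` in `opensearch.go` below). A minimal standalone sketch,
+assuming a hypothetical local endpoint and a trimmed-down template body:
+
+```go
+package main
+
+import (
+	"context"
+	"log"
+
+	"github.com/olivere/elastic"
+)
+
+func main() {
+	// Hypothetical local endpoint; sniffing disabled, as this plugin
+	// does by default.
+	client, err := elastic.NewClient(
+		elastic.SetURL("http://localhost:9200"),
+		elastic.SetSniff(false),
+	)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Trimmed-down body for illustration; the plugin renders a much
+	// richer template with dynamic templates for tags and metrics.
+	body := `{
+	  "index_patterns": ["telegraf-*"],
+	  "mappings": {"properties": {"@timestamp": {"type": "date"}}}
+	}`
+
+	// Same client call the plugin issues in manageTemplate().
+	_, err = client.IndexPutTemplate("telegraf").BodyString(body).Do(context.Background())
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+```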
+ +Example of an index template created by telegraf on Opensearch 2.x: + +```json +{ + "telegraf-2022.10.02" : { + "aliases" : { }, + "mappings" : { + "properties" : { + "@timestamp" : { + "type" : "date" + }, + "disk" : { + "properties" : { + "free" : { + "type" : "long" + }, + "inodes_free" : { + "type" : "long" + }, + "inodes_total" : { + "type" : "long" + }, + "inodes_used" : { + "type" : "long" + }, + "total" : { + "type" : "long" + }, + "used" : { + "type" : "long" + }, + "used_percent" : { + "type" : "float" + } + } + }, + "measurement_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "tag" : { + "properties" : { + "cpu" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "device" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "host" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "mode" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "path" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + } + } + } + } + }, + "settings" : { + "index" : { + "creation_date" : "1664693522789", + "number_of_shards" : "1", + "number_of_replicas" : "1", + "uuid" : "TYugdmvsQfmxjzbGRJ8FIw", + "version" : { + "created" : "136247827" + }, + "provided_name" : "telegraf-2022.10.02" + } + } + } +} + +``` + +[2]: https://opensearch.org/docs/latest/opensearch/index-templates/ + +### Example events + +This plugin will format the events in the following way: + +```json +{ + "@timestamp": "2017-01-01T00:00:00+00:00", + "measurement_name": "cpu", + "cpu": { + "usage_guest": 0, + "usage_guest_nice": 0, + "usage_idle": 71.85413456197966, + "usage_iowait": 0.256805341656516, + "usage_irq": 0, + "usage_nice": 0, + "usage_softirq": 0.2054442732579466, + "usage_steal": 0, + "usage_system": 15.04879301548127, + "usage_user": 12.634822807288275 + }, + "tag": { + "cpu": "cpu-total", + "host": "opensearhhost", + "dc": "datacenter1" + } +} +``` + +```json +{ + "@timestamp": "2017-01-01T00:00:00+00:00", + "measurement_name": "system", + "system": { + "load1": 0.78, + "load15": 0.8, + "load5": 0.8, + "n_cpus": 2, + "n_users": 2 + }, + "tag": { + "host": "opensearhhost", + "dc": "datacenter1" + } +} +``` + +## Configuration + +```toml @sample.conf +# Configuration for Opensearch to send metrics to. +[[outputs.opensearch]] + ## The full HTTP endpoint URL for your Opensearch instance + ## Multiple urls can be specified as part of the same cluster, + ## this means that only ONE of the urls will be written to each interval + urls = [ "http://node1.es.example.com:9200" ] # required. + ## Opensearch client timeout, defaults to "5s" if not set. + timeout = "5s" + ## Set to true to ask Opensearch a list of all cluster nodes, + ## thus it is not necessary to list all nodes in the urls config option + enable_sniffer = false + ## Set to true to enable gzip compression + enable_gzip = false + ## Set the interval to check if the Opensearch nodes are available + ## Setting to "0s" will disable the health check (not recommended in production) + health_check_interval = "10s" + ## Set the timeout for periodic health checks. + # health_check_timeout = "1s" + ## HTTP basic authentication details. 
+  # username = "telegraf"
+  # password = "mypassword"
+  ## HTTP bearer token authentication details
+  # auth_bearer_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
+
+  ## Index Config
+  ## The target index for metrics (Opensearch will create it if it does not exist).
+  ## You can use the date specifiers below to create indexes per time frame.
+  ## The metric timestamp will be used to decide the destination index name
+  # %Y - year (2016)
+  # %y - last two digits of year (00..99)
+  # %m - month (01..12)
+  # %d - day of month (e.g., 01)
+  # %H - hour (00..23)
+  # %V - week of the year (ISO week) (01..53)
+  ## Additionally, you can specify a tag name using the notation {{tag_name}}
+  ## which will be used as part of the index name. If the tag does not exist,
+  ## the default tag value will be used.
+  # index_name = "telegraf-{{host}}-%Y.%m.%d"
+  # default_tag_value = "none"
+  index_name = "telegraf-%Y.%m.%d" # required.
+
+  ## Optional TLS Config
+  # tls_ca = "/etc/telegraf/ca.pem"
+  # tls_cert = "/etc/telegraf/cert.pem"
+  # tls_key = "/etc/telegraf/key.pem"
+  ## Use TLS but skip chain & host verification
+  # insecure_skip_verify = false
+
+  ## Template Config
+  ## Set to true if you want telegraf to manage its index template.
+  ## If enabled it will create a recommended index template for telegraf indexes
+  manage_template = true
+  ## The template name used for telegraf indexes
+  template_name = "telegraf"
+  ## Set to true if you want telegraf to overwrite an existing template
+  overwrite_template = false
+  ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
+  ## it will enable data resend and update metric points avoiding duplicated metrics with different id's
+  force_document_id = false
+
+  ## Specifies the handling of NaN and Inf values.
+  ## This option can have the following values:
+  ##   none    -- do not modify field-values (default); will produce an error if NaNs or infs are encountered
+  ##   drop    -- drop fields containing NaNs or infs
+  ##   replace -- replace with the value in "float_replacement_value" (default: 0.0)
+  ##              NaNs and inf will be replaced with the given number, -inf with the negative of that number
+  # float_handling = "none"
+  # float_replacement_value = 0.0
+
+  ## Pipeline Config
+  ## To use an ingest pipeline, set this to the name of the pipeline you want to use.
+  # use_pipeline = "my_pipeline"
+  ## Additionally, you can specify a tag name using the notation {{tag_name}}
+  ## which will be used as part of the pipeline name. If the tag does not exist,
+  ## the default pipeline will be used as the pipeline. If no default pipeline is set,
+  ## no pipeline is used for the metric.
+  # use_pipeline = "{{es_pipeline}}"
+  # default_pipeline = "my_pipeline"
+```
+
+### Permissions
+
+If you are using authentication within your Opensearch cluster, you need to
+create an account and a role with at least the `manage` privilege in the
+Cluster Privileges category. Otherwise, your account will not be able to
+connect to your Opensearch cluster and send logs to it. After that, you need
+to add the "create_index" and "write" permissions for your specific index
+pattern.
+
+### Required parameters
+
+* `urls`: A list containing the full HTTP URL of one or more nodes from your
+  Opensearch instance.
+* `index_name`: The target index for metrics. You can use the date specifiers
+  below to create indexes per time frame.
+
+```text
+%Y - year (2017)
+%y - last two digits of year (00..99)
+%m - month (01..12)
+%d - day of month (e.g., 01)
+%H - hour (00..23)
+%V - week of the year (ISO week) (01..53)
+```
+
+Additionally, you can specify dynamic index names by using tags with the
+notation `{{tag_name}}`. This will store the metrics with different tag
+values in different indices. If the tag does not exist in a particular metric,
+the `default_tag_value` will be used instead.
+
+### Optional parameters
+
+* `timeout`: Opensearch client timeout, defaults to "5s" if not set.
+* `enable_sniffer`: Set to true to ask Opensearch for a list of all cluster
+  nodes, so it is not necessary to list all nodes in the urls config option.
+* `health_check_interval`: Set the interval to check if the nodes are
+  available. Setting it to "0s" will disable the health check (not recommended
+  in production).
+* `health_check_timeout`: Set the timeout for the periodic health checks,
+  defaults to "1s" if not set.
+* `username`: The username for HTTP basic authentication (e.g. when using the
+  security plugin).
+* `password`: The password for HTTP basic authentication (e.g. when using the
+  security plugin).
+* `manage_template`: Set to true if you want telegraf to manage its index
+  template. If enabled it will create a recommended index template for
+  telegraf indexes.
+* `template_name`: The template name used for telegraf indexes.
+* `overwrite_template`: Set to true if you want telegraf to overwrite an
+  existing template.
+* `force_document_id`: Set to true to compute a unique hash as
+  sha256(concat(timestamp,measurement,series-hash)) and send it as the
+  document ID. This enables resending or updating data without creating
+  duplicate documents in Opensearch.
+* `float_handling`: Specifies how to handle `NaN` and infinite field
+  values. `"none"` (default) will do nothing, `"drop"` will drop the field and
+  `"replace"` will replace the field value by the number in
+  `float_replacement_value`.
+* `float_replacement_value`: Value (defaulting to `0.0`) to replace `NaN`s and
+  `inf`s if `float_handling` is set to `replace`. Negative `inf` will be
+  replaced by the negative of this number to respect the sign of the
+  field's original value.
+* `use_pipeline`: If set, this value will be used as the pipeline to call
+  when sending events to Opensearch. Additionally, you can specify dynamic
+  pipeline names by using tags with the notation `{{tag_name}}`. If the tag
+  does not exist in a particular metric, the `default_pipeline` will be used
+  instead.
+* `default_pipeline`: If dynamic pipeline names are used and the tag does not
+  exist in a particular metric, this value will be used instead.
+
+## Known issues
+
+Integer values collected that are larger than 2^63 and smaller than 1e21 (or
+within the same window for their negative counterparts) are encoded by the
+golang JSON encoder in decimal format, which is not fully supported by
+Opensearch dynamic field mapping. This causes metrics with such values to be
+dropped if a field mapping has not yet been created on the telegraf index. If
+that's the case, you will see an exception on the Opensearch side like this:
+
+```json
+{"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse"}],"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"illegal_state_exception","reason":"No matching token for number_type [BIG_INTEGER]"}},"status":400}
+```
+
+The correct field mapping will be created on the telegraf index as soon as a
+supported JSON value is received by Opensearch, and subsequent insertions
+will work because the field mapping will already exist.
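+
+As a standalone illustration of the encoding side (not part of this plugin),
+Go's `encoding/json` emits such values as plain decimal literals, which
+Opensearch then tokenizes as `BIG_INTEGER`:
+
+```go
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"math"
+)
+
+func main() {
+	// 2^63 does not fit in int64, so hold it in a uint64; the JSON
+	// encoder still writes it as a plain decimal literal.
+	big := uint64(math.MaxInt64) + 1
+	b, err := json.Marshal(map[string]uint64{"value": big})
+	if err != nil {
+		panic(err)
+	}
+	// Prints {"value":9223372036854775808}, the token that triggers
+	// the mapper_parsing_exception above on an unmapped field.
+	fmt.Println(string(b))
+}
+```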
+ +This issue is caused by the way Opensearch tries to detect integer fields, +and by how golang encodes numbers in JSON. There is no clear workaround for this +at the moment. diff --git a/plugins/outputs/opensearch/opensearch.go b/plugins/outputs/opensearch/opensearch.go new file mode 100644 index 0000000000000..a8a2673af060c --- /dev/null +++ b/plugins/outputs/opensearch/opensearch.go @@ -0,0 +1,458 @@ +//go:generate ../../../tools/readme_config_includer/generator +package opensearch + +import ( + "bytes" + "context" + "crypto/sha256" + _ "embed" + "fmt" + "math" + "net/http" + "net/url" + "strconv" + "strings" + "text/template" + "time" + + "github.com/olivere/elastic" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/outputs" +) + +//go:embed sample.conf +var sampleConfig string + +type Opensearch struct { + AuthBearerToken string `toml:"auth_bearer_token"` + DefaultPipeline string `toml:"default_pipeline"` + DefaultTagValue string `toml:"default_tag_value"` + EnableGzip bool `toml:"enable_gzip"` + EnableSniffer bool `toml:"enable_sniffer"` + FloatHandling string `toml:"float_handling"` + FloatReplacement float64 `toml:"float_replacement_value"` + ForceDocumentID bool `toml:"force_document_id"` + HealthCheckInterval config.Duration `toml:"health_check_interval"` + HealthCheckTimeout config.Duration `toml:"health_check_timeout"` + IndexName string `toml:"index_name"` + ManageTemplate bool `toml:"manage_template"` + OverwriteTemplate bool `toml:"overwrite_template"` + Password string `toml:"password"` + TemplateName string `toml:"template_name"` + Timeout config.Duration `toml:"timeout"` + URLs []string `toml:"urls"` + UsePipeline string `toml:"use_pipeline"` + Username string `toml:"username"` + Log telegraf.Logger `toml:"-"` + majorReleaseNumber int + pipelineName string + pipelineTagKeys []string + tagKeys []string + tls.ClientConfig + + Client *elastic.Client +} + +const telegrafTemplate = ` +{ + "index_patterns" : [ "{{.TemplatePattern}}" ], + "settings": { + "index": { + "refresh_interval": "10s", + "mapping.total_fields.limit": 5000, + "auto_expand_replicas" : "0-1", + "codec" : "best_compression" + } + }, + "mappings" : { + "properties" : { + "@timestamp" : { "type" : "date" }, + "measurement_name" : { "type" : "keyword" } + }, + "dynamic_templates": [ + { + "tags": { + "match_mapping_type": "string", + "path_match": "tag.*", + "mapping": { + "ignore_above": 512, + "type": "keyword" + } + } + }, + { + "metrics_long": { + "match_mapping_type": "long", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "metrics_double": { + "match_mapping_type": "double", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "text_fields": { + "match": "*", + "mapping": { + "norms": false + } + } + } + ] + } +}` + +type templatePart struct { + TemplatePattern string + Version int +} + +func (*Opensearch) SampleConfig() string { + return sampleConfig +} + +func (a *Opensearch) Connect() error { + if a.URLs == nil || a.IndexName == "" { + return fmt.Errorf("opensearch urls or index_name is not defined") + } + + // Determine if we should process NaN and inf values + switch a.FloatHandling { + case "", "none": + a.FloatHandling = "none" + case "drop", "replace": + default: + return fmt.Errorf("invalid float_handling type %q", a.FloatHandling) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout)) + defer 
cancel() + + var clientOptions []elastic.ClientOptionFunc + + tlsCfg, err := a.ClientConfig.TLSConfig() + if err != nil { + return err + } + tr := &http.Transport{ + TLSClientConfig: tlsCfg, + } + + httpclient := &http.Client{ + Transport: tr, + Timeout: time.Duration(a.Timeout), + } + + osURL, err := url.Parse(a.URLs[0]) + if err != nil { + return fmt.Errorf("parsing URL failed: %v", err) + } + + clientOptions = append(clientOptions, + elastic.SetHttpClient(httpclient), + elastic.SetSniff(a.EnableSniffer), + elastic.SetScheme(osURL.Scheme), + elastic.SetURL(a.URLs...), + elastic.SetHealthcheckInterval(time.Duration(a.HealthCheckInterval)), + elastic.SetHealthcheckTimeout(time.Duration(a.HealthCheckTimeout)), + elastic.SetGzip(a.EnableGzip), + ) + + if a.Username != "" && a.Password != "" { + clientOptions = append(clientOptions, + elastic.SetBasicAuth(a.Username, a.Password), + ) + } + + if a.AuthBearerToken != "" { + clientOptions = append(clientOptions, + elastic.SetHeaders(http.Header{ + "Authorization": []string{fmt.Sprintf("Bearer %s", a.AuthBearerToken)}, + }), + ) + } + + if time.Duration(a.HealthCheckInterval) == 0 { + clientOptions = append(clientOptions, + elastic.SetHealthcheck(false), + ) + a.Log.Debugf("Disabling health check") + } + + client, err := elastic.NewClient(clientOptions...) + + if err != nil { + return err + } + + // check for OS version on first node + osVersion, err := client.ElasticsearchVersion(a.URLs[0]) + + if err != nil { + return fmt.Errorf("opensearch version check failed: %s", err) + } + + // quit if ES version is not supported + majorReleaseNumber, err := strconv.Atoi(strings.Split(osVersion, ".")[0]) + if err != nil || majorReleaseNumber < 1 { + return fmt.Errorf("opensearch version not supported: %s", osVersion) + } + + a.Log.Infof("Opensearch version: %q", osVersion) + + a.Client = client + a.majorReleaseNumber = majorReleaseNumber + + if a.ManageTemplate { + err := a.manageTemplate(ctx) + if err != nil { + return err + } + } + + a.IndexName, a.tagKeys = a.GetTagKeys(a.IndexName) + a.pipelineName, a.pipelineTagKeys = a.GetTagKeys(a.UsePipeline) + + return nil +} + +// GetPointID generates a unique ID for a Metric Point +func GetPointID(m telegraf.Metric) string { + var buffer bytes.Buffer + //Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID + + buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10)) //nolint:revive // from buffer.go: "err is always nil" + buffer.WriteString(m.Name()) //nolint:revive // from buffer.go: "err is always nil" + buffer.WriteString(strconv.FormatUint(m.HashID(), 10)) //nolint:revive // from buffer.go: "err is always nil" + + return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes())) +} + +func (a *Opensearch) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + bulkRequest := a.Client.Bulk() + + for _, metric := range metrics { + var name = metric.Name() + + // index name has to be re-evaluated each time for telegraf + // to send the metric to the correct time-based index + indexName := a.GetIndexName(a.IndexName, metric.Time(), a.tagKeys, metric.Tags()) + + // Handle NaN and inf field-values + fields := make(map[string]interface{}) + for k, value := range metric.Fields() { + v, ok := value.(float64) + if !ok || a.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) { + fields[k] = value + continue + } + if a.FloatHandling == "drop" { + continue + } + + if math.IsNaN(v) || math.IsInf(v, 1) { + fields[k] = a.FloatReplacement + } 
else { + fields[k] = -a.FloatReplacement + } + } + + m := make(map[string]interface{}) + + m["@timestamp"] = metric.Time() + m["measurement_name"] = name + m["tag"] = metric.Tags() + m[name] = fields + + br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m) + + if a.ForceDocumentID { + id := GetPointID(metric) + br.Id(id) + } + + if a.UsePipeline != "" { + if pipelineName := a.getPipelineName(a.pipelineName, a.pipelineTagKeys, metric.Tags()); pipelineName != "" { + br.Pipeline(pipelineName) + } + } + + bulkRequest.Add(br) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout)) + defer cancel() + + res, err := bulkRequest.Do(ctx) + + if err != nil { + return fmt.Errorf("error sending bulk request to Opensearch: %s", err) + } + + if res.Errors { + for id, err := range res.Failed() { + a.Log.Errorf("Opensearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"]) + break + } + return fmt.Errorf("opensearch failed to index %d metrics", len(res.Failed())) + } + + return nil +} + +func (a *Opensearch) manageTemplate(ctx context.Context) error { + if a.TemplateName == "" { + return fmt.Errorf("opensearch template_name configuration not defined") + } + + templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx) + + if errExists != nil { + return fmt.Errorf("opensearch template check failed, template name: %s, error: %s", a.TemplateName, errExists) + } + + templatePattern := a.IndexName + + if strings.Contains(templatePattern, "%") { + templatePattern = templatePattern[0:strings.Index(templatePattern, "%")] + } + + if strings.Contains(templatePattern, "{{") { + templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")] + } + + if templatePattern == "" { + return fmt.Errorf("template cannot be created for dynamic index names without an index prefix") + } + + if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") { + tp := templatePart{ + TemplatePattern: templatePattern + "*", + Version: a.majorReleaseNumber, + } + + t := template.Must(template.New("template").Parse(telegrafTemplate)) + var tmpl bytes.Buffer + + if err := t.Execute(&tmpl, tp); err != nil { + return err + } + _, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx) + + if errCreateTemplate != nil { + return fmt.Errorf("opensearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate) + } + + a.Log.Debugf("Template %s created or updated\n", a.TemplateName) + } else { + a.Log.Debug("Found existing Opensearch template. 
Skipping template management") + } + return nil +} + +func (a *Opensearch) GetTagKeys(indexName string) (string, []string) { + tagKeys := []string{} + startTag := strings.Index(indexName, "{{") + + for startTag >= 0 { + endTag := strings.Index(indexName, "}}") + + if endTag < 0 { + startTag = -1 + } else { + tagName := indexName[startTag+2 : endTag] + + var tagReplacer = strings.NewReplacer( + "{{"+tagName+"}}", "%s", + ) + + indexName = tagReplacer.Replace(indexName) + tagKeys = append(tagKeys, strings.TrimSpace(tagName)) + + startTag = strings.Index(indexName, "{{") + } + } + + return indexName, tagKeys +} + +func (a *Opensearch) GetIndexName(indexName string, eventTime time.Time, tagKeys []string, metricTags map[string]string) string { + if strings.Contains(indexName, "%") { + var dateReplacer = strings.NewReplacer( + "%Y", eventTime.UTC().Format("2006"), + "%y", eventTime.UTC().Format("06"), + "%m", eventTime.UTC().Format("01"), + "%d", eventTime.UTC().Format("02"), + "%H", eventTime.UTC().Format("15"), + "%V", getISOWeek(eventTime.UTC()), + ) + + indexName = dateReplacer.Replace(indexName) + } + + tagValues := []interface{}{} + + for _, key := range tagKeys { + if value, ok := metricTags[key]; ok { + tagValues = append(tagValues, value) + } else { + a.Log.Debugf("Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue) + tagValues = append(tagValues, a.DefaultTagValue) + } + } + + return fmt.Sprintf(indexName, tagValues...) +} + +func (a *Opensearch) getPipelineName(pipelineInput string, tagKeys []string, metricTags map[string]string) string { + if !strings.Contains(pipelineInput, "%") || len(tagKeys) == 0 { + return pipelineInput + } + + var tagValues []interface{} + + for _, key := range tagKeys { + if value, ok := metricTags[key]; ok { + tagValues = append(tagValues, value) + continue + } + a.Log.Debugf("Tag %s not found, reverting to default pipeline instead.", key) + return a.DefaultPipeline + } + return fmt.Sprintf(pipelineInput, tagValues...) 
+} + +func getISOWeek(eventTime time.Time) string { + _, week := eventTime.ISOWeek() + return strconv.Itoa(week) +} + +func (a *Opensearch) Close() error { + a.Client = nil + return nil +} + +func init() { + outputs.Add("opensearch", func() telegraf.Output { + return &Opensearch{ + Timeout: config.Duration(time.Second * 5), + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + } + }) +} diff --git a/plugins/outputs/opensearch/opensearch_test.go b/plugins/outputs/opensearch/opensearch_test.go new file mode 100644 index 0000000000000..3a3d594d7f718 --- /dev/null +++ b/plugins/outputs/opensearch/opensearch_test.go @@ -0,0 +1,773 @@ +package opensearch + +import ( + "context" + "fmt" + "math" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/docker/go-connections/nat" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" +) + +const servicePort = "9200" + +func launchTestContainer(t *testing.T) *testutil.Container { + container := testutil.Container{ + Image: "opensearchproject/opensearch:1.1.0", + ExposedPorts: []string{servicePort}, + Env: map[string]string{ + "discovery.type": "single-node", + "DISABLE_INSTALL_DEMO_CONFIG": "true", + "DISABLE_SECURITY_PLUGIN": "true", + }, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + ), + } + err := container.Start() + require.NoError(t, err, "failed to start container") + + return &container +} + +func TestConnectAndWriteIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + EnableGzip: true, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + Log: testutil.Logger{}, + } + + // Verify that we can connect to Opensearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to Opensearch + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestConnectAndWriteMetricWithNaNValueEmptyIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Opensearch + err := e.Connect() + 
require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.Error(t, err, "error sending bulk request to Opensearch: json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueNoneIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: "none", + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Opensearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.Error(t, err, "error sending bulk request to Opensearch: json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueDropIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: "drop", + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Opensearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.NoError(t, err) + } +} + +func TestConnectAndWriteMetricWithNaNValueReplacementIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + tests := []struct { + floatHandle string + floatReplacement float64 + expectError bool + }{ + { + "none", + 0.0, + true, + }, + { + "drop", + 0.0, + false, + }, + { + "replace", + 0.0, + false, + }, + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + for _, test := range tests { + e := &Opensearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + 
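+			// FloatHandling and FloatReplacement below are injected
+			// from the test table above; the other settings stay at
+			// their defaults.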
HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: test.floatHandle, + FloatReplacement: test.floatReplacement, + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + err := e.Connect() + require.NoError(t, err) + + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + + if test.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + } + } +} + +func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + ctx := context.Background() + + e := &Opensearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + EnableGzip: true, + ManageTemplate: true, + TemplateName: "", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + + err := e.manageTemplate(ctx) + require.Error(t, err) +} + +func TestTemplateManagementIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + EnableGzip: true, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + err := e.Connect() + require.NoError(t, err) + + err = e.manageTemplate(ctx) + require.NoError(t, err) +} + +func TestTemplateInvalidIndexPatternIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: "{{host}}-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + EnableGzip: true, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + + err := e.Connect() + require.Error(t, err) +} + +func TestGetTagKeys(t *testing.T) { + e := &Opensearch{ + DefaultTagValue: "none", + Log: testutil.Logger{}, + } + + tests := []struct { + IndexName string + ExpectedIndexName string + ExpectedTagKeys []string + }{ + { + "indexname", + "indexname", + []string{}, + }, { + "indexname-%Y", + "indexname-%Y", + []string{}, + }, { + "indexname-%Y-%m", + "indexname-%Y-%m", + []string{}, + }, { + "indexname-%Y-%m-%d", + "indexname-%Y-%m-%d", + []string{}, + }, { + "indexname-%Y-%m-%d-%H", + "indexname-%Y-%m-%d-%H", + []string{}, + }, { + "indexname-%y-%m", + "indexname-%y-%m", + []string{}, + }, { + "indexname-{{tag1}}-%y-%m", + "indexname-%s-%y-%m", + []string{"tag1"}, + }, { + "indexname-{{tag1}}-{{tag2}}-%y-%m", + 
"indexname-%s-%s-%y-%m", + []string{"tag1", "tag2"}, + }, { + "indexname-{{tag1}}-{{tag2}}-{{tag3}}-%y-%m", + "indexname-%s-%s-%s-%y-%m", + []string{"tag1", "tag2", "tag3"}, + }, + } + for _, test := range tests { + indexName, tagKeys := e.GetTagKeys(test.IndexName) + if indexName != test.ExpectedIndexName { + t.Errorf("Expected indexname %s, got %s\n", test.ExpectedIndexName, indexName) + } + if !reflect.DeepEqual(tagKeys, test.ExpectedTagKeys) { + t.Errorf("Expected tagKeys %s, got %s\n", test.ExpectedTagKeys, tagKeys) + } + } +} + +func TestGetIndexName(t *testing.T) { + e := &Opensearch{ + DefaultTagValue: "none", + Log: testutil.Logger{}, + } + + tests := []struct { + EventTime time.Time + Tags map[string]string + TagKeys []string + IndexName string + Expected string + }{ + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "indexname", + "indexname", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "indexname-%Y", + "indexname-2014", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "indexname-%Y-%m", + "indexname-2014-12", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "indexname-%Y-%m-%d", + "indexname-2014-12-01", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "indexname-%Y-%m-%d-%H", + "indexname-2014-12-01-23", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "indexname-%y-%m", + "indexname-14-12", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "indexname-%Y-%V", + "indexname-2014-49", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{"tag1"}, + "indexname-%s-%y-%m", + "indexname-value1-14-12", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{"tag1", "tag2"}, + "indexname-%s-%s-%y-%m", + "indexname-value1-value2-14-12", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{"tag1", "tag2", "tag3"}, + "indexname-%s-%s-%s-%y-%m", + "indexname-value1-value2-none-14-12", + }, + } + for _, test := range tests { + indexName := e.GetIndexName(test.IndexName, test.EventTime, test.TagKeys, test.Tags) + if indexName != test.Expected { + t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName) + } + } +} + +func TestGetPipelineName(t *testing.T) { + e := &Opensearch{ + UsePipeline: "{{es-pipeline}}", + DefaultPipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + } + e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline) + + tests := []struct { + EventTime time.Time + Tags map[string]string + PipelineTagKeys []string + Expected string + }{ + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "myDefaultPipeline", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "myDefaultPipeline", + }, 
+ { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"}, + []string{}, + "myOtherPipeline", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "pipeline2", + }, + } + for _, test := range tests { + pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags) + require.Equal(t, test.Expected, pipelineName) + } + + // Setup testing for testing no pipeline set. All the tests in this case should return "". + e = &Opensearch{ + Log: testutil.Logger{}, + } + e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline) + + for _, test := range tests { + pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags) + require.Equal(t, "", pipelineName) + } +} + +func TestPipelineConfigs(t *testing.T) { + tests := []struct { + EventTime time.Time + Tags map[string]string + PipelineTagKeys []string + Expected string + Elastic *Opensearch + }{ + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "", + &Opensearch{ + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "", + &Opensearch{ + DefaultPipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"}, + []string{}, + "myDefaultPipeline", + &Opensearch{ + UsePipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "", + &Opensearch{ + DefaultPipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "pipeline2", + &Opensearch{ + UsePipeline: "{{es-pipeline}}", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "value1-pipeline2", + &Opensearch{ + UsePipeline: "{{tag1}}-{{es-pipeline}}", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1"}, + []string{}, + "", + &Opensearch{ + UsePipeline: "{{es-pipeline}}", + Log: testutil.Logger{}, + }, + }, + } + + for _, test := range tests { + e := test.Elastic + e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline) + pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags) + require.Equal(t, test.Expected, pipelineName) + } +} + +func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/_bulk": + require.Equal(t, "gzip", r.Header.Get("Content-Encoding")) + require.Equal(t, "gzip", r.Header.Get("Accept-Encoding")) + _, err := w.Write([]byte("{}")) + require.NoError(t, err) + return + default: + _, err := w.Write([]byte(`{"version": {"number": "7.8"}}`)) + require.NoError(t, err) + return + } + })) + defer ts.Close() + + urls := []string{"http://" + ts.Listener.Addr().String()} + + e := &Opensearch{ + URLs: urls, + IndexName: "{{host}}-%Y.%m.%d", + 
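+		// EnableGzip below must produce the gzip headers asserted by
+		// the /_bulk handler above.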
Timeout: config.Duration(time.Second * 5), + EnableGzip: true, + ManageTemplate: false, + Log: testutil.Logger{}, + } + + err := e.Connect() + require.NoError(t, err) + + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestRequestHeaderWhenGzipIsDisabled(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/_bulk": + require.NotEqual(t, "gzip", r.Header.Get("Content-Encoding")) + _, err := w.Write([]byte("{}")) + require.NoError(t, err) + return + default: + _, err := w.Write([]byte(`{"version": {"number": "7.8"}}`)) + require.NoError(t, err) + return + } + })) + defer ts.Close() + + urls := []string{"http://" + ts.Listener.Addr().String()} + + e := &Opensearch{ + URLs: urls, + IndexName: "{{host}}-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: false, + Log: testutil.Logger{}, + } + + err := e.Connect() + require.NoError(t, err) + + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestAuthorizationHeaderWhenBearerTokenIsPresent(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/_bulk": + require.Equal(t, "Bearer 0123456789abcdef", r.Header.Get("Authorization")) + _, err := w.Write([]byte("{}")) + require.NoError(t, err) + return + default: + _, err := w.Write([]byte(`{"version": {"number": "7.8"}}`)) + require.NoError(t, err) + return + } + })) + defer ts.Close() + + urls := []string{"http://" + ts.Listener.Addr().String()} + + e := &Opensearch{ + URLs: urls, + IndexName: "{{host}}-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: false, + Log: testutil.Logger{}, + AuthBearerToken: "0123456789abcdef", + } + + err := e.Connect() + require.NoError(t, err) + + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) +} diff --git a/plugins/outputs/opensearch/sample.conf b/plugins/outputs/opensearch/sample.conf new file mode 100644 index 0000000000000..0e150bf309183 --- /dev/null +++ b/plugins/outputs/opensearch/sample.conf @@ -0,0 +1,79 @@ +# Configuration for Opensearch to send metrics to. +[[outputs.opensearch]] + ## The full HTTP endpoint URL for your Opensearch instance + ## Multiple urls can be specified as part of the same cluster, + ## this means that only ONE of the urls will be written to each interval + urls = [ "http://node1.os.example.com:9200" ] # required. + ## Opensearch client timeout, defaults to "5s" if not set. + timeout = "5s" + ## Set to true to ask Opensearch a list of all cluster nodes, + ## thus it is not necessary to list all nodes in the urls config option + enable_sniffer = false + ## Set to true to enable gzip compression + enable_gzip = false + ## Set the interval to check if the Opensearch nodes are available + ## Setting to "0s" will disable the health check (not recommended in production) + health_check_interval = "10s" + ## Set the timeout for periodic health checks. + # health_check_timeout = "1s" + ## HTTP basic authentication details. + ## HTTP basic authentication details + # username = "telegraf" + # password = "mypassword" + ## HTTP bearer token authentication details + # auth_bearer_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" + + ## Index Config + ## The target index for metrics (Opensearch will create if it not exists). + ## You can use the date specifiers below to create indexes per time frame. 
+  ## The metric timestamp will be used to decide the destination index name
+  # %Y - year (2016)
+  # %y - last two digits of year (00..99)
+  # %m - month (01..12)
+  # %d - day of month (e.g., 01)
+  # %H - hour (00..23)
+  # %V - week of the year (ISO week) (01..53)
+  ## Additionally, you can specify a tag name using the notation {{tag_name}}
+  ## which will be used as part of the index name. If the tag does not exist,
+  ## the default tag value will be used.
+  # index_name = "telegraf-{{host}}-%Y.%m.%d"
+  # default_tag_value = "none"
+  index_name = "telegraf-%Y.%m.%d" # required.
+
+  ## Optional TLS Config
+  # tls_ca = "/etc/telegraf/ca.pem"
+  # tls_cert = "/etc/telegraf/cert.pem"
+  # tls_key = "/etc/telegraf/key.pem"
+  ## Use TLS but skip chain & host verification
+  # insecure_skip_verify = false
+
+  ## Template Config
+  ## Set to true if you want telegraf to manage its index template.
+  ## If enabled it will create a recommended index template for telegraf indexes
+  manage_template = true
+  ## The template name used for telegraf indexes
+  template_name = "telegraf"
+  ## Set to true if you want telegraf to overwrite an existing template
+  overwrite_template = false
+  ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
+  ## it will enable data resend and update metric points avoiding duplicated metrics with different id's
+  force_document_id = false
+
+  ## Specifies the handling of NaN and Inf values.
+  ## This option can have the following values:
+  ##   none    -- do not modify field-values (default); will produce an error if NaNs or infs are encountered
+  ##   drop    -- drop fields containing NaNs or infs
+  ##   replace -- replace with the value in "float_replacement_value" (default: 0.0)
+  ##              NaNs and inf will be replaced with the given number, -inf with the negative of that number
+  # float_handling = "none"
+  # float_replacement_value = 0.0
+
+  ## Pipeline Config
+  ## To use an ingest pipeline, set this to the name of the pipeline you want to use.
+  # use_pipeline = "my_pipeline"
+  ## Additionally, you can specify a tag name using the notation {{tag_name}}
+  ## which will be used as part of the pipeline name. If the tag does not exist,
+  ## the default pipeline will be used as the pipeline. If no default pipeline is set,
+  ## no pipeline is used for the metric.
+  # use_pipeline = "{{es_pipeline}}"
+  # default_pipeline = "my_pipeline"