diff --git a/metricbeat/module/elasticsearch/shard/data.go b/metricbeat/module/elasticsearch/shard/data.go index b7bc6903075..e2c211035fb 100644 --- a/metricbeat/module/elasticsearch/shard/data.go +++ b/metricbeat/module/elasticsearch/shard/data.go @@ -24,8 +24,9 @@ import ( "github.com/elastic/beats/v7/metricbeat/helper/elastic" "github.com/elastic/elastic-agent-libs/mapstr" + "fmt" + "github.com/joeshaw/multierror" - "github.com/pkg/errors" s "github.com/elastic/beats/v7/libbeat/common/schema" c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" @@ -61,45 +62,45 @@ func eventsMapping(r mb.ReporterV2, content []byte, isXpack bool) error { stateData := &stateStruct{} err := json.Unmarshal(content, stateData) if err != nil { - return errors.Wrap(err, "failure parsing Elasticsearch Cluster State API response") + return fmt.Errorf("failure parsing Elasticsearch Cluster State API response: %w", err) } var errs multierror.Errors for _, index := range stateData.RoutingTable.Indices { for _, shards := range index.Shards { - for _, shard := range shards { + for i, shard := range shards { event := mb.Event{ ModuleFields: mapstr.M{}, } - event.ModuleFields.Put("cluster.state.id", stateData.StateID) - event.ModuleFields.Put("cluster.stats.state.state_uuid", stateData.StateID) - event.ModuleFields.Put("cluster.id", stateData.ClusterID) - event.ModuleFields.Put("cluster.name", stateData.ClusterName) + _, _ = event.ModuleFields.Put("cluster.state.id", stateData.StateID) + _, _ = event.ModuleFields.Put("cluster.stats.state.state_uuid", stateData.StateID) + _, _ = event.ModuleFields.Put("cluster.id", stateData.ClusterID) + _, _ = event.ModuleFields.Put("cluster.name", stateData.ClusterName) fields, err := schema.Apply(shard) if err != nil { - errs = append(errs, errors.Wrap(err, "failure applying shard schema")) + errs = append(errs, fmt.Errorf("failure applying shard schema: %w", err)) continue } // Handle node field: could be string or null err = elasticsearch.PassThruField("node", shard, fields) if err != nil { - errs = append(errs, errors.Wrap(err, "failure passing through node field")) + errs = append(errs, fmt.Errorf("failure passing through node field: %w", err)) continue } // Handle relocating_node field: could be string or null err = elasticsearch.PassThruField("relocating_node", shard, fields) if err != nil { - errs = append(errs, errors.Wrap(err, "failure passing through relocating_node field")) + errs = append(errs, fmt.Errorf("failure passing through relocating_node field: %w", err)) continue } - event.ID, err = generateHashForEvent(stateData.StateID, fields) + event.ID, err = generateHashForEvent(stateData.StateID, fields, i) if err != nil { - errs = append(errs, errors.Wrap(err, "failure getting event ID")) + errs = append(errs, fmt.Errorf("failure getting event ID: %w", err)) continue } @@ -110,28 +111,28 @@ func eventsMapping(r mb.ReporterV2, content []byte, isXpack bool) error { continue } if nodeID != nil { // shard has not been allocated yet - event.ModuleFields.Put("node.id", nodeID) + _, _ = event.ModuleFields.Put("node.id", nodeID) delete(fields, "node") sourceNode, err := getSourceNode(nodeID.(string), stateData) if err != nil { - errs = append(errs, errors.Wrap(err, "failure getting source node information")) + errs = append(errs, fmt.Errorf("failure getting source node information: %w", err)) continue } - event.ModuleFields.Put("node.name", sourceNode["name"]) - event.MetricSetFields.Put("source_node", sourceNode) + _, _ = event.ModuleFields.Put("node.name", sourceNode["name"]) + _, _ = event.MetricSetFields.Put("source_node", sourceNode) } - event.ModuleFields.Put("index.name", fields["index"]) + _, _ = event.ModuleFields.Put("index.name", fields["index"]) delete(fields, "index") - event.MetricSetFields.Put("number", fields["shard"]) + _, _ = event.MetricSetFields.Put("number", fields["shard"]) delete(event.MetricSetFields, "shard") delete(event.MetricSetFields, "relocating_node") relocatingNode := fields["relocating_node"] - event.MetricSetFields.Put("relocating_node.name", relocatingNode) - event.MetricSetFields.Put("relocating_node.id", relocatingNode) + _, _ = event.MetricSetFields.Put("relocating_node.name", relocatingNode) + _, _ = event.MetricSetFields.Put("relocating_node.id", relocatingNode) // xpack.enabled in config using standalone metricbeat writes to `.monitoring` instead of `metricbeat-*` // When using Agent, the index name is overwritten anyways. @@ -160,7 +161,10 @@ func getSourceNode(nodeID string, stateData *stateStruct) (mapstr.M, error) { }, nil } -func generateHashForEvent(stateID string, shard mapstr.M) (string, error) { +// Note: This function may generate duplicate IDs, but those will be dropped since libbeat +// ignores the 409 status code +// https://github.com/elastic/beats/blob/main/libbeat/outputs/elasticsearch/client.go#L396 +func generateHashForEvent(stateID string, shard mapstr.M, index int) (string, error) { var nodeID string if shard["node"] == nil { nodeID = "_na" @@ -181,7 +185,7 @@ func generateHashForEvent(stateID string, shard mapstr.M) (string, error) { if !ok { return "", elastic.MakeErrorForMissingField("shard", elastic.Elasticsearch) } - shardNumberStr := strconv.FormatInt(shardNumberInt, 10) + shardNumberStr := "s" + strconv.FormatInt(shardNumberInt, 10) isPrimary, ok := shard["primary"].(bool) if !ok { @@ -191,7 +195,7 @@ func generateHashForEvent(stateID string, shard mapstr.M) (string, error) { if isPrimary { shardType = "p" } else { - shardType = "r" + shardType = "r" + strconv.Itoa(index) } return stateID + ":" + nodeID + ":" + indexName + ":" + shardNumberStr + ":" + shardType, nil