Skip to content

Commit

Permalink
Fix libbeat.output.*.bytes metrics of Elasticsearch output (elastic#2…
Browse files Browse the repository at this point in the history
…1197)

## What does this PR do?

This PR passes a missing parameter to the constructor `elasticsearch.Client`.

## Why is it important?

The missing parameter prevented Beats from reporting the metrics `libbeat.output.write.bytes` and `libbeat.output.read.bytes` in case of Elasticsearch output.

## Related issues

Closes elastic#20752
  • Loading branch information
kvch authored Sep 22, 2020
1 parent cc62e2e commit ab955b8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add missing country_name geo field in `add_host_metadata` and `add_observer_metadata` processors. {issue}20796[20796] {pull}20811[20811]
- [Autodiscover] Handle input-not-finished errors in config reload. {pull}20915[20915]
- Explicitly detect missing variables in autodiscover configuration, log them at the debug level. {issue}20568[20568] {pull}20898[20898]
- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func NewClient(
Kerberos: s.Kerberos,
Proxy: s.Proxy,
ProxyDisable: s.ProxyDisable,
Observer: s.Observer,
Parameters: s.Parameters,
CompressionLevel: s.CompressionLevel,
EscapeHTML: s.EscapeHTML,
Expand Down
31 changes: 24 additions & 7 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/esleg/eslegtest"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
)
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestClientPublishEventKerberosAware(t *testing.T) {
}

func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) {
output, client := connectTestEs(t, cfg)
output, client := connectTestEsWithStats(t, cfg, index)

// drop old index preparing test
client.conn.Delete(index, "", "", nil)
Expand Down Expand Up @@ -107,6 +108,12 @@ func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) {
}

assert.Equal(t, 1, resp.Count)

outputSnapshot := monitoring.CollectFlatSnapshot(monitoring.Default.GetRegistry("output-"+index), monitoring.Full, true)
assert.Greater(t, outputSnapshot.Ints["write.bytes"], int64(0), "output.events.write.bytes must be greater than 0")
assert.Greater(t, outputSnapshot.Ints["read.bytes"], int64(0), "output.events.read.bytes must be greater than 0")
assert.Equal(t, int64(0), outputSnapshot.Ints["write.errors"])
assert.Equal(t, int64(0), outputSnapshot.Ints["read.errors"])
}

func TestClientPublishEventWithPipeline(t *testing.T) {
Expand All @@ -117,7 +124,7 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
index := "beat-int-pub-single-with-pipeline"
pipeline := "beat-int-pub-single-pipeline"

output, client := connectTestEs(t, obj{
output, client := connectTestEsWithoutStats(t, obj{
"index": index,
"pipeline": "%{[pipeline]}",
})
Expand Down Expand Up @@ -199,7 +206,7 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) {
index := "beat-int-pub-bulk-with-pipeline"
pipeline := "beat-int-pub-bulk-pipeline"

output, client := connectTestEs(t, obj{
output, client := connectTestEsWithoutStats(t, obj{
"index": index,
"pipeline": "%{[pipeline]}",
})
Expand Down Expand Up @@ -276,7 +283,7 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) {

func TestClientPublishTracer(t *testing.T) {
index := "beat-apm-tracer-test"
output, client := connectTestEs(t, map[string]interface{}{
output, client := connectTestEsWithoutStats(t, map[string]interface{}{
"index": index,
})

Expand Down Expand Up @@ -314,7 +321,17 @@ func TestClientPublishTracer(t *testing.T) {
assert.Equal(t, "/_bulk", secondSpan.Context.HTTP.URL.Path)
}

func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) {
func connectTestEsWithStats(t *testing.T, cfg interface{}, suffix string) (outputs.Client, *Client) {
m := monitoring.Default.NewRegistry("output-" + suffix)
s := outputs.NewStats(m)
return connectTestEs(t, cfg, s)
}

func connectTestEsWithoutStats(t *testing.T, cfg interface{}) (outputs.Client, *Client) {
return connectTestEs(t, cfg, outputs.NewNilObserver())
}

func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outputs.Client, *Client) {
config, err := common.NewConfigFrom(map[string]interface{}{
"hosts": eslegtest.GetEsHost(),
"username": eslegtest.GetUser(),
Expand All @@ -337,7 +354,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) {

info := beat.Info{Beat: "libbeat"}
im, _ := idxmgmt.DefaultSupport(nil, info, nil)
output, err := makeES(im, info, outputs.NewNilObserver(), config)
output, err := makeES(im, info, stats, config)
if err != nil {
t.Fatal(err)
}
Expand All @@ -356,7 +373,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) {

// setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC
func setupRoleMapping(t *testing.T, host string) error {
_, client := connectTestEs(t, map[string]interface{}{
_, client := connectTestEsWithoutStats(t, map[string]interface{}{
"hosts": host,
"username": "elastic",
"password": "changeme",
Expand Down

0 comments on commit ab955b8

Please sign in to comment.