Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add add_observer_metadata processor #11394

Merged
merged 19 commits into from
Apr 24, 2019
Merged
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- New processor: `truncate_fields`. {pull}11297[11297]
- Allow a beat to ship monitoring data directly to an Elasticsearch monitoring cluster. {pull}9260[9260]
- Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}NNNN[NNNN]
- Add `add_observer_metadata` processor. {pull}11394[11394]

*Auditbeat*

Expand All @@ -141,6 +142,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*

- Enable `add_observer_metadata` processor in default config. {pull}11394[11394]

*Journalbeat*

*Metricbeat*
Expand Down
5 changes: 5 additions & 0 deletions heartbeat/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ output.elasticsearch:
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

#================================ Processors =====================================
processors:
- add_observer_metadata: ~


#================================ Logging =====================================

# Sets log level. The default log level is info.
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/scripts/post_process_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
if m:
section_name = m.group(1)
if section_name == "Processors":
output += line # include section name in output
output += "processors:\n"
output += " - add_observer_metadata: ~\n"
output += "\n\n"
inside_processor_section = True
else:
inside_processor_section = False
Expand Down
1 change: 1 addition & 0 deletions libbeat/cmd/instance/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
_ "github.com/elastic/beats/libbeat/processors/add_host_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_locale"
_ "github.com/elastic/beats/libbeat/processors/add_observer_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_process_metadata"
_ "github.com/elastic/beats/libbeat/processors/communityid"
_ "github.com/elastic/beats/libbeat/processors/dissect"
Expand Down
77 changes: 76 additions & 1 deletion libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ The supported processors are:
* <<add-docker-metadata,`add_docker_metadata`>>
* <<add-fields, `add_fields`>>
* <<add-host-metadata,`add_host_metadata`>>
* <<add-observer-metadata,`add_observer_metadata`>>
* <<add-kubernetes-metadata,`add_kubernetes_metadata`>>
* <<add-labels, `add_labels`>>
* <<add-locale,`add_locale`>>
Expand Down Expand Up @@ -1173,7 +1174,7 @@ It has the following settings:


The `add_host_metadata` processor annotates each event with relevant metadata from the host machine.
The fields added to the event are looking as following:
The fields added to the event look like the following:

[source,json]
-------------------------------------------------------------------------------
Expand Down Expand Up @@ -1205,6 +1206,80 @@ The fields added to the event are looking as following:
}
-------------------------------------------------------------------------------

[[add-observer-metadata]]
=== Add Observer metadata

beta[]

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_observer_metadata:
netinfo.enabled: false
cache.ttl: 5m
geo:
name: nyc-dc1-rack1
location: 40.7128, -74.0060
continent_name: North America
country_iso_code: US
region_name: New York
region_iso_code: NY
city_name: New York
-------------------------------------------------------------------------------

It has the following settings:

`netinfo.enabled`:: (Optional) Default false. Include IP addresses and MAC addresses as fields `observer.ip` and `observer.mac`.

`cache.ttl`:: (Optional) The processor uses an internal cache for the observer metadata. This sets the cache expiration time. The default is 5m; negative values disable caching altogether.

`geo.name`:: User definable token to be used for identifying a discrete location. Frequently a datacenter, rack, or similar.

`geo.location`:: Latitude and longitude in comma-separated format, for example `40.7128, -74.0060`.

`geo.continent_name`:: Name of the continent.

`geo.country_name`:: Name of the country.

`geo.region_name`:: Name of the region.

`geo.city_name`:: Name of the city.

`geo.country_iso_code`:: ISO country code.

`geo.region_iso_code`:: ISO region code.


The `add_observer_metadata` processor annotates each event with relevant metadata from the observer machine.
The fields added to the event look like the following:

[source,json]
-------------------------------------------------------------------------------
{
"observer" : {
"hostname" : "avce",
"type" : "heartbeat",
"vendor" : "elastic",
"ip" : [
"192.168.1.251",
"fe80::64b2:c3ff:fe5b:b974"
],
"mac" : [
"dc:c1:02:6f:1b:ed"
],
"geo": {
"continent_name": "North America",
"country_iso_code": "US",
"region_name": "New York",
"region_iso_code": "NY",
"city_name": "New York",
"name": "nyc-dc1-rack1",
"location": "40.7128, -74.0060"
}
}
}
-------------------------------------------------------------------------------

[[dissect]]
=== Dissect strings

Expand Down
88 changes: 5 additions & 83 deletions libbeat/processors/add_host_metadata/add_host_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@ package add_host_metadata

import (
"fmt"
"net"
"regexp"
"sync"
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/metric/system/host"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/util"
"github.com/elastic/go-sysinfo"
)

Expand Down Expand Up @@ -67,40 +65,9 @@ func New(cfg *common.Config) (processors.Processor, error) {
p.loadData()

if config.Geo != nil {
if len(config.Geo.Location) > 0 {
// Regexp matching a number with an optional decimal component
// Valid numbers: '123', '123.23', etc.
latOrLon := `\-?\d+(\.\d+)?`

// Regexp matching a pair of lat lon coordinates.
// e.g. 40.123, -92.929
locRegexp := `^\s*` + // anchor to start of string with optional whitespace
latOrLon + // match the latitude
`\s*\,\s*` + // match the separator. optional surrounding whitespace
latOrLon + // match the longitude
`\s*$` //optional whitespace then end anchor

if m, _ := regexp.MatchString(locRegexp, config.Geo.Location); !m {
return nil, errors.New(fmt.Sprintf("Invalid lat,lon string for add_host_metadata: %s", config.Geo.Location))
}
}

geoFields := common.MapStr{
"name": config.Geo.Name,
"location": config.Geo.Location,
"continent_name": config.Geo.ContinentName,
"country_iso_code": config.Geo.CountryISOCode,
"region_name": config.Geo.RegionName,
"region_iso_code": config.Geo.RegionISOCode,
"city_name": config.Geo.CityName,
}
// Delete any empty values
blankStringMatch := regexp.MustCompile(`^\s*$`)
for k, v := range geoFields {
vStr := v.(string)
if blankStringMatch.MatchString(vStr) {
delete(geoFields, k)
}
geoFields, err := util.GeoConfigToMap(*config.Geo)
if err != nil {
return nil, err
}
p.geoData = common.MapStr{"host": common.MapStr{"geo": geoFields}}
}
Expand Down Expand Up @@ -151,7 +118,7 @@ func (p *addHostMetadata) loadData() error {
data := host.MapHostInfo(h.Info())
if p.config.NetInfoEnabled {
// IP-address and MAC-address
var ipList, hwList, err = p.getNetInfo()
var ipList, hwList, err = util.GetNetInfo()
if err != nil {
logp.Info("Error when getting network information %v", err)
}
Expand All @@ -171,51 +138,6 @@ func (p *addHostMetadata) loadData() error {
return nil
}

// getNetInfo gathers the addresses of every non-loopback network interface.
//
// It returns, in order, the IP addresses and the hardware (MAC) addresses
// that were found. If the interface list itself cannot be read, both lists
// are nil and that error is returned. A failure to read the addresses of a
// single interface does not abort the scan: the error is recorded, the
// remaining interfaces are still processed, and the recorded errors are
// returned together alongside whatever was collected.
func (p *addHostMetadata) getNetInfo() ([]string, []string, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return nil, nil, err
	}

	var (
		ips      []string
		macs     []string
		failures multierror.Errors
	)

	for _, iface := range interfaces {
		// Loopback interfaces are not reported.
		if iface.Flags&net.FlagLoopback != 0 {
			continue
		}

		// Interfaces without a hardware address contribute no MAC entry.
		if mac := iface.HardwareAddr.String(); mac != "" {
			macs = append(macs, mac)
		}

		addrs, addrErr := iface.Addrs()
		if addrErr != nil {
			// Remember the failure and move on to the next interface.
			failures = append(failures, addrErr)
			continue
		}

		for _, addr := range addrs {
			var ip net.IP
			switch a := addr.(type) {
			case *net.IPNet:
				ip = a.IP
			case *net.IPAddr:
				ip = a.IP
			default:
				// Other address kinds carry no IP to report.
				continue
			}
			ips = append(ips, ip.String())
		}
	}

	return ips, macs, failures.Err()
}

func (p *addHostMetadata) String() string {
return fmt.Sprintf("%v=[netinfo.enabled=[%v], cache.ttl=[%v]]",
processorName, p.config.NetInfoEnabled, p.config.CacheTTL)
Expand Down
71 changes: 12 additions & 59 deletions libbeat/processors/add_host_metadata/add_host_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -169,77 +168,31 @@ func TestConfigGeoEnabled(t *testing.T) {
newEvent, err := p.Run(event)
assert.NoError(t, err)

for configKey, configValue := range config {
t.Run(fmt.Sprintf("Check of %s", configKey), func(t *testing.T) {
v, err := newEvent.GetValue(fmt.Sprintf("host.%s", configKey))
assert.NoError(t, err)
assert.Equal(t, configValue, v, "Could not find in %s", newEvent)
})
}
eventGeoField, err := newEvent.GetValue("host.geo")
require.NoError(t, err)

assert.Len(t, eventGeoField, len(config))
}

func TestPartialGeo(t *testing.T) {
func TestConfigGeoDisabled(t *testing.T) {
event := &beat.Event{
Fields: common.MapStr{},
Timestamp: time.Now(),
}

config := map[string]interface{}{
"geo.name": "yerevan-am",
"geo.city_name": " ",
}
config := map[string]interface{}{}

testConfig, err := common.NewConfigFrom(config)
assert.NoError(t, err)
require.NoError(t, err)

p, err := New(testConfig)
require.NoError(t, err)

newEvent, err := p.Run(event)
assert.NoError(t, err)

v, err := newEvent.Fields.GetValue("host.geo.name")
assert.NoError(t, err)
assert.Equal(t, "yerevan-am", v)

missing := []string{"continent_name", "country_name", "country_iso_code", "region_name", "region_iso_code", "city_name"}

for _, k := range missing {
path := "host.geo." + k
v, err = newEvent.Fields.GetValue(path)

assert.Equal(t, common.ErrKeyNotFound, err, "din expect to find %v", path)
}
}

func TestGeoLocationValidation(t *testing.T) {
locations := []struct {
str string
valid bool
}{
{"40.177200, 44.503490", true},
{"-40.177200, -44.503490", true},
{"garbage", false},
{"9999999999", false},
}

for _, location := range locations {
t.Run(fmt.Sprintf("Location %s validation should be %t", location.str, location.valid), func(t *testing.T) {

conf, err := common.NewConfigFrom(map[string]interface{}{
"geo": map[string]interface{}{
"location": location.str,
},
})
require.NoError(t, err)

_, err = New(conf)
require.NoError(t, err)

if location.valid {
require.NoError(t, err)
} else {
require.Error(t, err)
}
})
}
eventGeoField, err := newEvent.GetValue("host.geo")
assert.Error(t, err)
assert.Equal(t, nil, eventGeoField)
}
Loading