Skip to content

Commit

Permalink
[CM] Allow to unenroll a beats (elastic#9729)
Browse files Browse the repository at this point in the history
When a Beat is unenrolled for CM it will receive a 404. Usually Beats
will threat any errors returned by CM to be transient and will use a
cached version of the configuration, this commit change the logic if a 404 is returned by CM
we will clean the cache and remove any running configuration.

We will log this event as either the beats did not find any
configuration or was unenrolled from CM.

If the error is transient, the Beat will pickup the change on the next
fetch, if its permanent we will log each fetch.

Fixes: elastic#9452 


Need backport to 6.5, 6.6, 6.x

(cherry picked from commit 5187335)
  • Loading branch information
ph committed Dec 24, 2018
1 parent 818bfac commit 94eb520
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 2 deletions.
63 changes: 63 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,69 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff]

- Fix issue preventing diskio metrics collection for idle disks. {issue}9124[9124] {pull}9125[9125]
- Allow beats to blacklist certain part of the configuration while using Central Management. {pull}9099[9099]
*Packetbeat*

- Adjust Packetbeat `http` fields to ECS Beta 2 {pull}9645[9645]
- `http.request.body` moves to `http.request.body.content`
- `http.response.body` moves to `http.response.body.content`

*Winlogbeat*

*Functionbeat*

==== Bugfixes

*Affecting all Beats*

- Enforce validation for the Central Management access token. {issue}9621[9621]
- Allow to unenroll a Beat from the UI. {issue}9452[9452]

*Auditbeat*

*Filebeat*

*Heartbeat*

- Made monitors.d configuration part of the default config. {pull}9004[9004]

*Journalbeat*

*Metricbeat*

*Packetbeat*

*Winlogbeat*

*Functionbeat*

==== Added

*Affecting all Beats*

- Update field definitions for `http` to ECS Beta 2 {pull}9645[9645]
- Release Jolokia autodiscover as GA. {pull}9706[9706]

*Auditbeat*

- Add system module. {pull}9546[9546]

*Filebeat*

- Added module for parsing Google Santa logs. {pull}9540[9540]
- Added netflow input type that supports NetFlow v1, v5, v6, v7, v8, v9 and IPFIX. {issue}9399[9399]
- Add option to modules.yml file to indicate that a module has been moved {pull}9432[9432].

*Heartbeat*

- Fixed rare issue where TLS connections to endpoints with x509 certificates missing either notBefore or notAfter would cause the check to fail with a stacktrace. {pull}9566[9566]


*Journalbeat*

*Metricbeat*

- Add `key` metricset to the Redis module. {issue}9582[9582] {pull}9657[9657] {pull}9746[9746]
- Add `socket_summary` metricset to system defaults, removing experimental tag and supporting Windows {pull}9709[9709]

*Packetbeat*

Expand Down
15 changes: 14 additions & 1 deletion x-pack/libbeat/management/api/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
"net/http"
"reflect"

"errors"

"github.com/elastic/beats/libbeat/common/reload"

"github.com/gofrs/uuid"

"github.com/elastic/beats/libbeat/common"
)

var errConfigurationNotFound = errors.New("no configuration found, you need to enroll your Beat")

// ConfigBlock stores a piece of config from central management
type ConfigBlock struct {
Raw map[string]interface{}
Expand Down Expand Up @@ -58,7 +62,11 @@ func (c *Client) Configuration(accessToken string, beatUUID uuid.UUID, configOK
} `json:"configuration_blocks"`
}{}
url := fmt.Sprintf("/api/beats/agent/%s/configuration?validSetting=%t", beatUUID, configOK)
_, err := c.request("GET", url, nil, headers, &resp)
statusCode, err := c.request("GET", url, nil, headers, &resp)
if statusCode == http.StatusNotFound {
return nil, errConfigurationNotFound
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -88,3 +96,8 @@ func ConfigBlocksEqual(a, b ConfigBlocks) bool {

return reflect.DeepEqual(a, b)
}

// IsConfigurationNotFound returns true if the configuration was not found.
func IsConfigurationNotFound(err error) bool {
return err == errConfigurationNotFound
}
21 changes: 21 additions & 0 deletions x-pack/libbeat/management/api/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,24 @@ func TestConfigBlocksEqual(t *testing.T) {
})
}
}

func TestUnEnroll(t *testing.T) {
beatUUID, err := uuid.NewV4()
if err != nil {
t.Fatalf("error while generating Beat UUID: %v", err)
}

server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check correct path is used
assert.Equal(t, "/api/beats/agent/"+beatUUID.String()+"/configuration", r.URL.Path)

// Check enrollment token is correct
assert.Equal(t, "thisismyenrollmenttoken", r.Header.Get("kbn-beats-access-token"))

http.NotFound(w, r)
}))
defer server.Close()

_, err = client.Configuration("thisismyenrollmenttoken", beatUUID, false)
assert.True(t, IsConfigurationNotFound(err))
}
5 changes: 5 additions & 0 deletions x-pack/libbeat/management/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,8 @@ func (c *Cache) Save() error {
// move temporary file into final location
return file.SafeFileRotate(path, tempFile)
}

// HasConfig returns true if configs are cached.
func (c *Cache) HasConfig() bool {
return len(c.Configs) > 0
}
38 changes: 38 additions & 0 deletions x-pack/libbeat/management/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package management

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/x-pack/libbeat/management/api"
)

func TestHasConfig(t *testing.T) {
tests := map[string]struct {
configs api.ConfigBlocks
expected bool
}{
"with config": {
configs: api.ConfigBlocks{
api.ConfigBlocksWithType{Type: "metricbeat "},
},
expected: true,
},
"without config": {
configs: api.ConfigBlocks{},
expected: false,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cache := Cache{Configs: test.configs}
assert.Equal(t, test.expected, cache.HasConfig())
})
}
}
11 changes: 10 additions & 1 deletion x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,17 @@ func (cm *ConfigManager) worker() {
func (cm *ConfigManager) fetch() bool {
cm.logger.Debug("Retrieving new configurations from Kibana")
configs, err := cm.client.Configuration(cm.config.AccessToken, cm.beatUUID, cm.cache.ConfigOK)

if api.IsConfigurationNotFound(err) {
if cm.cache.HasConfig() {
cm.logger.Error("Disabling all running configuration because no configurations were found for this Beat, the endpoint returned a 404 or the beat is not enrolled with central management")
cm.cache.Configs = api.ConfigBlocks{}
}
return true
}

if err != nil {
cm.logger.Errorf("error retriving new configurations, will use cached ones: %s", err)
cm.logger.Errorf("error retrieving new configurations, will use cached ones: %s", err)
return false
}

Expand Down
68 changes: 68 additions & 0 deletions x-pack/libbeat/management/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,71 @@ func TestRemoveItems(t *testing.T) {
manager.Stop()
os.Remove(paths.Resolve(paths.Data, "management.yml"))
}

func responseText(s string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, s)
}
}

func TestUnEnroll(t *testing.T) {
registry := reload.NewRegistry()
id, err := uuid.NewV4()
if err != nil {
t.Fatalf("error while generating id: %v", err)
}
accessToken := "footoken"
reloadable := reloadable{
reloaded: make(chan *reload.ConfigWithMeta, 1),
}
registry.MustRegister("test.blocks", &reloadable)

mux := http.NewServeMux()
i := 0
responses := []http.HandlerFunc{ // Initial load
responseText(`{"configuration_blocks":[{"type":"test.blocks","config":{"module":"apache2"}}]}`),
http.NotFound,
}
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
responses[i](w, r)
i++
}))

server := httptest.NewServer(mux)

c, err := api.ConfigFromURL(server.URL)
if err != nil {
t.Fatal(err)
}

config := &Config{
Enabled: true,
Period: 100 * time.Millisecond,
Kibana: c,
AccessToken: accessToken,
}

manager, err := NewConfigManagerWithConfig(config, registry, id)
if err != nil {
t.Fatal(err)
}

manager.Start()

// On first reload we will get apache2 module
config1 := <-reloadable.reloaded
assert.Equal(t, &reload.ConfigWithMeta{
Config: common.MustNewConfigFrom(map[string]interface{}{
"module": "apache2",
}),
}, config1)

// Get a nil config, even if the block is not part of the payload
config2 := <-reloadable.reloaded
var nilConfig *reload.ConfigWithMeta
assert.Equal(t, nilConfig, config2)

// Cleanup
manager.Stop()
os.Remove(paths.Resolve(paths.Data, "management.yml"))
}

0 comments on commit 94eb520

Please sign in to comment.