Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to set just monitoring.cluster_uuid #14338

Merged
merged 10 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Support usage of custom builders without hints and mappers {pull}13839[13839]
- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259]
- Fix kubernetes `metaGenerator.ResourceMetadata` when parent reference controller is nil {issue}14320[14320] {pull}14329[14329]
- Allow users to configure only `cluster_uuid` setting under `monitoring` namespace. {pull}14338[14338]

*Auditbeat*

Expand Down
59 changes: 38 additions & 21 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,30 +411,12 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
return err
}

monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig)
r, err := b.setupMonitoring(settings)
if err != nil {
return err
}

if monitoringCfg.Enabled() {
settings := report.Settings{
DefaultUsername: settings.Monitoring.DefaultUsername,
Format: reporterSettings.Format,
ClusterUUID: reporterSettings.ClusterUUID,
}
reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output)
if err != nil {
return err
}
defer reporter.Stop()

// Expose monitoring.cluster_uuid in state API
if reporterSettings.ClusterUUID != "" {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
monitoringRegistry := stateRegistry.NewRegistry("monitoring")
clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid")
clusterUUIDRegVar.Set(reporterSettings.ClusterUUID)
}
if r != nil {
defer r.Stop()
}

if b.Config.MetricLogging == nil || b.Config.MetricLogging.Enabled() {
Expand Down Expand Up @@ -896,6 +878,41 @@ func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, err
return callback, nil
}

func (b *Beat) setupMonitoring(settings Settings) (report.Reporter, error) {
monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig)
if err != nil {
return nil, err
}

monitoringClusterUUID, err := monitoring.GetClusterUUID(monitoringCfg)
if err != nil {
return nil, err
}

// Expose monitoring.cluster_uuid in state API
if monitoringClusterUUID != "" {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
monitoringRegistry := stateRegistry.NewRegistry("monitoring")
clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid")
clusterUUIDRegVar.Set(monitoringClusterUUID)
}

if monitoring.IsEnabled(monitoringCfg) {
settings := report.Settings{
DefaultUsername: settings.Monitoring.DefaultUsername,
Format: reporterSettings.Format,
ClusterUUID: monitoringClusterUUID,
}
reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output)
if err != nil {
return nil, err
}
return reporter, nil
}

return nil, nil
}

// handleError handles the given error by logging it and then returning the
// error. If the err is nil or is a GracefulExit error then the method will
// return nil without logging anything.
Expand Down
40 changes: 33 additions & 7 deletions libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,40 @@ func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error)
return monitoringCfg, &report.Settings{Format: report.FormatXPackMonitoringBulk}, nil
case beatCfg.Monitoring.Enabled():
monitoringCfg := beatCfg.Monitoring
var info struct {
ClusterUUID string `config:"cluster_uuid"`
}
if err := monitoringCfg.Unpack(&info); err != nil {
return nil, nil, err
}
return monitoringCfg, &report.Settings{Format: report.FormatBulk, ClusterUUID: info.ClusterUUID}, nil
return monitoringCfg, &report.Settings{Format: report.FormatBulk}, nil
default:
return nil, nil, nil
}
}

// GetClusterUUID returns the value of the monitoring.cluster_uuid setting, if it is set.
func GetClusterUUID(monitoringCfg *common.Config) (string, error) {
if monitoringCfg == nil {
return "", nil
}

var config struct {
ClusterUUID string `config:"cluster_uuid"`
}
if err := monitoringCfg.Unpack(&config); err != nil {
return "", err
}

return config.ClusterUUID, nil
}

// IsEnabled returns whether the monitoring reporter is enabled or not.
func IsEnabled(monitoringCfg *common.Config) bool {
ph marked this conversation as resolved.
Show resolved Hide resolved
if monitoringCfg == nil {
return false
}

// If the only setting in the monitoring config is cluster_uuid, it is
// not enabled
fields := monitoringCfg.GetFields()
if len(fields) == 1 && fields[0] == "cluster_uuid" {
return false
}

return monitoringCfg.Enabled()
}
16 changes: 13 additions & 3 deletions libbeat/tests/system/config/mockbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,20 @@ xpack.monitoring.elasticsearch.state.period: 3s # to speed up tests

{% if monitoring -%}
#================================ X-Pack Monitoring (direct) =====================================
monitoring.elasticsearch.hosts: {{monitoring.elasticsearch.hosts}}
monitoring.elasticsearch.metrics.period: 2s # to speed up tests
monitoring.elasticsearch.state.period: 3s # to speed up tests
monitoring:
{% if monitoring.elasticsearch -%}
elasticsearch.hosts: {{monitoring.elasticsearch.hosts}}
elasticsearch.metrics.period: 2s # to speed up tests
elasticsearch.state.period: 3s # to speed up tests
{% endif -%}

{% if monitoring.cluster_uuid -%}
cluster_uuid: {{monitoring.cluster_uuid}}
{% endif -%}
{% endif -%}

# vim: set ft=jinja:

{% if http_enabled -%}
http.enabled: {{http_enabled}}
{% endif -%}
34 changes: 34 additions & 0 deletions libbeat/tests/system/test_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import re
from nose.plugins.attrib import attr
import unittest
import requests
import random
import string

INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)

Expand Down Expand Up @@ -150,6 +153,29 @@ def test_compare(self):
self.assert_same_structure(indirect_beats_state_doc['beats_state'], direct_beats_state_doc['beats_state'])
self.assert_same_structure(indirect_beats_stats_doc['beats_stats'], direct_beats_stats_doc['beats_stats'])

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@attr('integration')
def test_cluster_uuid_setting(self):
"""
Test that monitoring.cluster_uuid setting may be set without any other monitoring.* settings
"""
test_cluster_uuid = self.random_string(10)
self.render_config_template(
"mockbeat",
monitoring={
"cluster_uuid": test_cluster_uuid
},
http_enabled="true"
)

proc = self.start_beat(config="mockbeat.yml")
self.wait_until(lambda: self.log_contains("mockbeat start running."))

state = self.get_beat_state()
proc.check_kill_and_wait()

self.assertEqual(test_cluster_uuid, state["monitoring"]["cluster_uuid"])

def search_monitoring_doc(self, monitoring_type):
results = self.es_monitoring.search(
index='.monitoring-beats-*',
Expand Down Expand Up @@ -241,3 +267,11 @@ def get_elasticsearch_monitoring_url(self):
host=os.getenv("ES_MONITORING_HOST", "localhost"),
port=os.getenv("ES_MONITORING_PORT", "9210")
)

def get_beat_state(self):
url = "http://localhost:5066/state"
return requests.get(url).json()

def random_string(self, size):
char_pool = string.ascii_letters + string.digits
return ''.join(random.choice(char_pool) for i in range(size))