diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5b0dc8a573b..957a5d922df 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix index names for indexing not always guaranteed to be lower case. {pull}16081[16081] - Upgrade go-ucfg to latest v0.8.1. {pull}15937{15937} - Fix loading processors from annotation hints. {pull}16348[16348] +- Fix an issue that could cause redundant configuration reloads. {pull}16440[16440] - Fix k8s pods labels broken schema. {pull}16480[16480] - Fix k8s pods annotations broken schema. {pull}16554[16554] - Upgrade go-ucfg to latest v0.8.3. {pull}16450{16450} @@ -182,6 +183,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add document_id setting to decode_json_fields processor. {pull}15859[15859] - Include network information by default on add_host_metadata and add_observer_metadata. {issue}15347[15347] {pull}16077[16077] - Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823] +- Add monitoring variable `libbeat.config.scans` to distinguish scans of the configuration directory from actual reloads of its contents. {pull}16440[16440] - Add support for multiple password in redis output. {issue}16058[16058] {pull}16206[16206] - Remove experimental flag from `setup.template.append_fields` {pull}16576[16576] - Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621] diff --git a/heartbeat/tests/system/test_reload.py b/heartbeat/tests/system/test_reload.py index 59a0630f193..8d23c85cecd 100644 --- a/heartbeat/tests/system/test_reload.py +++ b/heartbeat/tests/system/test_reload.py @@ -69,6 +69,8 @@ def test_config_add(self): """ self.setup_dynamic() + # Wait until the beat is running and has performed its first load of + # the config directory. self.wait_until(lambda: self.log_contains( "Starting reload procedure, current runners: 0")) @@ -77,8 +79,10 @@ def test_config_add(self): self.write_dyn_config( "test.yml", self.http_cfg("myid", "http://localhost:{}".format(server.server_port))) + # The beat should recognize there is a new runner to start. self.wait_until(lambda: self.log_contains( - "Starting reload procedure, current runners: 1")) + "Start list: 1, Stop list: 0"), + max_timeout=10) self.wait_until(lambda: self.output_lines() > 0) diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 55ee0312bf3..dee3be817b5 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -45,6 +45,10 @@ var ( debugf = logp.MakeDebug("cfgfile") + // configScans measures how many times the config dir was scanned for + // changes, configReloads measures how many times there were changes that + // triggered an actual reload. + configScans = monitoring.NewInt(nil, "libbeat.config.scans") configReloads = monitoring.NewInt(nil, "libbeat.config.reloads") moduleStarts = monitoring.NewInt(nil, "libbeat.config.module.starts") moduleStops = monitoring.NewInt(nil, "libbeat.config.module.stops") @@ -185,7 +189,11 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { rl.config.Reload.Period = 0 } - overwriteUpdate := true + // If forceReload is set, the configuration should be reloaded + // even if there are no changes. It is set on the first iteration, + // and whenever an attempted reload fails. It is unset whenever + // a reload succeeds. + forceReload := true for { select { @@ -195,7 +203,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { case <-time.After(rl.config.Reload.Period): debugf("Scan for new config files") - configReloads.Add(1) + configScans.Add(1) files, updated, err := gw.Scan() if err != nil { @@ -204,21 +212,22 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { logp.Err("Error fetching new config files: %v", err) } - // no file changes - if !updated && !overwriteUpdate { - overwriteUpdate = false + // if there are no changes, skip this reload unless forceReload is set. + if !updated && !forceReload { continue } + configReloads.Add(1) // Load all config objects configs, _ := rl.loadConfigs(files) debugf("Number of module configs found: %v", len(configs)) - if err := list.Reload(configs); err != nil { - // Make sure the next run also updates because some runners were not properly loaded - overwriteUpdate = true - } + err = list.Reload(configs) + // Force reload on the next iteration if and only if this one failed. + // (Any errors are already logged by list.Reload, so we don't need to + // propagate the details further.) + forceReload = err != nil } // Path loading is enabled but not reloading. Loads files only once and then stops. diff --git a/libbeat/cfgfile/reload_test.go b/libbeat/cfgfile/reload_test.go new file mode 100644 index 00000000000..d6575acdaee --- /dev/null +++ b/libbeat/cfgfile/reload_test.go @@ -0,0 +1,107 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package cfgfile + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestReloader(t *testing.T) { + // Create random temp directory + dir, err := ioutil.TempDir("", "libbeat-reloader") + defer os.RemoveAll(dir) + if err != nil { + t.Fatal(err) + } + glob := dir + "/*.yml" + + config := common.MustNewConfigFrom(common.MapStr{ + "path": glob, + "reload": common.MapStr{ + "period": "1s", + "enabled": true, + }, + }) + // common.Config{} + reloader := NewReloader(nil, config) + retryCount := 10 + + go reloader.Run(nil) + defer reloader.Stop() + + // wait until configScans >= 2 (which should happen after ~1 second) + for i := 0; i < retryCount; i++ { + if configScans.Get() >= 2 { + break + } + // time interval is slightly more than a second so we don't slightly + // undershoot the first iteration and wait a whole extra second. + time.Sleep(1100 * time.Millisecond) + } + if configScans.Get() < 2 { + assert.Fail(t, "Timed out waiting for configScans >= 2") + } + + // The first scan should cause a reload, but additional ones should not, + // so configReloads should still be 1. + assert.Equal(t, int64(1), configReloads.Get()) + + // Write a file to the reloader path to trigger a real reload + content := []byte("test\n") + err = ioutil.WriteFile(dir+"/config1.yml", content, 0644) + assert.NoError(t, err) + + // Wait for the number of scans to increase at least twice. This is somewhat + // pedantic, but if we just wait for the next scan, it's possible to wake up + // during the brief interval after configScans is updated but before + // configReloads is, giving a false negative. Waiting two iterations + // guarantees that the change from the first one has taken effect. + targetScans := configScans.Get() + 2 + for i := 0; i < retryCount; i++ { + time.Sleep(time.Second) + if configScans.Get() >= targetScans { + break + } + } + if configScans.Get() < targetScans { + assert.Fail(t, + fmt.Sprintf("Timed out waiting for configScans >= %d", targetScans)) + } + + // The number of reloads should now have increased. It would be nicer to + // check if the value is exactly 2, but we can't guarantee this: the glob + // watcher includes an extra 1-second margin around the real modification + // time, so changes that fall too close to a scan interval can be detected + // twice. + if configReloads.Get() < 2 { + assert.Fail(t, + fmt.Sprintf( + "Reloader performed %d scans but only reloaded once", + configScans.Get())) + } +} diff --git a/libbeat/docs/http-endpoint.asciidoc b/libbeat/docs/http-endpoint.asciidoc index 1264ce2d56e..00fd30db8c6 100644 --- a/libbeat/docs/http-endpoint.asciidoc +++ b/libbeat/docs/http-endpoint.asciidoc @@ -118,6 +118,7 @@ curl -XGET 'localhost:5066/stats?pretty' "starts": 0, "stops": 0 }, + "scans": 1, "reloads": 1 }, "output": { diff --git a/libbeat/tests/system/test_http.py b/libbeat/tests/system/test_http.py index 0311159835a..819e2c9a1ea 100644 --- a/libbeat/tests/system/test_http.py +++ b/libbeat/tests/system/test_http.py @@ -42,7 +42,7 @@ def test_stats(self): data = json.loads(r.content) # Test one data point - assert data["libbeat"]["config"]["reloads"] == 0 + assert data["libbeat"]["config"]["scans"] == 0 proc.check_kill_and_wait() diff --git a/metricbeat/module/beat/stats/_meta/test/stats.800.json b/metricbeat/module/beat/stats/_meta/test/stats.800.json index ced95cd76e3..12b527060fc 100644 --- a/metricbeat/module/beat/stats/_meta/test/stats.800.json +++ b/metricbeat/module/beat/stats/_meta/test/stats.800.json @@ -44,6 +44,7 @@ "starts": 0, "stops": 0 }, + "scans": 1, "reloads": 1 }, "output": { diff --git a/metricbeat/tests/system/test_reload.py b/metricbeat/tests/system/test_reload.py index 02d29d175be..29d82bbf82b 100644 --- a/metricbeat/tests/system/test_reload.py +++ b/metricbeat/tests/system/test_reload.py @@ -44,8 +44,6 @@ def test_reload(self): @unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os") def test_start_stop(self): - def reload_line( - num_runners): return "Starting reload procedure, current runners: %d" % num_runners """ Test if module is properly started and stopped """ @@ -61,7 +59,7 @@ def reload_line( # Ensure no modules are loaded self.wait_until( - lambda: self.log_contains(reload_line(0)), + lambda: self.log_contains("Start list: 0, Stop list: 0"), max_timeout=10) systemConfig = """ @@ -73,17 +71,17 @@ def reload_line( with open(config_path, 'w') as f: f.write(systemConfig) - # Ensure the module was successfully loaded + # Ensure the module is started self.wait_until( - lambda: self.log_contains(reload_line(1)), + lambda: self.log_contains("Start list: 1, Stop list: 0"), max_timeout=10) # Remove config again os.remove(config_path) - # Ensure the module was successfully unloaded + # Ensure the module is stopped self.wait_until( - lambda: self.log_contains(reload_line(0)), + lambda: self.log_contains("Start list: 0, Stop list: 1"), max_timeout=10) time.sleep(1)