diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8e4d14a165f..d63daae50dc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -45,12 +45,13 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Support for multiline zookeeper logs {issue}2496[2496] - Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792] - Disable lockfile when running under elastic-agent. {pull}33988[33988] +- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346] *Auditbeat* *Filebeat* -- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for +- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for automatic splitting at root level, if root level element is an array. {pull}34155[34155] - [httpsjon] Improved error handling during pagination with chaining & split processor {pull}34127[34127] - [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981] diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 70c196e347a..fbca2c28032 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -13,18 +13,19 @@ import ( "syscall" "time" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/gofrs/uuid" "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" - - "github.com/gofrs/uuid" + gproto "google.golang.org/protobuf/proto" "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/common/reload" lbmanagement "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/version" - "github.com/elastic/elastic-agent-client/v7/pkg/client" - conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" ) // unitKey is used to identify a unique unit in a map @@ -66,13 +67,17 @@ type BeatV2Manager struct { isRunning bool - // is set on first instance of a config reload, - // allowing us to restart the beat if stopOnOutputReload is set - outputIsConfigured bool + // set with the last applied output config + // allows tracking if the configuration actually changed and if the + // beat needs to restart if stopOnOutputReload is set + lastOutputCfg *proto.UnitExpectedConfig + + // set with the last applied input configs + lastInputCfgs map[string]*proto.UnitExpectedConfig // used for the debug callback to report as-running config - lastOutputCfg *reload.ConfigWithMeta - lastInputCfg []*reload.ConfigWithMeta + lastBeatOutputCfg *reload.ConfigWithMeta + lastBeatInputCfgs []*reload.ConfigWithMeta } // ================================ @@ -496,22 +501,31 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error { return fmt.Errorf("failed to reload output: %w", err) } cm.lastOutputCfg = nil - return nil - } - - if cm.stopOnOutputReload && cm.outputIsConfigured { - cm.logger.Info("beat is restarting because output changed") - _ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil) - cm.Stop() + cm.lastBeatOutputCfg = nil return nil } _, _, rawConfig := unit.Expected() if rawConfig == nil { + // should not happen; hard stop return fmt.Errorf("output unit has no config") } + + if cm.lastOutputCfg != nil && gproto.Equal(cm.lastOutputCfg, rawConfig) { + // configuration for the output did not change; do nothing + cm.logger.Debug("Skipped reloading output; configuration didn't change") + return nil + } + cm.logger.Debugf("Got output unit config '%s'", rawConfig.GetId()) + if cm.stopOnOutputReload && cm.lastOutputCfg != nil { + cm.logger.Info("beat is restarting because output changed") + _ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil) + cm.Stop() + return nil + } + reloadConfig, err := groupByOutputs(rawConfig) if err != nil { return fmt.Errorf("failed to generate config for output: %w", err) @@ -521,9 +535,8 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error { if err != nil { return fmt.Errorf("failed to reload output: %w", err) } - cm.lastOutputCfg = reloadConfig - // set to true, we'll reload the output if we need to re-configure - cm.outputIsConfigured = true + cm.lastOutputCfg = rawConfig + cm.lastBeatOutputCfg = reloadConfig return nil } @@ -533,22 +546,40 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { return fmt.Errorf("failed to find beat reloadable type 'input'") } - var inputCfgs []*reload.ConfigWithMeta + inputCfgs := make(map[string]*proto.UnitExpectedConfig, len(inputUnits)) + inputBeatCfgs := make([]*reload.ConfigWithMeta, 0, len(inputUnits)) agentInfo := cm.client.AgentInfo() for _, unit := range inputUnits { _, _, rawConfig := unit.Expected() + if rawConfig == nil { + // should not happen; hard stop + return fmt.Errorf("input unit %s has no config", unit.ID()) + } + + var prevCfg *proto.UnitExpectedConfig + if cm.lastInputCfgs != nil { + prevCfg, _ = cm.lastInputCfgs[unit.ID()] + } + if prevCfg != nil && gproto.Equal(prevCfg, rawConfig) { + // configuration for the input did not change; do nothing + cm.logger.Debugf("Skipped reloading input unit %s; configuration didn't change", unit.ID()) + continue + } + inputCfg, err := generateBeatConfig(rawConfig, agentInfo) if err != nil { return fmt.Errorf("failed to generate configuration for unit %s: %w", unit.ID(), err) } - inputCfgs = append(inputCfgs, inputCfg...) + inputCfgs[unit.ID()] = rawConfig + inputBeatCfgs = append(inputBeatCfgs, inputCfg...) } - err := obj.Reload(inputCfgs) + err := obj.Reload(inputBeatCfgs) if err != nil { return fmt.Errorf("failed to reloading inputs: %w", err) } - cm.lastInputCfg = inputCfgs + cm.lastInputCfgs = inputCfgs + cm.lastBeatInputCfgs = inputBeatCfgs return nil } @@ -557,7 +588,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { func (cm *BeatV2Manager) handleDebugYaml() []byte { // generate input inputList := []map[string]interface{}{} - for _, module := range cm.lastInputCfg { + for _, module := range cm.lastBeatInputCfgs { var inputMap map[string]interface{} err := module.Config.Unpack(&inputMap) if err != nil { @@ -569,8 +600,8 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte { // generate output outputCfg := map[string]interface{}{} - if cm.lastOutputCfg != nil { - err := cm.lastOutputCfg.Config.Unpack(&outputCfg) + if cm.lastBeatOutputCfg != nil { + err := cm.lastBeatOutputCfg.Config.Unpack(&outputCfg) if err != nil { cm.logger.Errorf("error unpacking output config for debug callback: %s", err) return nil