Skip to content

Commit

Permalink
Fix input reload under Elastic-Agent
Browse files Browse the repository at this point in the history
This commit fixes the input reload issue under Elastic-Agent by
creating an infinity retry logic in the ManagerV2. This is exactly the
same logic used by the configuration reload on a standalone Beat.

Now if when reloading inputs, there is at least one
`common.ErrInputNotFinished` a `forceReload` flag is set to true and
the debounce timer is started. This process will repeat untill no
`common.ErrInputNotFinished` is returned.

The `changeDebounce` period is increased to 1s and the
`forceReloadDebounce` period is set to `10 x changeDebounce`.
  • Loading branch information
belimawr committed Apr 27, 2023
1 parent c80b6dd commit 68df822
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 9 deletions.
5 changes: 4 additions & 1 deletion filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ func (p *Input) loadStates(states []file.State) error {

// In case a input is tried to be started with an unfinished state matching the glob pattern
if !state.Finished {
return &common.ErrInputNotFinished{State: state.String()}
return &common.ErrInputNotFinished{
State: state.String(),
File: state.Fileinfo.Name(),
}
}

// Convert state to current identifier if different
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// ErrInputNotFinished struct for reporting errors related to not finished inputs
type ErrInputNotFinished struct {
State string
File string
}

// Error method of ErrInputNotFinished
Expand Down
73 changes: 65 additions & 8 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,22 @@ package management

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/joeshaw/multierror"
pkgerr "github.com/pkg/errors"
"go.uber.org/zap/zapcore"
gproto "google.golang.org/protobuf/proto"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -49,9 +53,11 @@ type BeatV2Manager struct {
errCanceller context.CancelFunc

// track individual units given to us by the V2 API
mx sync.Mutex
units map[unitKey]*client.Unit
actions []client.Action
mx sync.Mutex
units map[unitKey]*client.Unit
actions []client.Action
forceReload atomic.Bool
reloadLock sync.Mutex

// status is reported as a whole for every unit sent to this component
// hopefully this can be improved in the future to be separated per unit
Expand Down Expand Up @@ -378,7 +384,13 @@ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) {
}

func (cm *BeatV2Manager) unitListen() {
const changeDebounce = 100 * time.Millisecond
const changeDebounce = time.Second

// forceReloadDebounce is greater than changeDebounce because it is only
// used when an input has not reached its finished state, this means some events
// still need to be acked by the acker, hence the longer we wait the more likely
// for the input to have reached its finished state.
const forceReloadDebounce = changeDebounce * 10

// register signal handler
sigc := make(chan os.Signal, 1)
Expand Down Expand Up @@ -439,6 +451,10 @@ func (cm *BeatV2Manager) unitListen() {
}
cm.mx.Unlock()
cm.reload(units)
if cm.forceReload.Load() {
// Restart the debounce timer so we try to reload the inputs.
t.Reset(forceReloadDebounce)
}
}
}
}
Expand Down Expand Up @@ -618,6 +634,8 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error {
}

func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
cm.reloadLock.Lock()
defer cm.reloadLock.Unlock()
obj := cm.registry.GetInputList()
if obj == nil {
return fmt.Errorf("failed to find beat reloadable type 'input'")
Expand All @@ -642,15 +660,54 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
inputBeatCfgs = append(inputBeatCfgs, inputCfg...)
}

if !didChange(cm.lastInputCfgs, inputCfgs) {
if !didChange(cm.lastInputCfgs, inputCfgs) && !cm.forceReload.Load() {
cm.logger.Debug("Skipped reloading input units; configuration didn't change")
return nil
}

err := obj.Reload(inputBeatCfgs)
if err != nil {
return fmt.Errorf("failed to reloading inputs: %w", err)
if cm.forceReload.Load() {
cm.logger.Debug("Reloading Beats inputs because forceReload is true")
}

if err := obj.Reload(inputBeatCfgs); err != nil {
merror := &multierror.MultiError{}
realErrors := multierror.Errors{}

// At the moment this logic is tightly bound to the current RunnerList
// implementation from libbeat/cfgfile/list.go and Input.loadStates from
// filebeat/input/log/input.go.
// If they change the way they report errors, this will break.
// TODO (Tiago): update all layers to use the most recent features from
// the standard library errors package.
if errors.As(err, &merror) {
for _, err := range merror.Errors {
cause := pkgerr.Cause(err)
// A Log input is only marked as finished when all events it
// produceds are acked by the acker so when we see this error,
// we just retry until the new input can be started.
// This is the same logic used by the standalone configuration file
// reloader implemented on libbeat/cfgfile/reload.go
if err, ok := cause.(*common.ErrInputNotFinished); ok {
cm.logger.Debugf("file %q is not finished, will retry starting the input later", err.File)
cm.forceReload.Store(true)
cm.logger.Debug("ForceReload set to TRUE")
continue
}

// This is an error that cannot be ignored, so we report it
realErrors = append(realErrors, err)
}
}

if len(realErrors) != 0 {
return fmt.Errorf("failed to reload inputs: %w", realErrors.Err())
}
} else {
// no issues while reloading inputs, set forceReload to false
cm.forceReload.Store(false)
cm.logger.Info("ForceReload set to FALSE")
}

cm.lastInputCfgs = inputCfgs
cm.lastBeatInputCfgs = inputBeatCfgs
return nil
Expand Down

0 comments on commit 68df822

Please sign in to comment.