Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix input reload under Elastic-Agent #35250

Merged
merged 47 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
68df822
Fix input reload under Elastic-Agent
belimawr Apr 27, 2023
7fce900
PR improvements
belimawr Apr 28, 2023
4d0732b
Address lint issues
belimawr Apr 28, 2023
37cfef3
Implement tests
belimawr Apr 28, 2023
2ba009a
Remove test debug logs
belimawr May 2, 2023
66176c8
fmt and add lisence headers
belimawr May 2, 2023
b31bd28
make lint happy again
belimawr May 2, 2023
c902ab8
Add changelog
belimawr May 2, 2023
dd0a47e
improve log entries
belimawr May 3, 2023
0b36d19
Clean up tests
belimawr May 3, 2023
71bd239
PR improvements
belimawr May 5, 2023
4d2e32d
[WIP] Integration tests
belimawr May 5, 2023
a1ac3bb
improve tests, move files to Filebeat
belimawr May 5, 2023
b149c85
clean up tests
belimawr May 5, 2023
dda54d9
[WIP] Use log file for Filebeat
belimawr May 5, 2023
d3e45a2
clean up integration tests
belimawr May 8, 2023
091dd31
Merge remote-tracking branch 'upstream/main' into fix-input-reload-un…
belimawr May 8, 2023
e65618b
debounce and force reload times are configurable
belimawr May 8, 2023
b3028a7
clean up
belimawr May 8, 2023
78930f0
make linter happy
belimawr May 8, 2023
0049bd1
try uploading test logs on failure
belimawr May 8, 2023
ea5d84f
fix archiveArtifacts path
belimawr May 9, 2023
f12f5e9
debugging CI
belimawr May 9, 2023
cff80ed
refactor the way logs are read.
belimawr May 9, 2023
04a87d0
More CI debugging.
belimawr May 9, 2023
3a299a6
Apply suggestions from code review
belimawr May 10, 2023
2424579
PR review changes
belimawr May 10, 2023
892e4ed
more debug logs for CI
belimawr May 10, 2023
5f44afb
use `bufio.Reader` instead of `bufio.Scanner`
belimawr May 10, 2023
c153d74
typo
belimawr May 10, 2023
12a0ba5
clean up after all fixes/debugging
belimawr May 11, 2023
a677530
Final PR improvements.
belimawr May 11, 2023
1ffe0b6
Apply suggestions from code review
belimawr May 12, 2023
dc147b1
Update x-pack/filebeat/tests/integration/framework_test.go
belimawr May 12, 2023
9ea251a
Update x-pack/filebeat/tests/integration/framework_test.go
belimawr May 12, 2023
4c2fcd7
PR improvements
belimawr May 12, 2023
f1bf065
PR improvements
belimawr May 12, 2023
0f67fb2
Apply suggestions from code review
belimawr May 17, 2023
c6a74e9
fix typos
belimawr May 17, 2023
efd1811
improve comments
belimawr May 17, 2023
49f314d
stop logging gorourine when test finishes
belimawr May 17, 2023
e9ce324
Apply suggestions from code review
belimawr May 17, 2023
d36f486
Update x-pack/libbeat/management/managerV2.go
belimawr May 17, 2023
a98f0a9
use env vars with defaults for credentials
belimawr May 30, 2023
8eece56
replace uses of `Cause` by `errors.Unwrap`
belimawr May 30, 2023
844194a
Update x-pack/filebeat/tests/integration/input_reload_test.go
belimawr May 31, 2023
a37a275
Update x-pack/libbeat/management/managerV2.go
belimawr May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ func (p *Input) loadStates(states []file.State) error {

// In case a input is tried to be started with an unfinished state matching the glob pattern
if !state.Finished {
return &common.ErrInputNotFinished{State: state.String()}
return &common.ErrInputNotFinished{
State: state.String(),
File: state.Fileinfo.Name(),
}
}

// Convert state to current identifier if different
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// ErrInputNotFinished struct for reporting errors related to not finished inputs
type ErrInputNotFinished struct {
State string
File string
}

// Error method of ErrInputNotFinished
Expand Down
73 changes: 65 additions & 8 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,22 @@ package management

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/joeshaw/multierror"
pkgerr "github.com/pkg/errors"
"go.uber.org/zap/zapcore"
gproto "google.golang.org/protobuf/proto"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -49,9 +53,11 @@ type BeatV2Manager struct {
errCanceller context.CancelFunc

// track individual units given to us by the V2 API
mx sync.Mutex
units map[unitKey]*client.Unit
actions []client.Action
mx sync.Mutex
units map[unitKey]*client.Unit
actions []client.Action
forceReload atomic.Bool
reloadLock sync.Mutex
belimawr marked this conversation as resolved.
Show resolved Hide resolved

// status is reported as a whole for every unit sent to this component
// hopefully this can be improved in the future to be separated per unit
Expand Down Expand Up @@ -378,7 +384,13 @@ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) {
}

func (cm *BeatV2Manager) unitListen() {
const changeDebounce = 100 * time.Millisecond
const changeDebounce = time.Second

// forceReloadDebounce is greater than changeDebounce because it is only
// used when an input has not reached its finished state, this means some events
// still need to be acked by the acker, hence the longer we wait the more likely
// for the input to have reached its finished state.
const forceReloadDebounce = changeDebounce * 10
belimawr marked this conversation as resolved.
Show resolved Hide resolved

// register signal handler
sigc := make(chan os.Signal, 1)
Expand Down Expand Up @@ -439,6 +451,10 @@ func (cm *BeatV2Manager) unitListen() {
}
cm.mx.Unlock()
cm.reload(units)
if cm.forceReload.Load() {
// Restart the debounce timer so we try to reload the inputs.
t.Reset(forceReloadDebounce)
belimawr marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
Expand Down Expand Up @@ -618,6 +634,8 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error {
}

func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
cm.reloadLock.Lock()
defer cm.reloadLock.Unlock()
belimawr marked this conversation as resolved.
Show resolved Hide resolved
obj := cm.registry.GetInputList()
if obj == nil {
return fmt.Errorf("failed to find beat reloadable type 'input'")
Expand All @@ -642,15 +660,54 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
inputBeatCfgs = append(inputBeatCfgs, inputCfg...)
}

if !didChange(cm.lastInputCfgs, inputCfgs) {
if !didChange(cm.lastInputCfgs, inputCfgs) && !cm.forceReload.Load() {
cm.logger.Debug("Skipped reloading input units; configuration didn't change")
return nil
}

err := obj.Reload(inputBeatCfgs)
if err != nil {
return fmt.Errorf("failed to reloading inputs: %w", err)
if cm.forceReload.Load() {
cm.logger.Debug("Reloading Beats inputs because forceReload is true")
}

if err := obj.Reload(inputBeatCfgs); err != nil {
merror := &multierror.MultiError{}
realErrors := multierror.Errors{}

// At the moment this logic is tightly bound to the current RunnerList
// implementation from libbeat/cfgfile/list.go and Input.loadStates from
// filebeat/input/log/input.go.
// If they change the way they report errors, this will break.
// TODO (Tiago): update all layers to use the most recent features from
// the standard library errors package.
if errors.As(err, &merror) {
for _, err := range merror.Errors {
cause := pkgerr.Cause(err)
// A Log input is only marked as finished when all events it
// produceds are acked by the acker so when we see this error,
belimawr marked this conversation as resolved.
Show resolved Hide resolved
// we just retry until the new input can be started.
// This is the same logic used by the standalone configuration file
// reloader implemented on libbeat/cfgfile/reload.go
if err, ok := cause.(*common.ErrInputNotFinished); ok {
cm.logger.Debugf("file %q is not finished, will retry starting the input later", err.File)
cm.forceReload.Store(true)
cm.logger.Debug("ForceReload set to TRUE")
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// This is an error that cannot be ignored, so we report it
realErrors = append(realErrors, err)
}
}

if len(realErrors) != 0 {
return fmt.Errorf("failed to reload inputs: %w", realErrors.Err())
}
} else {
// no issues while reloading inputs, set forceReload to false
cm.forceReload.Store(false)
cm.logger.Info("ForceReload set to FALSE")
}

cm.lastInputCfgs = inputCfgs
cm.lastBeatInputCfgs = inputBeatCfgs
return nil
Expand Down