Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat/management]: support filebeat inputs to report their status to elastic-agent #39209

Merged
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c6294b7
feat(management): implement support for reporting input health includ…
pkoutsovasilis May 9, 2024
b5755c9
fix: add missing file licenses
pkoutsovasilis May 9, 2024
73b6d35
feat(tests): update also units revision number
pkoutsovasilis May 9, 2024
59193f8
feat: make stream statuses explicit in reported payload
pkoutsovasilis May 13, 2024
214cdd1
feat: define new env vars to allow overriding scheme and ssl verifica…
pkoutsovasilis May 13, 2024
685290f
feat: utilise the es stack used through integration framework in CEL …
pkoutsovasilis May 13, 2024
d516bfa
fix: remove logging from integration test to reduce verbosity
pkoutsovasilis May 13, 2024
ed5ca0c
chore: move UpdateStatus closer to error logging
pkoutsovasilis May 13, 2024
9b0de34
Revert "feat: define new env vars to allow overriding scheme and ssl …
pkoutsovasilis May 13, 2024
facc1b4
fix: change configStateIdx and a field in the source of input units t…
pkoutsovasilis May 13, 2024
17bd5e1
fix: invoke stringer directly with go run
pkoutsovasilis May 13, 2024
4c828e5
fix: add in godoc that StatusReporter for standalone Beat execution i…
pkoutsovasilis May 13, 2024
dad7ce2
fix(linter): preallocate slice for list of names registered
pkoutsovasilis May 13, 2024
ab84ace
doc: update CHANGELOG.next.asciidoc
pkoutsovasilis May 13, 2024
c2d5ac5
fix(linter): disable typecheck for rest in walkMap
pkoutsovasilis May 13, 2024
5f48b69
fix: return client.UnitStateFailed for unknown statuses
pkoutsovasilis May 14, 2024
286a69c
chore: rename application to unit in statuses godoc to capture better…
pkoutsovasilis May 14, 2024
bacc72a
fix: do not rely on event["error"] to mark the status of cel input as…
pkoutsovasilis May 14, 2024
a342c77
fix: remove usage of testify assert and require packages
pkoutsovasilis May 14, 2024
3564c4a
fix: move integration_test.go to parent directory and add comments to…
pkoutsovasilis May 14, 2024
8ccc752
fix: improve readability by code minor restructuring and comments rev…
pkoutsovasilis May 14, 2024
356c22b
fix: remove stubborn t.Logf
pkoutsovasilis May 15, 2024
519577b
Merge remote-tracking branch 'refs/remotes/beats/main' into pkoutsova…
pkoutsovasilis May 15, 2024
bca4897
fix: simplify code in calcState
pkoutsovasilis May 17, 2024
4cea055
fix: log when a unit is not found during a unit change event
pkoutsovasilis May 17, 2024
cd5e2cb
feat: add unit ID in the unit-state named logger
pkoutsovasilis May 21, 2024
1e80b7c
feat: add unit-tests for unit.go
pkoutsovasilis May 21, 2024
6de05ef
feat: introduce UpdateStatus at v2.Context
pkoutsovasilis May 21, 2024
0daeed8
fix: add missing license header
pkoutsovasilis May 21, 2024
2e1e21f
feat: report at the unit level the error msg of the first FAILED or D…
pkoutsovasilis May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure all responses sent by HTTP Endpoint are HTML-escaped. {pull}39329[39329]
- Update CEL mito extensions to v1.11.0 to improve type checking. {pull}39460[39460]
- Improve logging of request and response with request trace logging in error conditions. {pull}39455[39455]
- Implement Elastic Agent status and health reporting for CEL Filebeat input. {pull}39209[39209]
- Add HTTP metrics to CEL input. {issue}39501[39501] {pull}39503[39503]

*Auditbeat*
Expand Down
30 changes: 19 additions & 11 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
Expand All @@ -50,13 +51,14 @@ type factory struct {
// On stop the runner triggers the shutdown signal and waits until the input
// has returned.
type runner struct {
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
statusReporter status.StatusReporter
}

// RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is
Expand Down Expand Up @@ -109,6 +111,10 @@ func (f *factory) Create(
}, nil
}

func (r *runner) SetStatusReporter(reported status.StatusReporter) {
r.statusReporter = reported
}

func (r *runner) String() string { return r.input.Name() }

func (r *runner) Start() {
Expand All @@ -121,10 +127,11 @@ func (r *runner) Start() {
log.Infof("Input '%s' starting", name)
err := r.input.Run(
v2.Context{
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
StatusReporter: r.statusReporter,
},
r.connector,
)
Expand All @@ -140,6 +147,7 @@ func (r *runner) Stop() {
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%s' stopped (runner)", r.input.Name())
r.statusReporter = nil
}

func configID(config *conf.C) (string, error) {
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -83,6 +84,11 @@ type Context struct {

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler

// StatusReporter provides a method to update the status of the underlying unit
// that maps to the config. Note: Under standalone execution of Filebeat this is
// expected to be nil.
StatusReporter status.StatusReporter
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
}

// TestContext provides the Input Test function with common environmental
Expand Down
7 changes: 7 additions & 0 deletions libbeat/cfgfile/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -153,6 +154,12 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {

r.logger.Debugf("Starting runner: %s", runner)
r.runners[hash] = runner
if config.StatusReporter != nil {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(config.StatusReporter)
}
}

runner.Start()
moduleStarts.Add(1)
if config.DiagCallback != nil {
Expand Down
8 changes: 7 additions & 1 deletion libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -47,6 +48,11 @@ type ConfigWithMeta struct {

// InputUnitID is the unit's ID that generated this ConfigWithMeta
InputUnitID string

// StatusReporter provides a method to update the status of the underlying unit
// that maps to the config. Note: Under standalone execution of a Beat this is
// expected to be nil.
StatusReporter status.StatusReporter
}

// ReloadableList provides a method to reload the configuration of a list of entities
Expand Down Expand Up @@ -160,7 +166,7 @@ func (r *Registry) GetReloadableOutput() Reloadable {
func (r *Registry) GetRegisteredNames() []string {
r.RLock()
defer r.RUnlock()
var names []string
names := make([]string, 0, len(r.confs)+len(r.confsLists))

for name := range r.confs {
names = append(names, name)
Expand Down
40 changes: 5 additions & 35 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,19 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// Status describes the current status of the beat.
type Status int

//go:generate stringer -type=Status
const (
// Unknown is initial status when none has been reported.
Unknown Status = iota
// Starting is status describing application is starting.
Starting
// Configuring is status describing application is configuring.
Configuring
// Running is status describing application is running.
Running
// Degraded is status describing application is degraded.
Degraded
// Failed is status describing application is failed. This status should
// only be used in the case the beat should stop running as the failure
// cannot be recovered.
Failed
// Stopping is status describing application is stopping.
Stopping
// Stopped is status describing application is stopped.
Stopped
)

// DebugK used as key for all things central management
var DebugK = "centralmgmt"

// StatusReporter provides a method to update current status of the beat.
type StatusReporter interface {
// UpdateStatus called when the status of the beat has changed.
UpdateStatus(status Status, msg string)
}

// Manager interacts with the beat to provide status updates and to receive
// configurations.
type Manager interface {
StatusReporter
status.StatusReporter

// Enabled returns true if manager is enabled.
Enabled() bool
Expand Down Expand Up @@ -133,7 +103,7 @@ func NewManager(cfg *config.C, registry *reload.Registry) (Manager, error) {
}
return &fallbackManager{
logger: logp.NewLogger("mgmt"),
status: Unknown,
status: status.Unknown,
msg: "",
}, nil
}
Expand All @@ -152,13 +122,13 @@ func SetManagerFactory(factory ManagerFactory) {
type fallbackManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
status status.Status
msg string
stopFunc func()
stopOnce sync.Once
}

func (n *fallbackManager) UpdateStatus(status Status, msg string) {
func (n *fallbackManager) UpdateStatus(status status.Status, msg string) {
n.lock.Lock()
defer n.lock.Unlock()
if n.status != status || n.msg != msg {
Expand Down
55 changes: 55 additions & 0 deletions libbeat/management/status/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package status

// Status describes the current status of the beat.
type Status int

//go:generate go run golang.org/x/tools/cmd/stringer -type=Status
const (
// Unknown is initial status when none has been reported.
Unknown Status = iota
// Starting is status describing unit is starting.
Starting
// Configuring is status describing unit is configuring.
Configuring
// Running is status describing unit is running.
Running
// Degraded is status describing unit is degraded.
Degraded
// Failed is status describing unit is failed. This status should
// only be used in the case the beat should stop running as the failure
// cannot be recovered.
Failed
// Stopping is status describing unit is stopping.
Stopping
// Stopped is status describing unit is stopped.
Stopped
)

// StatusReporter provides a method to update current status of a unit.
type StatusReporter interface {
// UpdateStatus updates the status of the unit.
UpdateStatus(status Status, msg string)
}

// WithStatusReporter provides a method to set a status reporter
type WithStatusReporter interface {
// SetStatusReporter sets the status reporter
SetStatusReporter(reporter StatusReporter)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading