Skip to content

Commit

Permalink
Add specific check for InputNotFinished errors
Browse files Browse the repository at this point in the history
Signed-off-by: chrismark <[email protected]>
  • Loading branch information
ChrsMark committed Jun 3, 2020
1 parent 638f96d commit 078c279
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 29 deletions.
17 changes: 17 additions & 0 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package file

import (
"fmt"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -97,3 +98,19 @@ func (s *State) IsEmpty() bool {
len(s.Meta) == 0 &&
s.Timestamp.IsZero()
}

// String returns string representation of the struct
func (s *State) String() string {
return fmt.Sprintf(
"{Id: %v, Finished: %v, Fileinfo: %v, Source: %v, Offset: %v, Timestamp: %v, TTL: %v, Type: %v, Meta: %v, FileStateOS: %v}",
s.Id,
s.Finished,
s.Fileinfo,
s.Source,
s.Offset,
s.Timestamp,
s.TTL,
s.Type,
s.Meta,
s.FileStateOS)
}
5 changes: 3 additions & 2 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package log

import (
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -27,6 +26,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
Expand Down Expand Up @@ -172,7 +173,7 @@ func (p *Input) loadStates(states []file.State) error {

// In case a input is tried to be started with an unfinished state matching the glob pattern
if !state.Finished {
return fmt.Errorf("Can only start an input when all related states are finished: %+v", state)
return &common.ErrInputNotFinished{State: state.String()}
}

// Update input states and send new states to registry
Expand Down
56 changes: 33 additions & 23 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,30 +204,8 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
continue
}

err = a.factory.CheckConfig(config)
err = a.checkConfig(config)
if err != nil {
a.logger.Debug(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s', won't start runner",
common.DebugString(config, true))))
checkConfigAttempts := 0
maxCheckConfigAttempts := 5
for {
err = a.factory.CheckConfig(config)
if err == nil {
a.logger.Debug(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s'[attempt %v], won't start runner",
common.DebugString(config, true), checkConfigAttempts + 1)))
break
}
if checkConfigAttempts > maxCheckConfigAttempts {
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s' after max attempts, won't start runner",
common.DebugString(config, true))))
break
}
time.Sleep(3 * time.Second)
checkConfigAttempts += 1
}
continue
}

Expand All @@ -249,6 +227,38 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
return updated
}

// checkConfig verifies an input config by calling CheckConfig of various factories with a retry in order to
// ensure that a config is not in conflict with an older config which is "stopping" state yet.
func (a *Autodiscover) checkConfig(config *common.Config) error {
checkConfigAttempts := 0
maxCheckConfigAttempts := 5
for {
err := a.factory.CheckConfig(config)
if err == nil {
return nil
} else {
if err, ok := err.(*common.ErrInputNotFinished); !ok {
// error not related to stopping input, raise it now
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s' after max attempts, won't start runner",
common.DebugString(config, true))))
return err
}
a.logger.Debug(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s'[attempt %v], won't start runner",
common.DebugString(config, true), checkConfigAttempts+1)))
}
if checkConfigAttempts > maxCheckConfigAttempts {
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s' after max attempts, won't start runner",
common.DebugString(config, true))))
return err
}
time.Sleep(3 * time.Second)
checkConfigAttempts += 1
}
}

func (a *Autodiscover) handleStop(event bus.Event) bool {
var updated bool

Expand Down
5 changes: 3 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
return p, nil
}

// OnAdd ensures processing of service objects that are newly added
// OnAdd ensures processing of pod objects that are newly added
func (p *pod) OnAdd(obj interface{}) {
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
Expand Down Expand Up @@ -152,12 +152,13 @@ func (p *pod) OnUpdate(obj interface{}) {
p.emit(pod, "start")
}

// GenerateHints creates hints needed for hints builder
// OnDelete stops pod objects that are deleted
func (p *pod) OnDelete(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}

// GenerateHints creates hints needed for hints builder
func (p *pod) GenerateHints(event bus.Event) bus.Event {
// Try to build a config with enabled builders. Send a provider agnostic payload.
// Builders are Beat specific.
Expand Down
15 changes: 15 additions & 0 deletions libbeat/common/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package common

import (
"fmt"
)

// ErrInputNotFinished struct for reporting errors related to not finished inputs
type ErrInputNotFinished struct {
state string
}

// Error method of ErrInputNotFinished
func (e *ErrInputNotFinished) Error() string {
return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.state)
}
4 changes: 2 additions & 2 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ type WatchOptions struct {
}

type item struct {
object interface{}
object interface{}
objectRaw interface{}
state string
state string
}

type watcher struct {
Expand Down

0 comments on commit 078c279

Please sign in to comment.