Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add better handling for Kubernetes Update and Delete watcher events #18882

Merged
merged 13 commits into from
Jun 17, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix an issue where error messages are not accurate in mapstriface. {issue}18662[18662] {pull}18663[18663]
- Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818]
- Fix potential race condition in fingerprint processor. {pull}18738[18738]
- Add better handling for Kubernetes Update and Delete watcher events. {pull}18882[18882]
- Fix the `translate_sid` processor's handling of unconfigured target fields. {issue}18990[18990] {pull}18991[18991]
- Fixed a service restart failure under Windows. {issue}18914[18914] {pull}18916[18916]
- The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945]
Expand Down
4 changes: 3 additions & 1 deletion libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {

err = a.factory.CheckConfig(config)
if err != nil {
a.logger.Error(errors.Wrap(err, fmt.Sprintf("Auto discover config check failed for config '%s', won't start runner", common.DebugString(config, true))))
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s', won't start runner",
common.DebugString(config, true))))
continue
}

Expand Down
19 changes: 12 additions & 7 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
return p, nil
}

// OnAdd ensures processing of service objects that are newly added
// OnAdd ensures processing of pod objects that are newly added
func (p *pod) OnAdd(obj interface{}) {
p.logger.Debugf("Watcher Node add: %+v", obj)
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}

Expand All @@ -134,25 +134,30 @@ func (p *pod) OnAdd(obj interface{}) {
func (p *pod) OnUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)

// If Pod is in a phase where all containers in the have terminated emit a stop event
if pod.Status.Phase == kubernetes.PodSucceeded || pod.Status.Phase == kubernetes.PodFailed {
p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
switch pod.Status.Phase {
case kubernetes.PodSucceeded, kubernetes.PodFailed:
// If Pod is in a phase where all containers in the have terminated emit a stop event
p.logger.Debugf("Watcher Pod update (terminating): %+v", obj)

time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") })
return
case kubernetes.PodPending:
p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj)
return
}

p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(pod, "stop")
p.emit(pod, "start")
}

// GenerateHints creates hints needed for hints builder
// OnDelete stops pod objects that are deleted
func (p *pod) OnDelete(obj interface{}) {
p.logger.Debugf("Watcher Node delete: %+v", obj)
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}

// GenerateHints creates hints needed for hints builder
func (p *pod) GenerateHints(event bus.Event) bus.Event {
// Try to build a config with enabled builders. Send a provider agnostic payload.
// Builders are Beat specific.
Expand Down
13 changes: 9 additions & 4 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ type WatchOptions struct {
}

type item struct {
object interface{}
state string
object interface{}
objectRaw interface{}
state string
}

type watcher struct {
Expand Down Expand Up @@ -175,8 +176,7 @@ func (w *watcher) enqueue(obj interface{}, state string) {
if err != nil {
return
}

w.queue.Add(&item{key, state})
w.queue.Add(&item{key, obj, state})
}

// process gets the top of the work queue and processes the object that is received.
Expand Down Expand Up @@ -204,6 +204,11 @@ func (w *watcher) process(ctx context.Context) bool {
return nil
}
if !exists {
if entry.state == delete {
w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key)
// delete anyway in order to clean states
w.handler.OnDelete(entry.objectRaw)
}
return nil
}

Expand Down