diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 93da0220287..b7911dc31b4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -127,6 +127,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix an issue where error messages are not accurate in mapstriface. {issue}18662[18662] {pull}18663[18663] - Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818] - Fix potential race condition in fingerprint processor. {pull}18738[18738] +- Add better handling for Kubernetes Update and Delete watcher events. {pull}18882[18882] - Fix the `translate_sid` processor's handling of unconfigured target fields. {issue}18990[18990] {pull}18991[18991] - Fixed a service restart failure under Windows. {issue}18914[18914] {pull}18916[18916] - The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945] diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 974beb7253a..e26a2521c16 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -206,7 +206,9 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { err = a.factory.CheckConfig(config) if err != nil { - a.logger.Error(errors.Wrap(err, fmt.Sprintf("Auto discover config check failed for config '%s', won't start runner", common.DebugString(config, true)))) + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s', won't start runner", + common.DebugString(config, true)))) continue } diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index d671cf14a22..c4260dab1d5 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -122,9 +122,9 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub return p, nil } -// OnAdd ensures processing of service objects that are newly added +// OnAdd ensures processing of pod objects that are newly added func (p *pod) OnAdd(obj interface{}) { - p.logger.Debugf("Watcher Node add: %+v", obj) + p.logger.Debugf("Watcher Pod add: %+v", obj) p.emit(obj.(*kubernetes.Pod), "start") } @@ -134,12 +134,16 @@ func (p *pod) OnAdd(obj interface{}) { func (p *pod) OnUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) - // If Pod is in a phase where all containers in the have terminated emit a stop event - if pod.Status.Phase == kubernetes.PodSucceeded || pod.Status.Phase == kubernetes.PodFailed { + p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) + switch pod.Status.Phase { + case kubernetes.PodSucceeded, kubernetes.PodFailed: + // If Pod is in a phase where all containers in the have terminated emit a stop event p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) return + case kubernetes.PodPending: + p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) + return } p.logger.Debugf("Watcher Pod update: %+v", obj) @@ -147,12 +151,13 @@ func (p *pod) OnUpdate(obj interface{}) { p.emit(pod, "start") } -// GenerateHints creates hints needed for hints builder +// OnDelete stops pod objects that are deleted func (p *pod) OnDelete(obj interface{}) { - p.logger.Debugf("Watcher Node delete: %+v", obj) + p.logger.Debugf("Watcher Pod delete: %+v", obj) time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) } +// GenerateHints creates hints needed for hints builder func (p *pod) GenerateHints(event bus.Event) bus.Event { // Try to build a config with enabled builders. Send a provider agnostic payload. // Builders are Beat specific. diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 33cc808358a..606a36ac109 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -69,8 +69,9 @@ type WatchOptions struct { } type item struct { - object interface{} - state string + object interface{} + objectRaw interface{} + state string } type watcher struct { @@ -175,8 +176,7 @@ func (w *watcher) enqueue(obj interface{}, state string) { if err != nil { return } - - w.queue.Add(&item{key, state}) + w.queue.Add(&item{key, obj, state}) } // process gets the top of the work queue and processes the object that is received. @@ -204,6 +204,11 @@ func (w *watcher) process(ctx context.Context) bool { return nil } if !exists { + if entry.state == delete { + w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) + // delete anyway in order to clean states + w.handler.OnDelete(entry.objectRaw) + } return nil }