Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch kubernetes namespaces for autodiscover metadata for pods #25117

Merged
merged 4 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add new ECS 1.9 field `cloud.service.name` to `add_cloud_metadata` processor. {pull}24993[24993]
- Libbeat: report queue capacity, output batch size, and output client count to monitoring. {pull}24700[24700]
- Add kubernetes.pod.ip field in kubernetes metadata. {pull}25037[25037]
- Discover changes in Kubernetes namespace metadata as soon as they happen. {pull}25117[25117]

*Auditbeat*

Expand Down
84 changes: 82 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package kubernetes

import (
"fmt"
"sync"
"time"

"github.com/gofrs/uuid"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -43,7 +43,11 @@ type pod struct {
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
namespaceStore cache.Store

// Mutex used by configuration updates not triggered by the main watcher,
// to avoid race conditions between cross updates and deletions.
// Other updaters must use a write lock.
crossUpdate sync.RWMutex
}

// NewPodEventer creates an eventer that can discover and process pod objects
Expand Down Expand Up @@ -111,11 +115,20 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
}

watcher.AddEventHandler(p)

if namespaceWatcher != nil && (config.Hints.Enabled() || metaConf.Namespace.Enabled()) {
updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
namespaceWatcher.AddEventHandler(updater)
}

return p, nil
}

// OnAdd ensures processing of pod objects that are newly added
func (p *pod) OnAdd(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()

p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}
Expand All @@ -124,6 +137,13 @@ func (p *pod) OnAdd(obj interface{}) {
// if it is terminating, a stop event is scheduled, if not, a stop and a start
// events are sent sequentially to recreate the resources assotiated to the pod.
func (p *pod) OnUpdate(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()

p.unlockedUpdate(obj)
}

func (p *pod) unlockedUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)

p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
Expand Down Expand Up @@ -162,6 +182,9 @@ func (p *pod) OnUpdate(obj interface{}) {

// OnDelete stops pod objects that are deleted
func (p *pod) OnDelete(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()

p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}
Expand Down Expand Up @@ -448,3 +471,60 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
p.publish(events)
}
}

// podUpdaterHandlerFunc is a function that handles pod updater notifications.
type podUpdaterHandlerFunc func(interface{})

// podUpdaterStore is the interface that an object needs to implement to be
// used as a pod updater store.
type podUpdaterStore interface {
List() []interface{}
}

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
}

// newNamespacePodUpdater creates a namespacePodUpdater
func newNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
locker: locker,
}
}

// OnUpdate handles update events on namespaces.
func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
ns, ok := obj.(*kubernetes.Namespace)
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
if n.locker != nil {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*kubernetes.Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}

// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
// namespace they will generate their own add events.
func (*namespacePodUpdater) OnAdd(interface{}) {}

// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*namespacePodUpdater) OnDelete(interface{}) {}
67 changes: 67 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kubernetes

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1552,6 +1553,72 @@ func TestEmitEvent(t *testing.T) {
}
}

func TestNamespacePodUpdater(t *testing.T) {
pod := func(name, namespace string) *kubernetes.Pod {
return &kubernetes.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}

cases := map[string]struct {
pods []interface{}
expected []interface{}
}{
"no pods": {},
"two pods but only one in namespace": {
pods: []interface{}{
pod("onepod", "foo"),
pod("onepod", "bar"),
},
expected: []interface{}{
pod("onepod", "foo"),
},
},
"two pods but none in namespace": {
pods: []interface{}{
pod("onepod", "bar"),
pod("otherpod", "bar"),
},
},
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
handler := &mockUpdaterHandler{}
store := &mockUpdaterStore{objects: c.pods}
updater := newNamespacePodUpdater(handler.OnUpdate, store, &sync.Mutex{})

namespace := &kubernetes.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
}
updater.OnUpdate(namespace)

assert.EqualValues(t, c.expected, handler.objects)
})
}
}

type mockUpdaterHandler struct {
objects []interface{}
}

func (h *mockUpdaterHandler) OnUpdate(obj interface{}) {
h.objects = append(h.objects, obj)
}

type mockUpdaterStore struct {
objects []interface{}
}

func (s *mockUpdaterStore) List() []interface{} {
return s.objects
}

func NewMockPodEventerManager(pod *pod) EventManager {
em := &eventerManager{}
em.eventer = pod
Expand Down