Skip to content

Commit

Permalink
Add deployment name in pod's meta (elastic#23610)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Jan 27, 2021
1 parent b51d36a commit 2d7e7b4
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update the baseline version of Sarama (Kafka support library) to 1.27.2. {pull}23595[23595]
- Add kubernetes.volume.fs.used.pct field. {pull}23564[23564]
- Add the `enable_krb5_fast` flag to the Kafka output to explicitly opt-in to FAST authentication. {pull}23629[23629]
- Add deployment name in pod's meta. {pull}23610[23610]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ func TestEmitEvent(t *testing.T) {
t.Fatal(err)
}

metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New(logp.NewLogger("bus"), "test"),
Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/kubernetes/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func GetPodMetaGen(
if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
namespaceMetaGen = NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store())
}
metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), nodeMetaGen, namespaceMetaGen)
metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), podWatcher.Client(), nodeMetaGen, namespaceMetaGen)

return metaGen
}
43 changes: 42 additions & 1 deletion libbeat/common/kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package metadata

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -26,18 +30,25 @@ import (

type pod struct {
store cache.Store
client k8s.Interface
node MetaGen
namespace MetaGen
resource *Resource
}

// NewPodMetadataGenerator creates a metagen for pod resources
func NewPodMetadataGenerator(cfg *common.Config, pods cache.Store, node MetaGen, namespace MetaGen) MetaGen {
func NewPodMetadataGenerator(
cfg *common.Config,
pods cache.Store,
client k8s.Interface,
node MetaGen,
namespace MetaGen) MetaGen {
return &pod{
resource: NewResourceMetadataGenerator(cfg),
store: pods,
node: node,
namespace: namespace,
client: client,
}
}

Expand All @@ -50,6 +61,15 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) common.Map

out := p.resource.Generate("pod", obj, opts...)

// check if Pod is handled by a ReplicaSet which is controlled by a Deployment
rsName, _ := out.GetValue("replicaset.name")
if rsName, ok := rsName.(string); ok {
dep := p.getRSDeployment(rsName, po.GetNamespace())
if dep != "" {
out.Put("deployment.name", dep)
}
}

if p.node != nil {
meta := p.node.GenerateFromName(po.Spec.NodeName, WithLabels("node"))
if meta != nil {
Expand Down Expand Up @@ -89,3 +109,24 @@ func (p *pod) GenerateFromName(name string, opts ...FieldOptions) common.MapStr

return nil
}

// getRSDeployment return the name of the Deployment object that
// owns the ReplicaSet with the given name under the given Namespace
func (p *pod) getRSDeployment(rsName string, ns string) string {
if p.client == nil {
return ""
}
rs, err := p.client.AppsV1().ReplicaSets(ns).Get(context.TODO(), rsName, metav1.GetOptions{})
if err != nil {
return ""
}
for _, ref := range rs.GetOwnerReferences() {
if ref.Controller != nil && *ref.Controller {
switch ref.Kind {
case "Deployment":
return ref.Name
}
}
}
return ""
}
114 changes: 111 additions & 3 deletions libbeat/common/kubernetes/metadata/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,80 @@
package metadata

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
)

func TestPod_Generate(t *testing.T) {
client := k8sfake.NewSimpleClientset()
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
namespace := "default"
name := "obj"
boolean := true
rs := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
Spec: appsv1.ReplicaSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "demo",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "demo",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: "nginx:1.12",
Ports: []v1.ContainerPort{
{
Name: "http",
Protocol: v1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
}

_, err := client.AppsV1().ReplicaSets(namespace).Create(context.Background(), rs, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create k8s deployment: %v", err)
}

tests := []struct {
input kubernetes.Resource
output common.MapStr
Expand Down Expand Up @@ -133,14 +187,68 @@ func TestPod_Generate(t *testing.T) {
},
},
},
{
name: "test object with owner reference to replicaset",
input: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(uid),
Namespace: namespace,
Labels: map[string]string{
"foo": "bar",
},
Annotations: map[string]string{
"app": "production",
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "ReplicaSet",
Name: "nginx-rs",
UID: "005f3b90-4b9d-12f8-acf0-31020a8409087",
Controller: &boolean,
},
},
},
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
Spec: v1.PodSpec{
NodeName: "testnode",
},
},
output: common.MapStr{
"pod": common.MapStr{
"name": "obj",
"uid": uid,
},
"namespace": "default",
"deployment": common.MapStr{
"name": "nginx-deployment",
},
"replicaset": common.MapStr{
"name": "nginx-rs",
},
"node": common.MapStr{
"name": "testnode",
},
"labels": common.MapStr{
"foo": "bar",
},
"annotations": common.MapStr{
"app": "production",
},
},
},
}

config, err := common.NewConfigFrom(map[string]interface{}{
"include_annotations": []string{"app"},
})
assert.NoError(t, err)

metagen := NewPodMetadataGenerator(config, nil, nil, nil)
metagen := NewPodMetadataGenerator(config, nil, client, nil, nil)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.output, metagen.Generate(test.input))
Expand Down Expand Up @@ -257,7 +365,7 @@ func TestPod_GenerateFromName(t *testing.T) {
assert.NoError(t, err)
pods := cache.NewStore(cache.MetaNamespaceKeyFunc)
pods.Add(test.input)
metagen := NewPodMetadataGenerator(config, pods, nil, nil)
metagen := NewPodMetadataGenerator(config, pods, nil, nil, nil)

accessor, err := meta.Accessor(test.input)
require.NoError(t, err)
Expand Down Expand Up @@ -376,7 +484,7 @@ func TestPod_GenerateWithNodeNamespace(t *testing.T) {
namespaces.Add(test.namespace)
nsMeta := NewNamespaceMetadataGenerator(config, namespaces)

metagen := NewPodMetadataGenerator(config, pods, nodeMeta, nsMeta)
metagen := NewPodMetadataGenerator(config, pods, nil, nodeMeta, nsMeta)
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.output, metagen.Generate(test.input))
})
Expand Down
8 changes: 8 additions & 0 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type Watcher interface {

// Store returns the store object for the watcher
Store() cache.Store

// Client returns the kubernetes client object used by the watcher
Client() kubernetes.Interface
}

// WatchOptions controls watch behaviors
Expand Down Expand Up @@ -165,6 +168,11 @@ func (w *watcher) Store() cache.Store {
return w.store
}

// Client returns the kubernetes client object used by the watcher
func (w *watcher) Client() kubernetes.Interface {
return w.client
}

// Start watching pods
func (w *watcher) Start() error {
go w.informer.Run(w.ctx.Done())
Expand Down
8 changes: 4 additions & 4 deletions libbeat/processors/add_kubernetes_metadata/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
)

var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil)

func TestPodIndexer(t *testing.T) {
var testConfig = common.NewConfig()
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestPodIndexer(t *testing.T) {
func TestPodUIDIndexer(t *testing.T) {
var testConfig = common.NewConfig()

metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil)

podUIDIndexer, err := NewPodUIDIndexer(*testConfig, metaGenWithPodUID)
assert.NoError(t, err)
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestFilteredGenMeta(t *testing.T) {
})
assert.NoError(t, err)

filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil)
filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil)

podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen)
assert.NoError(t, err)
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
})
assert.NoError(t, err)

filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil)
filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil)

podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen)
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func NewResourceMetadataEnricher(
cfg, _ := common.NewConfigFrom(&metaConfig)

metaGen := metadata.NewResourceMetadataGenerator(cfg)
podMetaGen := metadata.NewPodMetadataGenerator(cfg, nil, nil, nil)
podMetaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil)
enricher := buildMetadataEnricher(watcher,
// update
func(m map[string]common.MapStr, r kubernetes.Resource) {
Expand Down Expand Up @@ -213,7 +213,7 @@ func NewContainerMetadataEnricher(

cfg, _ := common.NewConfigFrom(&metaConfig)

metaGen := metadata.NewPodMetadataGenerator(cfg, nil, nil, nil)
metaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil)
enricher := buildMetadataEnricher(watcher,
// update
func(m map[string]common.MapStr, r kubernetes.Resource) {
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/module/kubernetes/util/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package util
import (
"testing"

k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -157,3 +158,7 @@ func (m *mockWatcher) AddEventHandler(r kubernetes.ResourceEventHandler) {
func (m *mockWatcher) Store() cache.Store {
return nil
}

func (m *mockWatcher) Client() k8s.Interface {
return nil
}

0 comments on commit 2d7e7b4

Please sign in to comment.