Skip to content

Commit

Permalink
Add metadata for missing k8s resources/metricsets (#31590)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark committed May 12, 2022
1 parent a03f883 commit e48882d
Show file tree
Hide file tree
Showing 19 changed files with 374 additions and 240 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Extend documentation about `orchestrator.cluster` fields {pull}30518[30518]
- Enhance Oracle Module: Change tablespace metricset collection period {issue}30948[30948] {pull}31259[#31259]
- Add orchestrator cluster ECS fields in kubernetes events {pull}31341[31341]
- Add metadata for missing k8s resources/metricsets {pull}31590[31590]

*Packetbeat*

Expand Down
3 changes: 3 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ rules:
- events
- pods
- services
- persistentvolumes
- persistentvolumeclaims
verbs: ["get", "list", "watch"]
# Enable this rule only if planing to use Kubernetes keystore
#- apiGroups: [""]
Expand All @@ -294,6 +296,7 @@ rules:
- statefulsets
- deployments
- replicasets
- daemonsets
verbs: ["get", "list", "watch"]
- apiGroups: ["batch"]
resources:
Expand Down
3 changes: 3 additions & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ rules:
- events
- pods
- services
- persistentvolumes
- persistentvolumeclaims
verbs: ["get", "list", "watch"]
# Enable this rule only if planing to use Kubernetes keystore
#- apiGroups: [""]
Expand All @@ -27,6 +29,7 @@ rules:
- statefulsets
- deployments
- replicasets
- daemonsets
verbs: ["get", "list", "watch"]
- apiGroups: ["batch"]
resources:
Expand Down
35 changes: 35 additions & 0 deletions libbeat/common/kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "statefulset"
case *DaemonSet:
ss := client.AppsV1().DaemonSets(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}

objType = "daemonset"
case *Service:
svc := client.CoreV1().Services(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down Expand Up @@ -185,7 +197,30 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "job"
case *PersistentVolume:
ss := client.CoreV1().PersistentVolumes()
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}

objType = "persistentvolume"
case *PersistentVolumeClaim:
ss := client.CoreV1().PersistentVolumeClaims(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}

objType = "persistentvolumeclaim"
case *Role:
r := client.RbacV1().Roles(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down
9 changes: 9 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type ReplicaSet = appsv1.ReplicaSet
// StatefulSet data
type StatefulSet = appsv1.StatefulSet

// DaemonSet data
type DaemonSet = appsv1.DaemonSet

// Service data
type Service = v1.Service

Expand All @@ -85,6 +88,12 @@ type Job = batchv1.Job
// CronJob data
type CronJob = batchv1.CronJob

// PersistentVolume data
type PersistentVolume = v1.PersistentVolume

// PersistentVolumeClaim data
type PersistentVolumeClaim = v1.PersistentVolumeClaim

// Role data
type Role = rbacv1.Role

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ rules:
- events
- pods
- services
- persistentvolumes
- persistentvolumeclaims
verbs: ["get", "list", "watch"]
# Enable this rule only if planing to use Kubernetes keystore
#- apiGroups: [""]
Expand All @@ -284,6 +286,12 @@ rules:
- statefulsets
- deployments
- replicasets
- daemonsets
verbs: ["get", "list", "watch"]
- apiGroups: ["batch"]
resources:
- jobs
- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
20 changes: 1 addition & 19 deletions metricbeat/module/kubernetes/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ import (
"fmt"
"time"

k8sclient "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -101,7 +98,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// add ECS orchestrator fields
cfg, _ := conf.NewConfigFrom(&config)
ecsClusterMeta, err := getClusterECSMeta(cfg, client, ms.Logger())
ecsClusterMeta, err := util.GetClusterECSMeta(cfg, client, ms.Logger())
if err != nil {
ms.Logger().Debugf("could not retrieve cluster metadata: %w", err)
}
Expand All @@ -112,21 +109,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return ms, nil
}

func getClusterECSMeta(cfg *conf.C, client k8sclient.Interface, logger *logp.Logger) (mapstr.M, error) {
clusterInfo, err := metadata.GetKubernetesClusterIdentifier(cfg, client)
if err != nil {
return nil, fmt.Errorf("fail to get kubernetes cluster metadata: %w", err)
}
ecsClusterMeta := mapstr.M{}
if clusterInfo.Url != "" {
util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.url", clusterInfo.Url, logger)
}
if clusterInfo.Name != "" {
util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.name", clusterInfo.Name, logger)
}
return ecsClusterMeta, nil
}

// Run method provides the Kubernetes event watcher with a reporter with which events can be reported.
func (m *MetricSet) Run(reporter mb.PushReporterV2) {
now := time.Now()
Expand Down
Loading

0 comments on commit e48882d

Please sign in to comment.