From d6ee968b44dbb9413c8472ca1712914d192cbc00 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?=
Date: Tue, 6 Oct 2020 18:36:24 +0200
Subject: [PATCH] Add kubernetes composable inputs provider (#21480)

* Add kubernetes composable inputs provider
---
 x-pack/elastic-agent/CHANGELOG.next.asciidoc  |   1 +
 x-pack/elastic-agent/pkg/agent/cmd/include.go |   1 +
 .../composable/providers/kubernetes/config.go |  31 +++
 .../providers/kubernetes/kubernetes.go        | 215 ++++++++++++++++++
 4 files changed, 248 insertions(+)
 create mode 100644 x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
 create mode 100644 x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go

diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
index 639b1dbad17..897b64c517e 100644
--- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc
+++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -28,6 +28,7 @@
 - Add support for EQL based condition on inputs {pull}20994[20994]
 - Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
 - Add `install` and `uninstall` subcommands {pull}21206[21206]
+- Add `kubernetes` composable dynamic provider. {pull}21480[21480]
 - Send updating state {pull}21461[21461]
 - Add `elastic.agent.id` and `elastic.agent.version` to published events from filebeat and metricbeat {pull}21543[21543]
 - Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
diff --git a/x-pack/elastic-agent/pkg/agent/cmd/include.go b/x-pack/elastic-agent/pkg/agent/cmd/include.go
index 87506b88415..b955c85418f 100644
--- a/x-pack/elastic-agent/pkg/agent/cmd/include.go
+++ b/x-pack/elastic-agent/pkg/agent/cmd/include.go
@@ -10,6 +10,7 @@ import (
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/docker"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
+	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/path"
diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
new file mode 100644
index 00000000000..200bedbe878
--- /dev/null
+++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
@@ -0,0 +1,31 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+// TODO review the need for this
+// +build linux darwin windows
+
+package kubernetes
+
+import (
+	"time"
+)
+
+// Config for kubernetes provider
+type Config struct {
+	KubeConfig     string        `config:"kube_config"`
+	SyncPeriod     time.Duration `config:"sync_period"`
+	CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`
+
+	// Needed when resource is a pod
+	Node string `config:"node"`
+
+	// Scope of the provider (cluster or node)
+	Scope string `config:"scope"`
+}
+
+// InitDefaults initializes the default values for the config.
+func (c *Config) InitDefaults() {
+	c.SyncPeriod = 10 * time.Minute
+	c.CleanupTimeout = 60 * time.Second
+}
diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
new file mode 100644
index 00000000000..c777c8a05ce
--- /dev/null
+++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
@@ -0,0 +1,215 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package kubernetes
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
+	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
+	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
+	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
+	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
+)
+
+func init() {
+	composable.Providers.AddDynamicProvider("kubernetes", DynamicProviderBuilder)
+}
+
+type dynamicProvider struct {
+	logger *logger.Logger
+	config *Config
+}
+
+type eventWatcher struct {
+	logger         *logger.Logger
+	cleanupTimeout time.Duration
+	comm           composable.DynamicProviderComm
+}
+
+// DynamicProviderBuilder builds the dynamic provider.
+func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
+	var cfg Config
+	if c == nil {
+		c = config.New()
+	}
+	err := c.Unpack(&cfg)
+	if err != nil {
+		return nil, errors.New(err, "failed to unpack configuration")
+	}
+	return &dynamicProvider{logger, &cfg}, nil
+}
+
+// Run runs the kubernetes dynamic provider.
+func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
+	client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
+	if err != nil {
+		// The agent may not be running in Kubernetes; log at debug level and do nothing.
+		p.logger.Debugf("Kubernetes provider skipped, unable to connect: %s", err)
+		return nil
+	}
+
+	// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
+	// when cluster scope is enforced.
+	if p.config.Scope == "node" {
+		p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
+	} else {
+		p.config.Node = ""
+	}
+	p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
+	p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
+
+	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
+		SyncTimeout: p.config.SyncPeriod,
+		Node:        p.config.Node,
+		//Namespace: p.config.Namespace,
+	}, nil)
+	if err != nil {
+		return errors.New(err, "couldn't create kubernetes watcher")
+	}
+
+	watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})
+
+	err = watcher.Start()
+	if err != nil {
+		return errors.New(err, "couldn't start kubernetes watcher")
+	}
+
+	return nil
+}
+
+func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
+	mapping := map[string]interface{}{
+		"namespace": pod.GetNamespace(),
+		"pod": map[string]interface{}{
+			"uid":    string(pod.GetUID()),
+			"name":   pod.GetName(),
+			"labels": pod.GetLabels(),
+			"ip":     pod.Status.PodIP,
+		},
+	}
+
+	processors := []map[string]interface{}{
+		{
+			"add_fields": map[string]interface{}{
+				"fields": mapping,
+				"target": "kubernetes",
+			},
+		},
+	}
+
+	// Emit the pod
+	// We emit Pod + containers to ensure that configs matching Pod only
+	// get Pod metadata (not specific to any container)
+	p.comm.AddOrUpdate(string(pod.GetUID()), mapping, processors)
+
+	// Emit all containers in the pod
+	p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
+
+	// TODO deal with init containers stopping after initialization
+	p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
+}
+
+func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
+	// Collect all runtimes from status information.
+	containerIDs := map[string]string{}
+	runtimes := map[string]string{}
+	for _, c := range containerstatuses {
+		cid, runtime := kubernetes.ContainerIDWithRuntime(c)
+		containerIDs[c.Name] = cid
+		runtimes[c.Name] = runtime
+	}
+
+	for _, c := range containers {
+		// If a container doesn't have an ID it doesn't exist in
+		// the runtime yet, so there is nothing to emit for it;
+		// emitStopped takes care of cleaning up its configuration.
+		cid := containerIDs[c.Name]
+		if cid == "" {
+			continue
+		}
+
+		// ID is the combination of pod UID + container name
+		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
+
+		mapping := map[string]interface{}{
+			"namespace": pod.GetNamespace(),
+			"pod": map[string]interface{}{
+				"uid":    string(pod.GetUID()),
+				"name":   pod.GetName(),
+				"labels": pod.GetLabels(),
+				"ip":     pod.Status.PodIP,
+			},
+			"container": map[string]interface{}{
+				"id":      cid,
+				"name":    c.Name,
+				"image":   c.Image,
+				"runtime": runtimes[c.Name],
+			},
+		}
+
+		processors := []map[string]interface{}{
+			{
+				"add_fields": map[string]interface{}{
+					"fields": mapping,
+					"target": "kubernetes",
+				},
+			},
+		}
+
+		// Emit the container
+		p.comm.AddOrUpdate(eventID, mapping, processors)
+	}
+}
+
+func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) {
+	p.comm.Remove(string(pod.GetUID()))
+
+	for _, c := range pod.Spec.Containers {
+		// ID is the combination of pod UID + container name
+		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
+		p.comm.Remove(eventID)
+	}
+
+	for _, c := range pod.Spec.InitContainers {
+		// ID is the combination of pod UID + container name
+		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
+		p.comm.Remove(eventID)
+	}
+}
+
+// OnAdd ensures processing of pod objects that are newly added
+func (p *eventWatcher) OnAdd(obj interface{}) {
+	p.logger.Debugf("pod add: %+v", obj)
+	p.emitRunning(obj.(*kubernetes.Pod))
+}
+
+// OnUpdate emits events for a given pod depending on its state: if the pod
+// has terminated, a delayed stop is scheduled; pending pods are skipped; in
+// any other case the running state is re-emitted for the resources associated to the pod.
+func (p *eventWatcher) OnUpdate(obj interface{}) {
+	pod := obj.(*kubernetes.Pod)
+
+	p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
+	switch pod.Status.Phase {
+	case kubernetes.PodSucceeded, kubernetes.PodFailed:
+		time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
+		return
+	case kubernetes.PodPending:
+		p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj)
+		return
+	}
+
+	p.logger.Debugf("pod update: %+v", obj)
+	p.emitRunning(pod)
+}
+
+// OnDelete schedules a delayed stop for pod objects that are deleted
+func (p *eventWatcher) OnDelete(obj interface{}) {
+	p.logger.Debugf("pod delete: %+v", obj)
+	pod := obj.(*kubernetes.Pod)
+	time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
+}
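
Usage note for reviewers: a minimal sketch of how this dynamic provider could be exercised from a standalone agent policy. Only the provider name (`kubernetes`), its settings (`kube_config`, `sync_period`, `cleanup_timeout`, `node`, `scope`) and the emitted variables (`kubernetes.namespace`, `kubernetes.pod.*`, `kubernetes.container.*`) come from the code in this patch; the `providers`/`inputs` keys, the `logfile` input type and the log path are illustrative assumptions, not part of this change.

    providers:
      kubernetes:
        scope: node              # node discovery only runs for "node" scope; "cluster" clears the node
        sync_period: 10m         # defaults come from InitDefaults
        cleanup_timeout: 60s
        # kube_config: /path/to/kubeconfig   # optional; passed to GetKubernetesClient

    inputs:
      - type: logfile            # hypothetical input, for illustration only
        streams:
          - paths:
              - /var/log/containers/*-${kubernetes.container.id}.log

Each emitted pod and container mapping also carries an `add_fields` processor targeting `kubernetes`, so events produced by the matching input are enriched with the same metadata automatically.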