From 4f8b0750fb33ab53a70165083a5a5cae56176d10 Mon Sep 17 00:00:00 2001 From: Andreas Thaler Date: Thu, 16 Nov 2023 18:28:57 +0100 Subject: [PATCH] [receiver/receivercreator] Add support for k8s service discovery (#29022) **Description:** - Added a new watch to the k8s_observer extension for k8s services, which can be enabled using a new flag "observe_services". - Discovered entities are transformed into a new endpoint type `k8s.service`. - Adjusted the receivercreator to support the new type `k8s.service` **Link to tracking Issue:** [#29021](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29021) **Testing:** Added unit tests analogue to the available tests **Documentation:** Adjusted readme's of k8s_observer and receivercreator. Added description of new flags and typers. **Note:** Current implementation is working as described in the linked ticket. Please check the potential discussion points mentioned in the ticket: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29021#issuecomment-1801290614 --------- Co-authored-by: Antoine Toulme --- ...creator-support-k8s-service-discovery.yaml | 27 ++++++ extension/observer/endpoints.go | 37 ++++++++ extension/observer/endpoints_test.go | 38 +++++++- extension/observer/k8sobserver/README.md | 4 +- extension/observer/k8sobserver/config.go | 6 +- extension/observer/k8sobserver/config_test.go | 11 +-- extension/observer/k8sobserver/extension.go | 47 ++++++---- .../observer/k8sobserver/extension_test.go | 88 +++++++++++++++++++ extension/observer/k8sobserver/handler.go | 21 +++++ .../observer/k8sobserver/handler_test.go | 61 +++++++++++++ .../observer/k8sobserver/k8s_fixtures_test.go | 30 +++++++ .../observer/k8sobserver/service_endpoint.go | 40 +++++++++ .../k8sobserver/service_endpoint_test.go | 31 +++++++ .../observer/k8sobserver/testdata/config.yaml | 2 + receiver/receivercreator/README.md | 31 ++++++- receiver/receivercreator/config.go | 2 +- receiver/receivercreator/config_test.go | 11 +-- receiver/receivercreator/factory.go | 3 + receiver/receivercreator/fixtures_test.go | 19 ++++ receiver/receivercreator/rules.go | 2 +- receiver/receivercreator/rules_test.go | 1 + receiver/receivercreator/testdata/config.yaml | 2 + 22 files changed, 482 insertions(+), 32 deletions(-) create mode 100644 .chloggen/receivercreator-support-k8s-service-discovery.yaml create mode 100644 extension/observer/k8sobserver/service_endpoint.go create mode 100644 extension/observer/k8sobserver/service_endpoint_test.go diff --git a/.chloggen/receivercreator-support-k8s-service-discovery.yaml b/.chloggen/receivercreator-support-k8s-service-discovery.yaml new file mode 100644 index 000000000000..d6b1a4fd25f7 --- /dev/null +++ b/.chloggen/receivercreator-support-k8s-service-discovery.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receivercreator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added support for discovery of endpoints based on K8s services + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29022] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: By discovering endpoints based on K8s services, a dynamic probing of K8s service leveraging for example the httpcheckreceiver get enabled + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/extension/observer/endpoints.go b/extension/observer/endpoints.go index 7754841bc758..89a57b255a89 100644 --- a/extension/observer/endpoints.go +++ b/extension/observer/endpoints.go @@ -23,6 +23,8 @@ const ( PortType EndpointType = "port" // PodType is a pod endpoint. PodType EndpointType = "pod" + // K8sServiceType is a service endpoint. + K8sServiceType EndpointType = "k8s.service" // K8sNodeType is a Kubernetes Node endpoint. K8sNodeType EndpointType = "k8s.node" // HostPortType is a hostport endpoint. @@ -34,6 +36,7 @@ const ( var ( _ EndpointDetails = (*Pod)(nil) _ EndpointDetails = (*Port)(nil) + _ EndpointDetails = (*K8sService)(nil) _ EndpointDetails = (*K8sNode)(nil) _ EndpointDetails = (*HostPort)(nil) _ EndpointDetails = (*Container)(nil) @@ -92,6 +95,40 @@ func (e Endpoint) equals(other Endpoint) bool { } } +// K8sService is a discovered k8s service. +type K8sService struct { + // Name of the service. + Name string + // UID is the unique ID in the cluster for the service. + UID string + // Labels is a map of user-specified metadata. + Labels map[string]string + // Annotations is a map of user-specified metadata. + Annotations map[string]string + // Namespace must be unique for services with same name. + Namespace string + // ClusterIP is the IP under which the service is reachable within the cluster. + ClusterIP string + // ServiceType is the type of the service: ClusterIP, NodePort, LoadBalancer, ExternalName + ServiceType string +} + +func (s *K8sService) Env() EndpointEnv { + return map[string]any{ + "uid": s.UID, + "name": s.Name, + "labels": s.Labels, + "annotations": s.Annotations, + "namespace": s.Namespace, + "cluster_ip": s.ClusterIP, + "service_type": s.ServiceType, + } +} + +func (s *K8sService) Type() EndpointType { + return K8sServiceType +} + // Pod is a discovered k8s pod. type Pod struct { // Name of the pod. diff --git a/extension/observer/endpoints_test.go b/extension/observer/endpoints_test.go index b81e9440049b..c90be0900d34 100644 --- a/extension/observer/endpoints_test.go +++ b/extension/observer/endpoints_test.go @@ -49,7 +49,7 @@ func TestEndpointEnv(t *testing.T) { }, }, { - name: "K8s port", + name: "K8s pod port", endpoint: Endpoint{ ID: EndpointID("port_id"), Target: "192.68.73.2", @@ -90,6 +90,42 @@ func TestEndpointEnv(t *testing.T) { "transport": ProtocolTCP, }, }, + { + name: "Service", + endpoint: Endpoint{ + ID: EndpointID("service_id"), + Target: "service.namespace", + Details: &K8sService{ + Name: "service_name", + UID: "service-uid", + Labels: map[string]string{ + "label_key": "label_val", + }, + Annotations: map[string]string{ + "annotation_1": "value_1", + }, + Namespace: "service-namespace", + ServiceType: "LoadBalancer", + ClusterIP: "192.68.73.2", + }, + }, + want: EndpointEnv{ + "type": "k8s.service", + "endpoint": "service.namespace", + "id": "service_id", + "name": "service_name", + "labels": map[string]string{ + "label_key": "label_val", + }, + "annotations": map[string]string{ + "annotation_1": "value_1", + }, + "uid": "service-uid", + "namespace": "service-namespace", + "cluster_ip": "192.68.73.2", + "service_type": "LoadBalancer", + }, + }, { name: "Host port", endpoint: Endpoint{ diff --git a/extension/observer/k8sobserver/README.md b/extension/observer/k8sobserver/README.md index 3f31e393ed18..4945ff0460ef 100644 --- a/extension/observer/k8sobserver/README.md +++ b/extension/observer/k8sobserver/README.md @@ -15,7 +15,7 @@ The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report -Kubernetes pod, port, and node endpoints via the Kubernetes API. +Kubernetes pod, port, service and node endpoints via the Kubernetes API. ## Example Config @@ -26,6 +26,7 @@ extensions: node: ${env:K8S_NODE_NAME} observe_pods: true observe_nodes: true + observe_services: true receivers: receiver_creator: @@ -71,3 +72,4 @@ All fields are optional. | node | string | | The node name to limit the discovery of pod, port, and node endpoints. Providing no value (the default) results in discovering endpoints for all available nodes. | | observe_pods | bool | `true` | Whether to report observer pod and port endpoints. If `true` and `node` is specified it will only discover pod and port endpoints whose `spec.nodeName` matches the provided node name. If `true` and `node` isn't specified, it will discover all available pod and port endpoints. Please note that Collector connectivity to pods from other nodes is dependent on your cluster configuration and isn't guaranteed. | | observe_nodes | bool | `false` | Whether to report observer k8s.node endpoints. If `true` and `node` is specified it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and `node` isn't specified, it will discover all available node endpoints. Please note that Collector connectivity to nodes is dependent on your cluster configuration and isn't guaranteed.| +| observe_services | bool | `false` | Whether to report observer k8s.service endpoints.| diff --git a/extension/observer/k8sobserver/config.go b/extension/observer/k8sobserver/config.go index 8dbfa38efaea..9f202be4c246 100644 --- a/extension/observer/k8sobserver/config.go +++ b/extension/observer/k8sobserver/config.go @@ -34,12 +34,14 @@ type Config struct { // it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and // Node isn't specified, it will discover all available node endpoints. `false` by default. ObserveNodes bool `mapstructure:"observe_nodes"` + // ObserveServices determines whether to report observer service and port endpoints. `false` by default. + ObserveServices bool `mapstructure:"observe_services"` } // Validate checks if the extension configuration is valid func (cfg *Config) Validate() error { - if !cfg.ObservePods && !cfg.ObserveNodes { - return fmt.Errorf("one of observe_pods and observe_nodes must be true") + if !cfg.ObservePods && !cfg.ObserveNodes && !cfg.ObserveServices { + return fmt.Errorf("one of observe_pods, observe_nodes and observe_services must be true") } return nil } diff --git a/extension/observer/k8sobserver/config_test.go b/extension/observer/k8sobserver/config_test.go index 28db85945589..20f29407a4da 100644 --- a/extension/observer/k8sobserver/config_test.go +++ b/extension/observer/k8sobserver/config_test.go @@ -39,10 +39,11 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(metadata.Type, "observe-all"), expected: &Config{ - Node: "", - APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone}, - ObservePods: true, - ObserveNodes: true, + Node: "", + APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone}, + ObservePods: true, + ObserveNodes: true, + ObserveServices: true, }, }, { @@ -51,7 +52,7 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(metadata.Type, "invalid_no_observing"), - expectedErr: "one of observe_pods and observe_nodes must be true", + expectedErr: "one of observe_pods, observe_nodes and observe_services must be true", }, } for _, tt := range tests { diff --git a/extension/observer/k8sobserver/extension.go b/extension/observer/k8sobserver/extension.go index 4ee3579e920e..80f4a4ad3efd 100644 --- a/extension/observer/k8sobserver/extension.go +++ b/extension/observer/k8sobserver/extension.go @@ -25,13 +25,14 @@ var _ observer.Observable = (*k8sObserver)(nil) type k8sObserver struct { *observer.EndpointsWatcher - telemetry component.TelemetrySettings - podListerWatcher cache.ListerWatcher - nodeListerWatcher cache.ListerWatcher - handler *handler - once *sync.Once - stop chan struct{} - config *Config + telemetry component.TelemetrySettings + podListerWatcher cache.ListerWatcher + serviceListerWatcher cache.ListerWatcher + nodeListerWatcher cache.ListerWatcher + handler *handler + once *sync.Once + stop chan struct{} + config *Config } // Start will populate the cache.SharedInformers for pods and nodes as configured and run them as goroutines. @@ -52,6 +53,14 @@ func (k *k8sObserver) Start(_ context.Context, _ component.Host) error { } go podInformer.Run(k.stop) } + if k.serviceListerWatcher != nil { + k.telemetry.Logger.Debug("creating and starting service informer") + serviceInformer := cache.NewSharedInformer(k.serviceListerWatcher, &v1.Service{}, 0) + if _, err := serviceInformer.AddEventHandler(k.handler); err != nil { + k.telemetry.Logger.Error("error adding event handler to service informer", zap.Error(err)) + } + go serviceInformer.Run(k.stop) + } if k.nodeListerWatcher != nil { k.telemetry.Logger.Debug("creating and starting node informer") nodeInformer := cache.NewSharedInformer(k.nodeListerWatcher, &v1.Node{}, 0) @@ -90,6 +99,13 @@ func newObserver(config *Config, set extension.CreateSettings) (extension.Extens podListerWatcher = cache.NewListWatchFromClient(restClient, "pods", v1.NamespaceAll, podSelector) } + var serviceListerWatcher cache.ListerWatcher + if config.ObserveServices { + var serviceSelector = fields.Everything() + set.Logger.Debug("observing services") + serviceListerWatcher = cache.NewListWatchFromClient(restClient, "services", v1.NamespaceAll, serviceSelector) + } + var nodeListerWatcher cache.ListerWatcher if config.ObserveNodes { var nodeSelector fields.Selector @@ -103,14 +119,15 @@ func newObserver(config *Config, set extension.CreateSettings) (extension.Extens } h := &handler{idNamespace: set.ID.String(), endpoints: &sync.Map{}, logger: set.TelemetrySettings.Logger} obs := &k8sObserver{ - EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger), - telemetry: set.TelemetrySettings, - podListerWatcher: podListerWatcher, - nodeListerWatcher: nodeListerWatcher, - stop: make(chan struct{}), - config: config, - handler: h, - once: &sync.Once{}, + EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger), + telemetry: set.TelemetrySettings, + podListerWatcher: podListerWatcher, + serviceListerWatcher: serviceListerWatcher, + nodeListerWatcher: nodeListerWatcher, + stop: make(chan struct{}), + config: config, + handler: h, + once: &sync.Once{}, } return obs, nil diff --git a/extension/observer/k8sobserver/extension_test.go b/extension/observer/k8sobserver/extension_test.go index 726ec58382e6..028849a23ea6 100644 --- a/extension/observer/k8sobserver/extension_test.go +++ b/extension/observer/k8sobserver/extension_test.go @@ -40,6 +40,94 @@ func TestNewExtension(t *testing.T) { require.NotNil(t, ext) } +func TestExtensionObserveServices(t *testing.T) { + factory := NewFactory() + config := factory.CreateDefaultConfig().(*Config) + mockServiceHost(t, config) + + set := extensiontest.NewNopCreateSettings() + set.ID = component.NewID(metadata.Type) + ext, err := newObserver(config, set) + require.NoError(t, err) + require.NotNil(t, ext) + + obs := ext.(*k8sObserver) + serviceListerWatcher := framework.NewFakeControllerSource() + obs.serviceListerWatcher = serviceListerWatcher + + serviceListerWatcher.Add(serviceWithClusterIP) + + require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost())) + + sink := &endpointSink{} + obs.ListAndWatch(sink) + + requireSink(t, sink, func() bool { + return len(sink.added) == 1 + }) + + assert.Equal(t, observer.Endpoint{ + ID: "k8s_observer/service-1-UID", + Target: "service-1.default.svc.cluster.local", + Details: &observer.K8sService{ + Name: "service-1", + Namespace: "default", + UID: "service-1-UID", + Labels: map[string]string{ + "env": "prod", + }, + ClusterIP: "1.2.3.4", + ServiceType: "ClusterIP", + }, + }, sink.added[0]) + + serviceListerWatcher.Modify(serviceWithClusterIPV2) + + requireSink(t, sink, func() bool { + return len(sink.changed) == 1 + }) + + assert.Equal(t, observer.Endpoint{ + ID: "k8s_observer/service-1-UID", + Target: "service-1.default.svc.cluster.local", + Details: &observer.K8sService{ + Name: "service-1", + Namespace: "default", + UID: "service-1-UID", + Labels: map[string]string{ + "env": "prod", + "service-version": "2", + }, + ClusterIP: "1.2.3.4", + ServiceType: "ClusterIP", + }, + }, sink.changed[0]) + + serviceListerWatcher.Delete(serviceWithClusterIPV2) + + requireSink(t, sink, func() bool { + return len(sink.removed) == 1 + }) + + assert.Equal(t, observer.Endpoint{ + ID: "k8s_observer/service-1-UID", + Target: "service-1.default.svc.cluster.local", + Details: &observer.K8sService{ + Name: "service-1", + Namespace: "default", + UID: "service-1-UID", + Labels: map[string]string{ + "env": "prod", + "service-version": "2", + }, + ClusterIP: "1.2.3.4", + ServiceType: "ClusterIP", + }, + }, sink.removed[0]) + + require.NoError(t, ext.Shutdown(context.Background())) +} + func TestExtensionObservePods(t *testing.T) { factory := NewFactory() config := factory.CreateDefaultConfig().(*Config) diff --git a/extension/observer/k8sobserver/handler.go b/extension/observer/k8sobserver/handler.go index d39dabbafe26..859738a5c9c0 100644 --- a/extension/observer/k8sobserver/handler.go +++ b/extension/observer/k8sobserver/handler.go @@ -47,6 +47,8 @@ func (h *handler) OnAdd(objectInterface any, _ bool) { switch object := objectInterface.(type) { case *v1.Pod: endpoints = convertPodToEndpoints(h.idNamespace, object) + case *v1.Service: + endpoints = convertServiceToEndpoints(h.idNamespace, object) case *v1.Node: endpoints = append(endpoints, convertNodeToEndpoint(h.idNamespace, object)) default: // unsupported @@ -67,6 +69,7 @@ func (h *handler) OnUpdate(oldObjectInterface, newObjectInterface any) { case *v1.Pod: newPod, ok := newObjectInterface.(*v1.Pod) if !ok { + h.logger.Warn("skip updating endpoint for pod as the update is of different type", zap.Any("oldPod", oldObjectInterface), zap.Any("newObject", newObjectInterface)) return } for _, e := range convertPodToEndpoints(h.idNamespace, oldObject) { @@ -76,9 +79,23 @@ func (h *handler) OnUpdate(oldObjectInterface, newObjectInterface any) { newEndpoints[e.ID] = e } + case *v1.Service: + newService, ok := newObjectInterface.(*v1.Service) + if !ok { + h.logger.Warn("skip updating endpoint for service as the update is of different type", zap.Any("oldService", oldObjectInterface), zap.Any("newObject", newObjectInterface)) + return + } + for _, e := range convertServiceToEndpoints(h.idNamespace, oldObject) { + oldEndpoints[e.ID] = e + } + for _, e := range convertServiceToEndpoints(h.idNamespace, newService) { + newEndpoints[e.ID] = e + } + case *v1.Node: newNode, ok := newObjectInterface.(*v1.Node) if !ok { + h.logger.Warn("skip updating endpoint for node as the update is of different type", zap.Any("oldNode", oldObjectInterface), zap.Any("newObject", newObjectInterface)) return } oldEndpoint := convertNodeToEndpoint(h.idNamespace, oldObject) @@ -144,6 +161,10 @@ func (h *handler) OnDelete(objectInterface any) { if object != nil { endpoints = convertPodToEndpoints(h.idNamespace, object) } + case *v1.Service: + if object != nil { + endpoints = convertServiceToEndpoints(h.idNamespace, object) + } case *v1.Node: if object != nil { endpoints = append(endpoints, convertNodeToEndpoint(h.idNamespace, object)) diff --git a/extension/observer/k8sobserver/handler_test.go b/extension/observer/k8sobserver/handler_test.go index 793800907567..402b5fcc4213 100644 --- a/extension/observer/k8sobserver/handler_test.go +++ b/extension/observer/k8sobserver/handler_test.go @@ -104,6 +104,67 @@ func TestPodEndpointsChanged(t *testing.T) { }, th.ListEndpoints()) } +func TestServiceEndpointsAdded(t *testing.T) { + th := newTestHandler() + th.OnAdd(serviceWithClusterIP, true) + assert.ElementsMatch(t, []observer.Endpoint{ + { + ID: "test-1/service-1-UID", + Target: "service-1.default.svc.cluster.local", + Details: &observer.K8sService{ + Name: "service-1", + Namespace: "default", + UID: "service-1-UID", + Labels: map[string]string{"env": "prod"}, + ServiceType: "ClusterIP", + ClusterIP: "1.2.3.4", + }, + }}, th.ListEndpoints()) +} + +func TestServiceEndpointsRemoved(t *testing.T) { + th := newTestHandler() + th.OnAdd(serviceWithClusterIP, true) + th.OnDelete(serviceWithClusterIP) + assert.Empty(t, th.ListEndpoints()) +} + +func TestServiceEndpointsChanged(t *testing.T) { + th := newTestHandler() + // Nothing changed. + th.OnUpdate(serviceWithClusterIP, serviceWithClusterIP) + require.Empty(t, th.ListEndpoints()) + + // Labels changed. + changedLabels := serviceWithClusterIP.DeepCopy() + changedLabels.Labels["new-label"] = "value" + th.OnUpdate(serviceWithClusterIP, changedLabels) + + endpoints := th.ListEndpoints() + require.ElementsMatch(t, + []observer.EndpointID{"test-1/service-1-UID"}, + []observer.EndpointID{endpoints[0].ID}, + ) + + // Running state changed, one added and one removed. + updatedService := serviceWithClusterIP.DeepCopy() + updatedService.Labels["updated-label"] = "true" + th.OnUpdate(serviceWithClusterIP, updatedService) + require.ElementsMatch(t, []observer.Endpoint{ + { + ID: "test-1/service-1-UID", + Target: "service-1.default.svc.cluster.local", + Details: &observer.K8sService{ + Name: "service-1", + Namespace: "default", + UID: "service-1-UID", + Labels: map[string]string{"env": "prod", "updated-label": "true"}, + ServiceType: "ClusterIP", + ClusterIP: "1.2.3.4", + }}, + }, th.ListEndpoints()) +} + func TestNodeEndpointsAdded(t *testing.T) { th := newTestHandler() th.OnAdd(node1V1, true) diff --git a/extension/observer/k8sobserver/k8s_fixtures_test.go b/extension/observer/k8sobserver/k8s_fixtures_test.go index 10153c4d6a52..4f0acd4aa273 100644 --- a/extension/observer/k8sobserver/k8s_fixtures_test.go +++ b/extension/observer/k8sobserver/k8s_fixtures_test.go @@ -104,6 +104,36 @@ func pointerBool(val bool) *bool { return &val } +// newService is a helper function for creating Services for testing. +func newService(name string) *v1.Service { + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: name, + UID: types.UID(name + "-UID"), + Labels: map[string]string{ + "env": "prod", + }, + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeClusterIP, + ClusterIP: "1.2.3.4", + }, + } + + return service +} + +var serviceWithClusterIP = func() *v1.Service { + return newService("service-1") +}() + +var serviceWithClusterIPV2 = func() *v1.Service { + service := serviceWithClusterIP.DeepCopy() + service.Labels["service-version"] = "2" + return service +}() + // newNode is a helper function for creating Nodes for testing. func newNode(name, hostname string) *v1.Node { return &v1.Node{ diff --git a/extension/observer/k8sobserver/service_endpoint.go b/extension/observer/k8sobserver/service_endpoint.go new file mode 100644 index 000000000000..ee8957cd3ef4 --- /dev/null +++ b/extension/observer/k8sobserver/service_endpoint.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver" + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +// convertServiceToEndpoints converts a service instance into a slice of endpoints. The endpoints +// include the service itself only. +func convertServiceToEndpoints(idNamespace string, service *v1.Service) []observer.Endpoint { + serviceID := observer.EndpointID(fmt.Sprintf("%s/%s", idNamespace, service.UID)) + + serviceDetails := observer.K8sService{ + UID: string(service.UID), + Annotations: service.Annotations, + Labels: service.Labels, + Name: service.Name, + Namespace: service.Namespace, + ClusterIP: service.Spec.ClusterIP, + ServiceType: string(service.Spec.Type), + } + + endpoints := []observer.Endpoint{{ + ID: serviceID, + Target: generateServiceTarget(&serviceDetails), + Details: &serviceDetails, + }} + + return endpoints +} + +func generateServiceTarget(service *observer.K8sService) string { + return fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace) +} diff --git a/extension/observer/k8sobserver/service_endpoint_test.go b/extension/observer/k8sobserver/service_endpoint_test.go new file mode 100644 index 000000000000..391fd448b633 --- /dev/null +++ b/extension/observer/k8sobserver/service_endpoint_test.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver" + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +func TestServiceObjectToEndpoint(t *testing.T) { + expectedEndpoints := []observer.Endpoint{ + { + ID: "namespace/service-1-UID", + Target: "service-1.default.svc.cluster.local", + Details: &observer.K8sService{ + Name: "service-1", + Namespace: "default", + UID: "service-1-UID", + Labels: map[string]string{"env": "prod"}, + ServiceType: "ClusterIP", + ClusterIP: "1.2.3.4", + }}, + } + + endpoints := convertServiceToEndpoints("namespace", serviceWithClusterIP) + require.Equal(t, expectedEndpoints, endpoints) +} diff --git a/extension/observer/k8sobserver/testdata/config.yaml b/extension/observer/k8sobserver/testdata/config.yaml index b1f1205cce8c..8e6163829025 100644 --- a/extension/observer/k8sobserver/testdata/config.yaml +++ b/extension/observer/k8sobserver/testdata/config.yaml @@ -6,8 +6,10 @@ k8s_observer/observe-all: auth_type: none observe_nodes: true observe_pods: true + observe_services: true k8s_observer/invalid_auth: auth_type: not a real auth type k8s_observer/invalid_no_observing: observe_nodes: false observe_pods: false + observe_services: false diff --git a/receiver/receivercreator/README.md b/receiver/receivercreator/README.md index 2ffa14f7723e..c8e59cfee632 100644 --- a/receiver/receivercreator/README.md +++ b/receiver/receivercreator/README.md @@ -122,6 +122,12 @@ Note that the backticks below are not typos--they indicate the value is set dyna None +`type == "k8s.service"` + +| Resource Attribute | Default | +|--------------------|-------------------| +| k8s.namespace.name | \`namespace\` | + `type == "k8s.node"` | Resource Attribute | Default | @@ -145,7 +151,7 @@ Similar to the per-endpoint type `resource_attributes` described above but for i ## Rule Expressions -Each rule must start with `type == ("pod"|"port"|"hostport"|"container"|"k8s.node") &&` such that the rule matches +Each rule must start with `type == ("pod"|"port"|"hostport"|"container"|"k8s.service"|"k8s.node") &&` such that the rule matches only one endpoint type. Depending on the type of endpoint the rule is targeting it will have different variables available. @@ -204,6 +210,20 @@ targeting it will have different variables available. | transport | Transport protocol used by the endpoint (TCP or UDP) | | labels | User-specified metadata labels on the container | +### Kubernetes Service + +| Variable | Description | +|----------------|-------------------------------------------------------------------| +| type | `"k8s.service"` | +| id | ID of source endpoint | +| name | The name of the Kubernetes service | +| namespace | The namespace of the service | +| uid | The unique ID for the service | +| labels | The map of labels set on the service | +| annotations | The map of annotations set on the service | +| service_type | The type of the kubernetes service: ClusterIP, NodePort, LoadBalancer, ExternalName | +| cluster_ip | The cluster IP assigned to the service | + ### Kubernetes Node | Variable | Description | @@ -290,6 +310,15 @@ receivers: - container - pod - node + receivers: + httpcheck: + # Configure probing if standard prometheus annotations are set on the pod. + rule: type == "k8s.service" && annotations["prometheus.io/probe"] == "true" + config: + targets: + - endpoint: 'http://`endpoint`:`"prometheus.io/port" in annotations ? annotations["prometheus.io/port"] : 9090``"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/health"`' + method: GET + collection_interval: 10s processors: exampleprocessor: diff --git a/receiver/receivercreator/config.go b/receiver/receivercreator/config.go index 68deedf82abe..771f7085efa7 100644 --- a/receiver/receivercreator/config.go +++ b/receiver/receivercreator/config.go @@ -92,7 +92,7 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { for endpointType := range cfg.ResourceAttributes { switch endpointType { - case observer.ContainerType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType: + case observer.ContainerType, observer.K8sServiceType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType: default: return fmt.Errorf("resource attributes for unsupported endpoint type %q", endpointType) } diff --git a/receiver/receivercreator/config_test.go b/receiver/receivercreator/config_test.go index 2062a93c8c6f..64abb7db65ef 100644 --- a/receiver/receivercreator/config_test.go +++ b/receiver/receivercreator/config_test.go @@ -109,11 +109,12 @@ func TestLoadConfig(t *testing.T) { component.NewIDWithName("mock_observer", "with_name"), }, ResourceAttributes: map[observer.EndpointType]map[string]string{ - observer.ContainerType: {"container.key": "container.value"}, - observer.PodType: {"pod.key": "pod.value"}, - observer.PortType: {"port.key": "port.value"}, - observer.HostPortType: {"hostport.key": "hostport.value"}, - observer.K8sNodeType: {"k8s.node.key": "k8s.node.value"}, + observer.ContainerType: {"container.key": "container.value"}, + observer.PodType: {"pod.key": "pod.value"}, + observer.PortType: {"port.key": "port.value"}, + observer.HostPortType: {"hostport.key": "hostport.value"}, + observer.K8sServiceType: {"k8s.service.key": "k8s.service.value"}, + observer.K8sNodeType: {"k8s.node.key": "k8s.node.value"}, }, }, }, diff --git a/receiver/receivercreator/factory.go b/receiver/receivercreator/factory.go index a8d52d79c277..26c35128a8e7 100644 --- a/receiver/receivercreator/factory.go +++ b/receiver/receivercreator/factory.go @@ -39,6 +39,9 @@ func createDefaultConfig() component.Config { conventions.AttributeK8SPodUID: "`uid`", conventions.AttributeK8SNamespaceName: "`namespace`", }, + observer.K8sServiceType: map[string]string{ + conventions.AttributeK8SNamespaceName: "`namespace`", + }, observer.PortType: map[string]string{ conventions.AttributeK8SPodName: "`pod.name`", conventions.AttributeK8SPodUID: "`pod.uid`", diff --git a/receiver/receivercreator/fixtures_test.go b/receiver/receivercreator/fixtures_test.go index 9931965ddf45..069604d70344 100644 --- a/receiver/receivercreator/fixtures_test.go +++ b/receiver/receivercreator/fixtures_test.go @@ -26,6 +26,25 @@ var podEndpoint = observer.Endpoint{ Details: &pod, } +var service = observer.K8sService{ + UID: "uid-1", + Namespace: "default", + Name: "service-1", + Labels: map[string]string{ + "app": "redis2", + "region": "west-1", + }, + Annotations: map[string]string{ + "scrape": "true", + }, +} + +var serviceEndpoint = observer.Endpoint{ + ID: "service-1", + Target: "localhost", + Details: &service, +} + var portEndpoint = observer.Endpoint{ ID: "port-1", Target: "localhost:1234", diff --git a/receiver/receivercreator/rules.go b/receiver/receivercreator/rules.go index 1ff394a4e6bf..8262adeec7c1 100644 --- a/receiver/receivercreator/rules.go +++ b/receiver/receivercreator/rules.go @@ -22,7 +22,7 @@ type rule struct { // ruleRe is used to verify the rule starts type check. var ruleRe = regexp.MustCompile( - fmt.Sprintf(`^type\s*==\s*(%q|%q|%q|%q|%q)`, observer.PodType, observer.PortType, observer.HostPortType, observer.ContainerType, observer.K8sNodeType), + fmt.Sprintf(`^type\s*==\s*(%q|%q|%q|%q|%q|%q)`, observer.PodType, observer.K8sServiceType, observer.PortType, observer.HostPortType, observer.ContainerType, observer.K8sNodeType), ) // newRule creates a new rule instance. diff --git a/receiver/receivercreator/rules_test.go b/receiver/receivercreator/rules_test.go index b2e0f65ffb0e..4ecd0c8f85f8 100644 --- a/receiver/receivercreator/rules_test.go +++ b/receiver/receivercreator/rules_test.go @@ -28,6 +28,7 @@ func Test_ruleEval(t *testing.T) { {"basic port", args{`type == "port" && name == "http" && pod.labels["app"] == "redis"`, portEndpoint}, true, false}, {"basic hostport", args{`type == "hostport" && port == 1234 && process_name == "splunk"`, hostportEndpoint}, true, false}, {"basic pod", args{`type == "pod" && labels["region"] == "west-1"`, podEndpoint}, true, false}, + {"basic service", args{`type == "k8s.service" && labels["region"] == "west-1"`, serviceEndpoint}, true, false}, {"annotations", args{`type == "pod" && annotations["scrape"] == "true"`, podEndpoint}, true, false}, {"basic container", args{`type == "container" && labels["region"] == "east-1"`, containerEndpoint}, true, false}, {"basic k8s.node", args{`type == "k8s.node" && kubelet_endpoint_port == 10250`, k8sNodeEndpoint}, true, false}, diff --git a/receiver/receivercreator/testdata/config.yaml b/receiver/receivercreator/testdata/config.yaml index 0b5b42b9516d..f0632e39de08 100644 --- a/receiver/receivercreator/testdata/config.yaml +++ b/receiver/receivercreator/testdata/config.yaml @@ -25,5 +25,7 @@ receiver_creator/1: port.key: port.value hostport: hostport.key: hostport.value + k8s.service: + k8s.service.key: k8s.service.value k8s.node: k8s.node.key: k8s.node.value