diff --git a/CHANGELOG.md b/CHANGELOG.md index 3599155056..9b7c7dd852 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ IMPROVEMENTS: * Add the `balanceInboundConnections` field to the `ServiceDefaults` CRD. [[GH-1823]](https://github.com/hashicorp/consul-k8s/pull/1823) * Control-Plane * Add support for the annotation `consul.hashicorp.com/use-proxy-health-check`. [[GH-1824](https://github.com/hashicorp/consul-k8s/pull/1824)] + * Add health check for synced services based on the status of the Kubernetes readiness probe on synced pod. [[GH-1821](https://github.com/hashicorp/consul-k8s/pull/1821)] BUG FIXES: * Control Plane diff --git a/acceptance/tests/sync/sync_catalog_test.go b/acceptance/tests/sync/sync_catalog_test.go index 92b006cac6..c4f873fcbd 100644 --- a/acceptance/tests/sync/sync_catalog_test.go +++ b/acceptance/tests/sync/sync_catalog_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul-k8s/acceptance/framework/helpers" "github.com/hashicorp/consul-k8s/acceptance/framework/k8s" "github.com/hashicorp/consul-k8s/acceptance/framework/logger" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/require" ) @@ -65,8 +66,13 @@ func TestSyncCatalog(t *testing.T) { service, _, err := consulClient.Catalog().Service(syncedServiceName, "", nil) require.NoError(t, err) - require.Equal(t, 1, len(service)) + require.Len(t, service, 1) require.Equal(t, []string{"k8s"}, service[0].ServiceTags) + filter := fmt.Sprintf("ServiceID == %q", service[0].ServiceID) + healthChecks, _, err := consulClient.Health().Checks(syncedServiceName, &api.QueryOptions{Filter: filter}) + require.NoError(t, err) + require.Len(t, healthChecks, 1) + require.Equal(t, api.HealthPassing, healthChecks[0].Status) }) } } diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 96538510d1..09d8aa6c5d 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -13,7 +13,7 @@ import ( "github.com/hashicorp/consul-k8s/control-plane/namespaces" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" - apiv1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -33,6 +33,12 @@ const ( ConsulK8SRefKind = "external-k8s-ref-kind" ConsulK8SRefValue = "external-k8s-ref-name" ConsulK8SNodeName = "external-k8s-node-name" + + // consulKubernetesCheckType is the type of health check in Consul for Kubernetes readiness status. + consulKubernetesCheckType = "kubernetes-readiness" + // consulKubernetesCheckName is the name of health check in Consul for Kubernetes readiness status. + consulKubernetesCheckName = "Kubernetes Readiness Check" + kubernetesSuccessReasonMsg = "Kubernetes health checks passing" ) type NodePortSyncType string @@ -131,11 +137,11 @@ type ServiceResource struct { // serviceMap holds services we should sync to Consul. Keys are the // in the form /. - serviceMap map[string]*apiv1.Service + serviceMap map[string]*corev1.Service // endpointsMap uses the same keys as serviceMap but maps to the endpoints // of each service. - endpointsMap map[string]*apiv1.Endpoints + endpointsMap map[string]*corev1.Endpoints // consulMap holds the services in Consul that we've registered from kube. // It's populated via Consul's API and lets us diff what is actually in @@ -157,7 +163,7 @@ func (t *ServiceResource) Informer() cache.SharedIndexInformer { return t.Client.CoreV1().Services(metav1.NamespaceAll).Watch(t.Ctx, options) }, }, - &apiv1.Service{}, + &corev1.Service{}, 0, cache.Indexers{}, ) @@ -166,7 +172,7 @@ func (t *ServiceResource) Informer() cache.SharedIndexInformer { // Upsert implements the controller.Resource interface. func (t *ServiceResource) Upsert(key string, raw interface{}) error { // We expect a Service. If it isn't a service then just ignore it. - service, ok := raw.(*apiv1.Service) + service, ok := raw.(*corev1.Service) if !ok { t.Log.Warn("upsert got invalid type", "raw", raw) return nil @@ -176,7 +182,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { defer t.serviceLock.Unlock() if t.serviceMap == nil { - t.serviceMap = make(map[string]*apiv1.Service) + t.serviceMap = make(map[string]*corev1.Service) } if !t.shouldSync(service) { @@ -205,7 +211,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { "err", err) } else { if t.endpointsMap == nil { - t.endpointsMap = make(map[string]*apiv1.Endpoints) + t.endpointsMap = make(map[string]*corev1.Endpoints) } t.endpointsMap[key] = endpoints t.Log.Debug("[ServiceResource.Upsert] adding service's endpoints to endpointsMap", "key", key, "service", service, "endpoints", endpoints) @@ -254,7 +260,7 @@ func (t *ServiceResource) Run(ch <-chan struct{}) { } // shouldSync returns true if resyncing should be enabled for the given service. -func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool { +func (t *ServiceResource) shouldSync(svc *corev1.Service) bool { // Namespace logic // If in deny list, don't sync if t.DenyK8sNamespacesSet.Contains(svc.Namespace) { @@ -269,7 +275,7 @@ func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool { } // Ignore ClusterIP services if ClusterIP sync is disabled - if svc.Spec.Type == apiv1.ServiceTypeClusterIP && !t.ClusterIPSync { + if svc.Spec.Type == corev1.ServiceTypeClusterIP && !t.ClusterIPSync { t.Log.Debug("[shouldSync] ignoring clusterip service", "svc.Namespace", svc.Namespace, "service", svc) return false } @@ -310,9 +316,9 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool { return false } - return svc.Spec.Type == apiv1.ServiceTypeNodePort || - svc.Spec.Type == apiv1.ServiceTypeClusterIP || - (t.LoadBalancerEndpointsSync && svc.Spec.Type == apiv1.ServiceTypeLoadBalancer) + return svc.Spec.Type == corev1.ServiceTypeNodePort || + svc.Spec.Type == corev1.ServiceTypeClusterIP || + (t.LoadBalancerEndpointsSync && svc.Spec.Type == corev1.ServiceTypeLoadBalancer) } // generateRegistrations generates the necessary Consul registrations for @@ -380,7 +386,7 @@ func (t *ServiceResource) generateRegistrations(key string) { var overridePortNumber int if len(svc.Spec.Ports) > 0 { var port int - isNodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort + isNodePort := svc.Spec.Type == corev1.ServiceTypeNodePort // If a specific port is specified, then use that port value portAnnotation, ok := svc.Annotations[annotationServicePort] @@ -479,7 +485,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // each LoadBalancer entry. We only support entries that have an IP // address assigned (not hostnames). // If LoadBalancerEndpointsSync is true sync LB endpoints instead of loadbalancer ingress. - case apiv1.ServiceTypeLoadBalancer: + case corev1.ServiceTypeLoadBalancer: if t.LoadBalancerEndpointsSync { t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, false) } else { @@ -512,7 +518,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // endpoint of the service, which corresponds to the nodes the service's // pods are running on. This way we don't register _every_ K8S // node as part of the service. - case apiv1.ServiceTypeNodePort: + case corev1.ServiceTypeNodePort: if t.endpointsMap == nil { return } @@ -538,11 +544,11 @@ func (t *ServiceResource) generateRegistrations(key string) { } // Set the expected node address type - var expectedType apiv1.NodeAddressType + var expectedType corev1.NodeAddressType if t.NodePortSync == InternalOnly { - expectedType = apiv1.NodeInternalIP + expectedType = corev1.NodeInternalIP } else { - expectedType = apiv1.NodeExternalIP + expectedType = corev1.NodeExternalIP } // Find the ip address for the node and @@ -571,7 +577,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // use an InternalIP if t.NodePortSync == ExternalFirst && !found { for _, address := range node.Status.Addresses { - if address.Type == apiv1.NodeInternalIP { + if address.Type == corev1.NodeInternalIP { r := baseNode rs := baseService r.Service = &rs @@ -593,7 +599,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // For ClusterIP services, we register a service instance // for each endpoint. - case apiv1.ServiceTypeClusterIP: + case corev1.ServiceTypeClusterIP: t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, true) } } @@ -674,6 +680,16 @@ func (t *ServiceResource) registerServiceInstance( r.Service.Meta[ConsulK8SNodeName] = *subsetAddr.NodeName } + r.Check = &consulapi.AgentCheck{ + CheckID: consulHealthCheckID(endpoints.Namespace, serviceID(r.Service.Service, addr)), + Name: consulKubernetesCheckName, + Namespace: baseService.Namespace, + Type: consulKubernetesCheckType, + Status: consulapi.HealthPassing, + ServiceID: serviceID(r.Service.Service, addr), + Output: kubernetesSuccessReasonMsg, + } + t.consulMap[key] = append(t.consulMap[key], &r) } } @@ -723,7 +739,7 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { Watch(t.Ctx, options) }, }, - &apiv1.Endpoints{}, + &corev1.Endpoints{}, 0, cache.Indexers{}, ) @@ -731,7 +747,7 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { svc := t.Service - endpoints, ok := raw.(*apiv1.Endpoints) + endpoints, ok := raw.(*corev1.Endpoints) if !ok { svc.Log.Warn("upsert got invalid type", "raw", raw) return nil @@ -747,7 +763,7 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { // We are tracking this service so let's keep track of the endpoints if svc.endpointsMap == nil { - svc.endpointsMap = make(map[string]*apiv1.Endpoints) + svc.endpointsMap = make(map[string]*corev1.Endpoints) } svc.endpointsMap[key] = endpoints @@ -788,3 +804,8 @@ func (t *ServiceResource) addPrefixAndK8SNamespace(name, namespace string) strin return name } + +// consulHealthCheckID deterministically generates a health check ID based on service ID and Kubernetes namespace. +func consulHealthCheckID(k8sNS string, serviceID string) string { + return fmt.Sprintf("%s/%s", k8sNS, serviceID) +} diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 28335dea27..9ba94123ef 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -6,6 +6,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-k8s/control-plane/helper/controller" + consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" @@ -1005,6 +1006,43 @@ func TestServiceResource_clusterIP(t *testing.T) { }) } +// Test that the proper registrations with health checks are generated for a ClusterIP type. +func TestServiceResource_clusterIP_healthCheck(t *testing.T) { + t.Parallel() + client := fake.NewSimpleClientset() + syncer := newTestSyncer() + serviceResource := defaultServiceResource(client, syncer) + serviceResource.ClusterIPSync = true + + // Start the controller + closer := controller.TestControllerRun(&serviceResource) + defer closer() + + // Insert the service + svc := clusterIPService("foo", metav1.NamespaceDefault) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + + // Insert the endpoints + createEndpoints(t, client, "foo", metav1.NamespaceDefault) + + // Verify what we got + retry.Run(t, func(r *retry.R) { + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(r, actual, 2) + require.Equal(r, consulKubernetesCheckName, actual[0].Check.Name) + require.Equal(r, consulapi.HealthPassing, actual[0].Check.Status) + require.Equal(r, kubernetesSuccessReasonMsg, actual[0].Check.Output) + require.Equal(r, consulKubernetesCheckType, actual[0].Check.Type) + require.Equal(r, consulKubernetesCheckName, actual[1].Check.Name) + require.Equal(r, consulapi.HealthPassing, actual[1].Check.Status) + require.Equal(r, kubernetesSuccessReasonMsg, actual[1].Check.Output) + require.Equal(r, consulKubernetesCheckType, actual[1].Check.Type) + }) +} + // Test clusterIP with prefix. func TestServiceResource_clusterIPPrefix(t *testing.T) { t.Parallel() diff --git a/control-plane/go.sum b/control-plane/go.sum index 098d32835d..7542ac12d9 100644 --- a/control-plane/go.sum +++ b/control-plane/go.sum @@ -346,12 +346,8 @@ github.com/hashicorp/consul-k8s/control-plane/cni v0.0.0-20220831174802-b8af6526 github.com/hashicorp/consul-server-connection-manager v0.1.0 h1:XCweGvMHzra88rYv2zxwwuUOjBUdcQmNKVrnQmt/muo= github.com/hashicorp/consul-server-connection-manager v0.1.0/go.mod h1:XVVlO+Yk7aiRpspiHZkrrFVn9BJIiOPnQIzqytPxGaU= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= -github.com/hashicorp/consul/api v1.10.1-0.20221220195433-629878a6879c h1:dIy67NF/J5gm0wdGxTA8dCFeQDz8TatuuMEzC7n3aDk= -github.com/hashicorp/consul/api v1.10.1-0.20221220195433-629878a6879c/go.mod h1:c1u8FzGHcavbEtRW/p1YditvfMgn4QsKNgz2rnCDF7c= github.com/hashicorp/consul/api v1.10.1-0.20230106171340-8d923c178919 h1:8aVegJMSv7PIAAa1zqQQ0CT4TKv+Nf7I4rhE6+uDa1U= github.com/hashicorp/consul/api v1.10.1-0.20230106171340-8d923c178919/go.mod h1:c1u8FzGHcavbEtRW/p1YditvfMgn4QsKNgz2rnCDF7c= -github.com/hashicorp/consul/api v1.18.0 h1:R7PPNzTCeN6VuQNDwwhZWJvzCtGSrNpJqfb22h3yH9g= -github.com/hashicorp/consul/api v1.18.0/go.mod h1:owRRGJ9M5xReDC5nfT8FTJrNAPbT4NM6p/k+d03q2v4= github.com/hashicorp/consul/proto-public v0.1.0 h1:O0LSmCqydZi363hsqc6n2v5sMz3usQMXZF6ziK3SzXU= github.com/hashicorp/consul/proto-public v0.1.0/go.mod h1:vs2KkuWwtjkIgA5ezp4YKPzQp4GitV+q/+PvksrA92k= github.com/hashicorp/consul/sdk v0.4.1-0.20221021205723-cc843c4be892 h1:jw0NwPmNPr5CxAU04hACdj61JSaJBKZ0FdBo+kwfNp4= @@ -384,7 +380,6 @@ github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5O github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc= -github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= @@ -454,7 +449,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -495,7 +489,6 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -595,7 +588,6 @@ github.com/rs/zerolog v1.4.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKk github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= -github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/conswriter v0.0.0-20180208195008-f5ae3917a627/go.mod h1:7zjs06qF79/FKAJpBvFx3P8Ww4UTIMAe+lpNXDHziac= github.com/sean-/pager v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6qbIiqJ6/Bqeq25bCLbL7YFmpaFfJDuM= @@ -804,7 +796,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=