From cc5850e4b2d9b1d132a442517b939b3266ee0c4d Mon Sep 17 00:00:00 2001 From: Nitya Dhanushkodi Date: Mon, 25 Mar 2024 13:34:09 -0700 Subject: [PATCH] [net-8411] bug: fix premature token and service instance deletion due to pod fetch errors (#3758) --- .changelog/3758.txt | 4 + .../endpoints/endpoints_controller.go | 56 ++-- .../endpoints/endpoints_controller_test.go | 246 +++++++++++++++++- 3 files changed, 280 insertions(+), 26 deletions(-) create mode 100644 .changelog/3758.txt diff --git a/.changelog/3758.txt b/.changelog/3758.txt new file mode 100644 index 0000000000..5c4c528f04 --- /dev/null +++ b/.changelog/3758.txt @@ -0,0 +1,4 @@ +```release-note:bug +control-plane: fix an issue where ACL tokens would prematurely be deleted and services would be deregistered if there +was a K8s API error fetching the pod. +``` diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go index 0094f40c76..fe6c5aed0b 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go @@ -182,9 +182,9 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{RequeueAfter: requeueAfter}, err } - // endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare + // deregisterEndpointAddress stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare // against service instances in Consul to deregister them if they are not in the map. - endpointAddressMap := map[string]bool{} + deregisterEndpointAddress := map[string]bool{} // Register all addresses of this Endpoints object as service instances in Consul. for _, subset := range serviceEndpoints.Subsets { @@ -193,16 +193,25 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu var pod corev1.Pod objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace} if err = r.Client.Get(ctx, objectKey, &pod); err != nil { - r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name) - errs = multierror.Append(errs, err) + // If the pod doesn't exist anymore, set up the deregisterEndpointAddress map to deregister it. + if k8serrors.IsNotFound(err) { + deregisterEndpointAddress[address.IP] = true + r.Log.Info("pod not found", "name", address.TargetRef.Name) + } else { + // If there was a different error fetching the pod, then log the error but don't deregister it + // since this could be a K8s API blip and we don't want to prematurely deregister. + deregisterEndpointAddress[address.IP] = false + r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name) + errs = multierror.Append(errs, err) + } continue } svcName, ok := pod.Annotations[constants.AnnotationKubernetesService] if ok && serviceEndpoints.Name != svcName { r.Log.Info("ignoring endpoint because it doesn't match explicit service annotation", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) - // deregistration for service instances that don't match the annotation happens - // later because we don't add this pod to the endpointAddressMap. + // Set up the deregisterEndpointAddress to deregister service instances that don't match the annotation. + deregisterEndpointAddress[address.IP] = true continue } @@ -219,8 +228,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } - // Build the endpointAddressMap up for deregistering service instances later. - endpointAddressMap[pod.Status.PodIP] = true + // Build the deregisterEndpointAddress map up for deregistering service instances later. + deregisterEndpointAddress[pod.Status.PodIP] = false } else { r.Log.Info("detected an update to pre-consul-dataplane service", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) nodeAgentClientCfg, err := r.consulClientCfgForNodeAgent(apiClient, pod, serverState) @@ -247,17 +256,17 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } - // Build the endpointAddressMap up for deregistering service instances later. - endpointAddressMap[pod.Status.PodIP] = true + // Build the deregisterEndpointAddress map up for deregistering service instances later. + deregisterEndpointAddress[pod.Status.PodIP] = false } } } } // Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister - // from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during - // the registration codepath. - requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap) + // from Consul. This uses deregisterEndpointAddress which is populated with the addresses in the Endpoints object to + // either deregister or keep during the registration codepath. + requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, deregisterEndpointAddress) if err != nil { r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) @@ -929,8 +938,8 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string) // "k8s-service-name". So, we query Consul services by "k8s-service-name" metadata. // When querying by the k8s service name and namespace, the request will return service instances and // associated proxy service instances. -// The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister -// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map +// The argument deregisterEndpointAddress decides whether to deregister *all* service instances or selectively deregister +// them only if they are not in deregisterEndpointAddress. If the map is nil, it will deregister all instances. If the map // has addresses, it will only deregister instances not in the map. // If the pod backing a Consul service instance still exists and the graceful shutdown lifecycle mode is enabled, the instance // will not be deregistered. Instead, its health check will be updated to Critical in order to drain incoming traffic and @@ -941,7 +950,7 @@ func (r *Controller) deregisterService( apiClient *api.Client, k8sSvcName string, k8sSvcNamespace string, - endpointsAddressesMap map[string]bool) (time.Duration, error) { + deregisterEndpointAddress map[string]bool) (time.Duration, error) { // Get services matching metadata from Consul serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace) @@ -958,7 +967,7 @@ func (r *Controller) deregisterService( // every service instance. var serviceDeregistered bool - if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) { + if deregister(svc.ServiceAddress, deregisterEndpointAddress) { // If graceful shutdown is enabled, continue to the next service instance and // mark that an event requeue is needed. We should requeue at the longest time interval // to prevent excessive re-queues. Also, updating the health status in Consul to Critical @@ -1621,10 +1630,15 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int { return -1 } -func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool { - if endpointsAddressesMap == nil { +// deregister returns that the address is marked for deregistration if the map is nil or if the address is explicitly +// marked in the map for deregistration. +func deregister(address string, deregisterEndpointAddress map[string]bool) bool { + if deregisterEndpointAddress == nil { return true } - _, ok := endpointsAddressesMap[address] - return !ok + deregister, ok := deregisterEndpointAddress[address] + if ok { + return deregister + } + return true } diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go index 7e4ea0d753..d22dfe0dac 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go @@ -5,6 +5,7 @@ package endpoints import ( "context" + "errors" "fmt" "strings" "testing" @@ -1766,10 +1767,10 @@ func TestReconcileCreateEndpoint(t *testing.T) { }, }, { - // This test has 3 addresses, but only 2 are backed by pod resources. This will cause Reconcile to error - // on the invalid address but continue and process the other addresses. We check for error specific to - // pod3 being non-existant at the end, and validate the other 2 addresses have service instances. - name: "Endpoints with multiple addresses but one is invalid", + // This test has 3 addresses, but only 2 are backed by pod resources. This will cause Reconcile to + // deregister the instance associated with the non-existent pod and continue and process the other + // addresses. We validate the other 2 addresses have service instances. + name: "Endpoints with multiple addresses but one is deleted", svcName: "service-created", consulSvcName: "service-created", k8sObjects: func() []runtime.Object { @@ -1896,7 +1897,6 @@ func TestReconcileCreateEndpoint(t *testing.T) { Type: constants.ConsulKubernetesCheckType, }, }, - expErr: "1 error occurred:\n\t* pods \"pod3\" not found\n\n", }, { name: "Every configurable field set: port, different Consul service name, meta, tags, upstreams, metrics", @@ -2264,6 +2264,242 @@ func TestParseLocality(t *testing.T) { }) } +func TestReconcile_PodErrorPreservesToken(t *testing.T) { + t.Parallel() + cases := []struct { + name string + svcName string + consulSvcName string + k8sObjects func() []runtime.Object + expectedConsulSvcInstances []*api.CatalogService + expectedProxySvcInstances []*api.CatalogService + expectedHealthChecks []*api.HealthCheck + metricsEnabled bool + telemetryCollectorDisabled bool + nodeMeta map[string]string + pod1Err string + }{ + { + name: "Error when fetching pod results in not deregistering the service instances in consul", + svcName: "service-created", + consulSvcName: "service-created", + nodeMeta: map[string]string{ + "test-node": "true", + }, + pod1Err: "some fake error while fetching pod", + k8sObjects: func() []runtime.Object { + pod1 := createServicePod("pod1", "1.2.3.4", true, true) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-created", + ServiceName: "service-created", + ServiceAddress: "1.2.3.4", + ServicePort: 0, + ServiceMeta: map[string]string{constants.MetaKeyPodName: "pod1", metaKeyKubeServiceName: "service-created", constants.MetaKeyKubeNS: "default", metaKeyManagedBy: constants.ManagedByValue, metaKeySyntheticNode: "true", constants.MetaKeyPodUID: ""}, + ServiceTags: []string{}, + ServiceProxy: &api.AgentServiceConnectProxyConfig{}, + NodeMeta: map[string]string{ + "synthetic-node": "true", + "test-node": "true", + }, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-created-sidecar-proxy", + ServiceName: "service-created-sidecar-proxy", + ServiceAddress: "1.2.3.4", + ServicePort: 20000, + ServiceProxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-created", + DestinationServiceID: "pod1-service-created", + LocalServiceAddress: "", + LocalServicePort: 0, + Config: map[string]any{"envoy_telemetry_collector_bind_socket_dir": string("/consul/connect-inject")}, + }, + ServiceMeta: map[string]string{constants.MetaKeyPodName: "pod1", metaKeyKubeServiceName: "service-created", constants.MetaKeyKubeNS: "default", metaKeyManagedBy: constants.ManagedByValue, metaKeySyntheticNode: "true", constants.MetaKeyPodUID: ""}, + ServiceTags: []string{}, + NodeMeta: map[string]string{ + "synthetic-node": "true", + "test-node": "true", + }, + }, + }, + expectedHealthChecks: []*api.HealthCheck{ + { + CheckID: "default/pod1-service-created", + ServiceName: "service-created", + ServiceID: "pod1-service-created", + Name: constants.ConsulKubernetesCheckName, + Status: api.HealthPassing, + Output: constants.KubernetesSuccessReasonMsg, + Type: constants.ConsulKubernetesCheckType, + }, + { + CheckID: "default/pod1-service-created-sidecar-proxy", + ServiceName: "service-created-sidecar-proxy", + ServiceID: "pod1-service-created-sidecar-proxy", + Name: constants.ConsulKubernetesCheckName, + Status: api.HealthPassing, + Output: constants.KubernetesSuccessReasonMsg, + Type: constants.ConsulKubernetesCheckType, + }, + }, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + // Add the default namespace. + ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} + node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + // Create fake k8s client + k8sObjects := append(tt.k8sObjects(), &ns, &node) + + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + customClient := fakeClientWithPodCustomization{fakeClient} + + // Create test consulServer server. + testClient := test.TestServerWithMockConnMgrWatcher(t, nil) + consulClient := testClient.APIClient + + // Create the endpoints controller. + ep := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulClientConfig: testClient.Cfg, + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consulServer", + ReleaseNamespace: "default", + NodeMeta: tt.nodeMeta, + } + if tt.metricsEnabled { + ep.MetricsConfig = metrics.Config{ + DefaultEnableMetrics: true, + EnableGatewayMetrics: true, + } + } + + ep.EnableTelemetryCollector = !tt.telemetryCollectorDisabled + + namespacedName := types.NamespacedName{ + Namespace: "default", + Name: tt.svcName, + } + + // Do a first reconcile to setup the state in Consul with the instances and tokens. + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // Do a second reconcile while stubbing the k8s client to return an error for the pod. Since it's not a "not + // found" error, we should expect that the service instance does not get deregistered and that the acl token + // is not deleted, so we will assert after this that the state in Consul still exists. + ep.Client = customClient + resp, err = ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.Contains(t, err.Error(), tt.pod1Err) + require.False(t, resp.Requeue) + + // These are the same assertions in the Reconcile-Create test cases, ensuring the state in Consul is correct. + // After reconciliation, Consul should have the service with the correct number of instances + serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil) + require.NoError(t, err) + require.Len(t, serviceInstances, len(tt.expectedConsulSvcInstances)) + for i, instance := range serviceInstances { + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceName, instance.ServiceName) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServicePort, instance.ServicePort) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTags, instance.ServiceTags) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTaggedAddresses, instance.ServiceTaggedAddresses) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceProxy, instance.ServiceProxy) + if tt.nodeMeta != nil { + require.Equal(t, tt.expectedConsulSvcInstances[i].NodeMeta, instance.NodeMeta) + } + } + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, len(tt.expectedProxySvcInstances)) + for i, instance := range proxyServiceInstances { + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceName, instance.ServiceName) + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, tt.expectedProxySvcInstances[i].ServicePort, instance.ServicePort) + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceTags, instance.ServiceTags) + if tt.nodeMeta != nil { + require.Equal(t, tt.expectedProxySvcInstances[i].NodeMeta, instance.NodeMeta) + } + // When comparing the ServiceProxy field we ignore the DestinationNamespace + // field within that struct because on Consul OSS it's set to "" but on Consul Enterprise + // it's set to "default" and we want to re-use this test for both OSS and Ent. + // This does mean that we don't test that field but that's okay because + // it's not getting set specifically in this test. + // To do the comparison that ignores that field we use go-cmp instead + // of the regular require.Equal call since it supports ignoring certain + // fields. + diff := cmp.Diff(tt.expectedProxySvcInstances[i].ServiceProxy, instance.ServiceProxy, + cmpopts.IgnoreFields(api.Upstream{}, "DestinationNamespace", "DestinationPartition")) + require.Empty(t, diff, "expected objects to be equal") + } + + // Check that the Consul health expectedCheck was created for the k8s pod. + for _, expectedCheck := range tt.expectedHealthChecks { + filter := fmt.Sprintf("ServiceID == %q", expectedCheck.ServiceID) + checks, _, err := consulClient.Health().Checks(expectedCheck.ServiceName, &api.QueryOptions{Filter: filter}) + require.NoError(t, err) + require.Equal(t, len(checks), 1) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace", "Partition", "CreateIndex", "ModifyIndex", "ServiceTags"} + require.True(t, cmp.Equal(checks[0], expectedCheck, cmpopts.IgnoreFields(api.HealthCheck{}, ignoredFields...))) + } + }) + } + +} + +type fakeClientWithPodCustomization struct { + client.WithWatch +} + +func (c fakeClientWithPodCustomization) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if key.Name == "pod1" { + return errors.New("some fake error while fetching pod") + } + err := c.WithWatch.Get(ctx, key, obj, opts...) + return err +} + // Tests updating an Endpoints object. // - Tests updates via the register codepath: // - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated.