diff --git a/.circleci/config.yml b/.circleci/config.yml index 66af22049c..5374eda76f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,8 +6,8 @@ executors: - image: docker.mirror.hashicorp.services/circleci/golang:1.14 environment: TEST_RESULTS: /tmp/test-results # path to where test results are saved - CONSUL_VERSION: 1.9.0-rc1 # Consul's OSS version to use in tests - CONSUL_ENT_VERSION: 1.9.0+ent-rc1 # Consul's enterprise version to use in tests + CONSUL_VERSION: 1.9.4 # Consul's OSS version to use in tests + CONSUL_ENT_VERSION: 1.9.4+ent # Consul's enterprise version to use in tests jobs: go-fmt-and-vet: diff --git a/connect-inject/cleanup_resource.go b/connect-inject/cleanup_resource.go deleted file mode 100644 index 9de5d2ac52..0000000000 --- a/connect-inject/cleanup_resource.go +++ /dev/null @@ -1,281 +0,0 @@ -package connectinject - -import ( - "fmt" - "sync" - "time" - - "github.com/hashicorp/consul-k8s/consul" - capi "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" - "golang.org/x/net/context" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -// CleanupResource implements Resource and is used to clean up Consul service -// instances that weren't deregistered when their pods were deleted. -// Usually the preStop hook in the pods handles this but during a force delete -// or OOM the preStop hook doesn't run. -type CleanupResource struct { - Log hclog.Logger - KubernetesClient kubernetes.Interface - // ConsulClient points at the agent on the same node as this pod. - ConsulClient *capi.Client - ReconcilePeriod time.Duration - Ctx context.Context - // ConsulScheme is the scheme to use when making API calls to Consul, - // i.e. "http" or "https". - ConsulScheme string - // ConsulPort is the port to make HTTP API calls to Consul agents on. - ConsulPort string - EnableConsulNamespaces bool - - lock sync.Mutex -} - -// Run starts the long-running Reconcile loop that runs on a timer. -func (c *CleanupResource) Run(stopCh <-chan struct{}) { - reconcileTimer := time.NewTimer(c.ReconcilePeriod) - defer reconcileTimer.Stop() - - for { - c.reconcile() - reconcileTimer.Reset(c.ReconcilePeriod) - - select { - case <-stopCh: - c.Log.Info("received stop signal, shutting down") - return - - case <-reconcileTimer.C: - // Fall through and continue the loop. - } - } -} - -// reconcile checks all registered Consul connect service instances and ensures -// the pod they represent is still running. If the pod is no longer running, -// it deregisters the service instance from its agent. -func (c *CleanupResource) reconcile() { - c.Log.Debug("starting reconcile") - - // currentPods is a map of currently existing pods. Keys are in the form - // "namespace/pod-name". Value doesn't matter since we're using this - // solely for quick "exists" checks. - // Use currentPodsKey() to construct the key when accessing this map. - currentPods := make(map[string]bool) - - // Gather needed data on nodes, services and pods. - kubeNodes, err := c.KubernetesClient.CoreV1().Nodes().List(c.Ctx, metav1.ListOptions{}) - if err != nil { - c.Log.Error("unable to get nodes", "error", err) - return - } - - // namespacesToServiceNames maps from Consul namespace to the list of service - // names registered in that namespace. - // If Consul namespaces are disabled, there will be only one key with value - // "", i.e. the empty string. - namespacesToServiceNames := make(map[string][]string) - if c.EnableConsulNamespaces { - namespaces, _, err := c.ConsulClient.Namespaces().List(nil) - if err != nil { - c.Log.Error("unable to get Consul namespaces", "error", err) - return - } - for _, ns := range namespaces { - namespacesToServiceNames[ns.Name] = nil - } - } else { - // This allows us to treat OSS the same as enterprise for the rest of - // the code path. - namespacesToServiceNames[""] = nil - } - - for ns := range namespacesToServiceNames { - serviceNames, _, err := c.ConsulClient.Catalog().Services(&capi.QueryOptions{Namespace: ns}) - if err != nil { - c.Log.Error("unable to get Consul services", "error", err) - return - } - namespacesToServiceNames[ns] = keys(serviceNames) - } - - podList, err := c.KubernetesClient.CoreV1().Pods(corev1.NamespaceAll).List(c.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - if err != nil { - c.Log.Error("unable to get pods", "error", err) - return - } - - // Build up our map of currently running pods. - for _, pod := range podList.Items { - currentPods[currentPodsKey(pod.Name, pod.Namespace)] = true - } - - // For each registered service, find the associated pod. - for ns, serviceNames := range namespacesToServiceNames { - for _, serviceName := range serviceNames { - serviceInstances, _, err := c.ConsulClient.Catalog().Service(serviceName, "", &capi.QueryOptions{Namespace: ns}) - if err != nil { - c.Log.Error("unable to get Consul service", "name", serviceName, "error", err) - return - } - for _, instance := range serviceInstances { - podName, hasPodMeta := instance.ServiceMeta[MetaKeyPodName] - k8sNamespace, hasNSMeta := instance.ServiceMeta[MetaKeyKubeNS] - if hasPodMeta && hasNSMeta { - - // Check if the instance matches a running pod. If not, deregister it. - if _, podExists := currentPods[currentPodsKey(podName, k8sNamespace)]; !podExists { - if !nodeInCluster(kubeNodes, instance.Node) { - c.Log.Debug("skipping deregistration because instance is from node not in this cluster", - "pod", podName, "id", instance.ServiceID, "ns", ns, "node", instance.Node) - continue - } - - c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID, "ns", ns) - err := c.deregisterInstance(instance, instance.Address) - if err != nil { - c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "ns", ns, "error", err) - continue - } - c.Log.Info("service instance deregistered", "id", instance.ServiceID, "ns", ns) - } - } - } - } - } - - c.Log.Debug("finished reconcile") - return -} - -// Delete is called when a pod is deleted. It checks that the Consul service -// instance for that pod is no longer registered with Consul. -// If the instance is still registered, it deregisters it. -func (c *CleanupResource) Delete(key string, obj interface{}) error { - pod, ok := obj.(*corev1.Pod) - if !ok { - return fmt.Errorf("expected pod, got: %#v", obj) - } - if pod == nil { - return fmt.Errorf("object for key %s was nil", key) - } - serviceName, ok := pod.ObjectMeta.Annotations[annotationService] - if !ok { - return fmt.Errorf("pod did not have %s annotation", annotationService) - } - kubeNS := pod.Namespace - podName := pod.Name - // NOTE: This will be an empty string with Consul OSS. - consulNS := pod.ObjectMeta.Annotations[annotationConsulNamespace] - - // Look for both the service and its sidecar proxy. - consulServiceNames := []string{serviceName, fmt.Sprintf("%s-sidecar-proxy", serviceName)} - - for _, consulServiceName := range consulServiceNames { - instances, _, err := c.ConsulClient.Catalog().Service(consulServiceName, "", &capi.QueryOptions{ - Filter: fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`, MetaKeyPodName, podName, MetaKeyKubeNS, kubeNS), - Namespace: consulNS, - }) - if err != nil { - c.Log.Error("unable to get Consul Services", "error", err) - return err - } - if len(instances) == 0 { - c.Log.Debug("terminated pod had no still-registered instances", "service", consulServiceName, "pod", podName, "ns", consulNS) - continue - } - - // NOTE: We only expect a single instance because there's only one - // per pod but we may as well range over all of them just to be safe. - for _, instance := range instances { - // NOTE: We don't need to check if this instance belongs to a kube - // node in this cluster (like we do in Reconcile) because we only - // get the delete event if a pod in this cluster is deleted so - // we know this is one of our instances. - - c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID, "ns", consulNS) - err := c.deregisterInstance(instance, pod.Status.HostIP) - if err != nil { - c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "error", err) - return err - } - c.Log.Info("service instance deregistered", "id", instance.ServiceID, "ns", consulNS) - } - } - return nil -} - -// Upsert is a no-op because we're only interested in pods that are deleted. -func (c *CleanupResource) Upsert(_ string, _ interface{}) error { - return nil -} - -func (c *CleanupResource) Informer() cache.SharedIndexInformer { - return cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).List(c.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).Watch(c.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - }, - }, - &corev1.Pod{}, - // Resync is 0 because we perform our own reconcile loop on our own timer. - 0, - cache.Indexers{}, - ) -} - -// deregisterInstance deregisters instance from Consul by calling the agent at -// hostIP's deregister service API. -func (c *CleanupResource) deregisterInstance(instance *capi.CatalogService, hostIP string) error { - fullAddr := fmt.Sprintf("%s://%s:%s", c.ConsulScheme, hostIP, c.ConsulPort) - localConfig := capi.DefaultConfig() - if instance.Namespace != "" { - localConfig.Namespace = instance.Namespace - } - localConfig.Address = fullAddr - client, err := consul.NewClient(localConfig) - if err != nil { - return fmt.Errorf("constructing client for address %q: %s", hostIP, err) - } - - return client.Agent().ServiceDeregister(instance.ServiceID) -} - -// currentPodsKey should be used to construct the key to access the -// currentPods map. -func currentPodsKey(podName, k8sNamespace string) string { - return fmt.Sprintf("%s/%s", k8sNamespace, podName) -} - -// nodeInCluster returns whether nodeName is the name of a node in nodes, i.e. -// it's the name of a node in this cluster. -func nodeInCluster(nodes *corev1.NodeList, nodeName string) bool { - for _, n := range nodes.Items { - if n.Name == nodeName { - return true - } - } - return false -} - -// keys returns the keys of m. -func keys(m map[string][]string) []string { - var ks []string - for k := range m { - ks = append(ks, k) - } - return ks -} diff --git a/connect-inject/cleanup_resource_ent_test.go b/connect-inject/cleanup_resource_ent_test.go deleted file mode 100644 index ac33f7ec51..0000000000 --- a/connect-inject/cleanup_resource_ent_test.go +++ /dev/null @@ -1,324 +0,0 @@ -// +build enterprise - -package connectinject - -import ( - "net/url" - "testing" - - capi "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/go-hclog" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/fake" -) - -func TestReconcile_ConsulNamespaces(t *testing.T) { - t.Parallel() - - cases := map[string]struct { - ConsulServices []capi.AgentServiceRegistration - KubePods []runtime.Object - // ExpConsulServiceIDs maps from Consul namespace to - // list of expected service ids in that namespace. - ExpConsulServiceIDs map[string][]string - }{ - "default namespace, pod deleted": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcDefaultNS, - }, - KubePods: nil, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul"}, - }, - }, - "default namespace, pod not deleted": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcDefaultNS, - }, - KubePods: []runtime.Object{consulFooPodDefaultNS}, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul", "foo-abc123-foo"}, - }, - }, - "foo namespace, pod deleted": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcFooNS, - }, - KubePods: nil, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul"}, - "foo": nil, - }, - }, - "foo namespace, pod not deleted": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcFooNS, - }, - KubePods: []runtime.Object{consulFooPodFooNS}, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul"}, - "foo": {"foo-abc123-foo"}, - }, - }, - "does not delete instances with same id in different namespaces": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcFooNS, - consulFooSvcBarNS, - }, - KubePods: []runtime.Object{consulFooPodFooNS}, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul"}, - "foo": {"foo-abc123-foo"}, - "bar": nil, - }, - }, - } - - for name, c := range cases { - t.Run(name, func(t *testing.T) { - require := require.New(t) - - // Start Consul server. - server, err := testutil.NewTestServerConfigT(t, nil) - defer server.Stop() - require.NoError(err) - server.WaitForSerfCheck(t) - consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) - require.NoError(err) - - // Register Consul services. - for _, svc := range c.ConsulServices { - _, _, err := consulClient.Namespaces().Create(&capi.Namespace{ - Name: svc.Namespace, - }, nil) - require.NoError(err) - require.NoError(consulClient.Agent().ServiceRegister(&svc)) - } - - // Create the cleanup resource. - log := hclog.Default().Named("cleanupResource") - log.SetLevel(hclog.Debug) - consulURL, err := url.Parse("http://" + server.HTTPAddr) - require.NoError(err) - node := nodeName(t, consulClient) - // NOTE: we need to add the node because the reconciler checks if - // the node the service is registered with actually exists in this - // cluster. - kubeResources := append(c.KubePods, &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: node, - }, - }) - cleanupResource := CleanupResource{ - Log: log, - KubernetesClient: fake.NewSimpleClientset(kubeResources...), - ConsulClient: consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - EnableConsulNamespaces: true, - } - - // Run Reconcile. - cleanupResource.reconcile() - - // Test that the remaining services are what we expect. - for ns, expSvcs := range c.ExpConsulServiceIDs { - // Note: we need to use the catalog endpoints because - // Agent().Services() does not currently support namespaces - // (https://github.com/hashicorp/consul/issues/9710). - services, _, err := consulClient.Catalog().Services(&capi.QueryOptions{Namespace: ns}) - require.NoError(err) - - var actualServiceIDs []string - for actSvcName := range services { - services, _, err := consulClient.Catalog().Service(actSvcName, "", &capi.QueryOptions{Namespace: ns}) - require.NoError(err) - for _, actSvc := range services { - actualServiceIDs = append(actualServiceIDs, actSvc.ServiceID) - } - } - require.ElementsMatch(actualServiceIDs, expSvcs, "ns=%s act=%v", ns, actualServiceIDs) - } - }) - } -} - -func TestDelete_ConsulNamespaces(t *testing.T) { - t.Parallel() - - cases := map[string]struct { - Pod *corev1.Pod - ConsulServices []capi.AgentServiceRegistration - // ExpConsulServiceIDs maps from Consul namespace to - // list of expected service ids in that namespace. - ExpConsulServiceIDs map[string][]string - ExpErr string - }{ - "default namespace": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcDefaultNS, - }, - Pod: consulFooPodDefaultNS, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul"}, - }, - }, - "foo namespace": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcFooNS, - }, - Pod: consulFooPodFooNS, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul"}, - "foo": nil, - }, - }, - "does not delete instances with same id in different namespaces": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvcFooNS, - consulFooSvcBarNS, - }, - Pod: consulFooPodFooNS, - ExpConsulServiceIDs: map[string][]string{ - "default": {"consul"}, - "foo": nil, - "bar": {"foo-abc123-foo"}, - }, - }, - } - - for name, c := range cases { - t.Run(name, func(t *testing.T) { - require := require.New(t) - - // Start Consul server. - server, err := testutil.NewTestServerConfigT(t, nil) - defer server.Stop() - require.NoError(err) - server.WaitForSerfCheck(t) - consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) - require.NoError(err) - - // Register Consul services. - for _, svc := range c.ConsulServices { - _, _, err := consulClient.Namespaces().Create(&capi.Namespace{ - Name: svc.Namespace, - }, nil) - require.NoError(err) - require.NoError(consulClient.Agent().ServiceRegister(&svc)) - } - - // Create the cleanup resource. - log := hclog.Default().Named("cleanupResource") - log.SetLevel(hclog.Debug) - consulURL, err := url.Parse("http://" + server.HTTPAddr) - require.NoError(err) - cleanupResource := CleanupResource{ - Log: log, - KubernetesClient: fake.NewSimpleClientset(), - ConsulClient: consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - EnableConsulNamespaces: true, - } - - // Run Delete. - err = cleanupResource.Delete("default/foo", c.Pod) - if c.ExpErr != "" { - require.EqualError(err, c.ExpErr) - } else { - require.NoError(err) - - // Test that the remaining services are what we expect. - for ns, expSvcs := range c.ExpConsulServiceIDs { - // Note: we need to use the catalog endpoints because - // Agent().Services() does not currently support namespaces - // (https://github.com/hashicorp/consul/issues/9710). - services, _, err := consulClient.Catalog().Services(&capi.QueryOptions{Namespace: ns}) - require.NoError(err) - - var actualServiceIDs []string - for actSvcName := range services { - services, _, err := consulClient.Catalog().Service(actSvcName, "", &capi.QueryOptions{Namespace: ns}) - require.NoError(err) - for _, actSvc := range services { - actualServiceIDs = append(actualServiceIDs, actSvc.ServiceID) - } - } - require.ElementsMatch(actualServiceIDs, expSvcs, "ns=%s act=%v", ns, actualServiceIDs) - } - } - }) - } -} - -var ( - consulFooSvcDefaultNS = capi.AgentServiceRegistration{ - ID: "foo-abc123-foo", - Name: "foo", - Namespace: "default", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-abc123", - MetaKeyKubeNS: "default", - }, - } - consulFooSvcFooNS = capi.AgentServiceRegistration{ - ID: "foo-abc123-foo", - Name: "foo", - Namespace: "foo", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-abc123", - MetaKeyKubeNS: "default", - }, - } - consulFooSvcBarNS = capi.AgentServiceRegistration{ - ID: "foo-abc123-foo", - Name: "foo", - Namespace: "bar", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-abc123", - MetaKeyKubeNS: "bar", - }, - } - consulFooPodDefaultNS = &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo-abc123", - Namespace: "default", - Labels: map[string]string{ - labelInject: injected, - }, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: "foo", - annotationConsulNamespace: "default", - }, - }, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - }, - } - consulFooPodFooNS = &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo-abc123", - Namespace: "default", - Labels: map[string]string{ - labelInject: injected, - }, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: "foo", - annotationConsulNamespace: "foo", - }, - }, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - }, - } -) diff --git a/connect-inject/cleanup_resource_test.go b/connect-inject/cleanup_resource_test.go deleted file mode 100644 index 5f15d16f3c..0000000000 --- a/connect-inject/cleanup_resource_test.go +++ /dev/null @@ -1,338 +0,0 @@ -package connectinject - -import ( - "net/url" - "testing" - - capi "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/go-hclog" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/fake" -) - -func TestReconcile(t *testing.T) { - t.Parallel() - - cases := map[string]struct { - ConsulServices []capi.AgentServiceRegistration - KubePods []runtime.Object - ExpConsulServiceIDs []string - // OutOfClusterNode controls whether the services are registered on a - // node that does not exist in this Kube cluster. - OutOfClusterNode bool - }{ - "no instances running": { - ConsulServices: nil, - KubePods: nil, - ExpConsulServiceIDs: nil, - }, - "instance does not have pod-name meta key": { - ConsulServices: []capi.AgentServiceRegistration{consulNoPodNameMetaSvc}, - ExpConsulServiceIDs: []string{"foo-abc123-foo"}, - }, - "instance does not have k8s-namespace meta key": { - ConsulServices: []capi.AgentServiceRegistration{consulNoK8sNSMetaSvc}, - ExpConsulServiceIDs: []string{"foo-abc123-foo"}, - }, - "out of cluster node": { - ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, - ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, - OutOfClusterNode: true, - }, - "app and sidecar still running": { - ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, - KubePods: []runtime.Object{fooPod}, - ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, - }, - "app and sidecar terminated": { - ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, - KubePods: nil, - ExpConsulServiceIDs: nil, - }, - "only app is registered, no sidecar": { - ConsulServices: []capi.AgentServiceRegistration{consulFooSvc}, - KubePods: nil, - ExpConsulServiceIDs: nil, - }, - "only sidecar is registered, no app": { - ConsulServices: []capi.AgentServiceRegistration{consulFooSvcSidecar}, - KubePods: nil, - ExpConsulServiceIDs: nil, - }, - "multiple instances of the same service": { - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvc, - consulFooSvcSidecar, - consulFooSvcPod2, - consulFooSvcSidecarPod2, - }, - KubePods: []runtime.Object{fooPod}, - ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, - }, - } - - for name, c := range cases { - t.Run(name, func(t *testing.T) { - require := require.New(t) - - // Start Consul server. - server, err := testutil.NewTestServerConfigT(t, nil) - defer server.Stop() - require.NoError(err) - server.WaitForLeader(t) - consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) - require.NoError(err) - - // Register Consul services. - for _, svc := range c.ConsulServices { - require.NoError(consulClient.Agent().ServiceRegister(&svc)) - } - - // Create the cleanup resource. - log := hclog.Default().Named("cleanupResource") - log.SetLevel(hclog.Debug) - consulURL, err := url.Parse("http://" + server.HTTPAddr) - require.NoError(err) - kubeResources := c.KubePods - if !c.OutOfClusterNode { - node := nodeName(t, consulClient) - // NOTE: we need to add the node because the reconciler checks if - // the node the service is registered with actually exists in this - // cluster. - kubeResources = append(kubeResources, &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: node, - }, - }) - - } - cleanupResource := CleanupResource{ - Log: log, - KubernetesClient: fake.NewSimpleClientset(kubeResources...), - ConsulClient: consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - } - - // Run Reconcile. - cleanupResource.reconcile() - - // Test that the remaining services are what we expect. - services, err := consulClient.Agent().Services() - require.NoError(err) - var actualServiceIDs []string - for id := range services { - actualServiceIDs = append(actualServiceIDs, id) - } - require.ElementsMatch(actualServiceIDs, c.ExpConsulServiceIDs) - }) - } -} - -func TestDelete(t *testing.T) { - t.Parallel() - - var nilPod *corev1.Pod - cases := map[string]struct { - Pod interface{} - ConsulServices []capi.AgentServiceRegistration - ExpConsulServiceIDs []string - ExpErr string - }{ - "pod is nil": { - Pod: nilPod, - ExpErr: "object for key default/foo was nil", - }, - "not expected type": { - Pod: &corev1.Service{}, - ExpErr: "expected pod, got: &v1.Service", - }, - "pod does not have service-name annotation": { - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo-abc123", - Namespace: "default", - }, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - }, - }, - ExpErr: "pod did not have consul.hashicorp.com/connect-service annotation", - }, - "instance does not have pod-name meta": { - Pod: fooPod, - ConsulServices: []capi.AgentServiceRegistration{consulNoPodNameMetaSvc}, - ExpConsulServiceIDs: []string{"foo-abc123-foo"}, - }, - "instance does not have k8s-namespace meta": { - Pod: fooPod, - ConsulServices: []capi.AgentServiceRegistration{consulNoK8sNSMetaSvc}, - ExpConsulServiceIDs: []string{"foo-abc123-foo"}, - }, - "no instances still registered": { - Pod: fooPod, - ConsulServices: nil, - ExpConsulServiceIDs: nil, - }, - "app and sidecar terminated": { - Pod: fooPod, - ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, - ExpConsulServiceIDs: nil, - }, - "only app is registered, no sidecar": { - Pod: fooPod, - ConsulServices: []capi.AgentServiceRegistration{consulFooSvc}, - ExpConsulServiceIDs: nil, - }, - "only sidecar is registered, no app": { - Pod: fooPod, - ConsulServices: []capi.AgentServiceRegistration{consulFooSvcSidecar}, - ExpConsulServiceIDs: nil, - }, - "multiple instances of the same service": { - Pod: fooPod, - ConsulServices: []capi.AgentServiceRegistration{ - consulFooSvc, - consulFooSvcSidecar, - consulFooSvcPod2, - consulFooSvcSidecarPod2, - }, - ExpConsulServiceIDs: []string{"foo-def456-foo", "foo-def456-foo-sidecar-proxy"}, - }, - } - - for name, c := range cases { - t.Run(name, func(t *testing.T) { - require := require.New(t) - - // Start Consul server. - server, err := testutil.NewTestServerConfigT(t, nil) - defer server.Stop() - require.NoError(err) - server.WaitForLeader(t) - consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) - require.NoError(err) - - // Register Consul services. - for _, svc := range c.ConsulServices { - require.NoError(consulClient.Agent().ServiceRegister(&svc)) - } - - // Create the cleanup resource. - log := hclog.Default().Named("cleanupResource") - log.SetLevel(hclog.Debug) - consulURL, err := url.Parse("http://" + server.HTTPAddr) - require.NoError(err) - cleanupResource := CleanupResource{ - Log: log, - KubernetesClient: fake.NewSimpleClientset(), - ConsulClient: consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - } - - // Run Delete. - err = cleanupResource.Delete("default/foo", c.Pod) - if c.ExpErr != "" { - require.Error(err) - require.Contains(err.Error(), c.ExpErr) - } else { - require.NoError(err) - - // Test that the remaining services are what we expect. - services, err := consulClient.Agent().Services() - require.NoError(err) - var actualServiceIDs []string - for id := range services { - actualServiceIDs = append(actualServiceIDs, id) - } - require.ElementsMatch(actualServiceIDs, c.ExpConsulServiceIDs) - } - }) - } -} - -// nodeName returns the Consul node name for the agent that client -// points at. -func nodeName(t *testing.T, client *capi.Client) string { - self, err := client.Agent().Self() - require.NoError(t, err) - require.Contains(t, self, "Config") - require.Contains(t, self["Config"], "NodeName") - return self["Config"]["NodeName"].(string) -} - -var ( - consulFooSvc = capi.AgentServiceRegistration{ - ID: "foo-abc123-foo", - Name: "foo", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-abc123", - MetaKeyKubeNS: "default", - }, - } - consulFooSvcSidecar = capi.AgentServiceRegistration{ - ID: "foo-abc123-foo-sidecar-proxy", - Name: "foo-sidecar-proxy", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-abc123", - MetaKeyKubeNS: "default", - }, - } - consulFooSvcPod2 = capi.AgentServiceRegistration{ - ID: "foo-def456-foo", - Name: "foo", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-def456", - MetaKeyKubeNS: "default", - }, - } - consulFooSvcSidecarPod2 = capi.AgentServiceRegistration{ - ID: "foo-def456-foo-sidecar-proxy", - Name: "foo-sidecar-proxy", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-def456", - MetaKeyKubeNS: "default", - }, - } - consulNoPodNameMetaSvc = capi.AgentServiceRegistration{ - ID: "foo-abc123-foo", - Name: "foo", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyKubeNS: "default", - }, - } - consulNoK8sNSMetaSvc = capi.AgentServiceRegistration{ - ID: "foo-abc123-foo", - Name: "foo", - Address: "127.0.0.1", - Meta: map[string]string{ - MetaKeyPodName: "foo-abc123", - }, - } - fooPod = &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo-abc123", - Namespace: "default", - Labels: map[string]string{ - labelInject: injected, - }, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: "foo", - }, - }, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - }, - } -) diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 5615696d6f..9f799ee051 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -5,7 +5,7 @@ import ( "fmt" "strings" - mapset "github.com/deckarep/golang-set" + "github.com/deckarep/golang-set" "github.com/go-logr/logr" "github.com/hashicorp/consul-k8s/consul" "github.com/hashicorp/consul/api" @@ -25,10 +25,16 @@ import ( ) const ( - MetaKeyPodName = "pod-name" - MetaKeyKubeServiceName = "k8s-service-name" - MetaKeyKubeNS = "k8s-namespace" - envoyPrometheusBindAddr = "envoy_prometheus_bind_addr" + MetaKeyPodName = "pod-name" + MetaKeyKubeServiceName = "k8s-service-name" + MetaKeyKubeNS = "k8s-namespace" + envoyPrometheusBindAddr = "envoy_prometheus_bind_addr" + kubernetesSuccessReasonMsg = "Kubernetes health checks passing" + podPendingReasonMsg = "Pod is pending" + + // labelInject is the label which is applied by the connect-inject webhook to all pods. + // This is the key controllers will use on the label filter for its lister, watcher and reconciler. + labelInject = "consul.hashicorp.com/connect-inject-status" ) type EndpointsController struct { @@ -135,6 +141,18 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( r.Log.Error(err, "failed to register proxy service with Consul", "consul-proxy-service-name", proxyServiceRegistration.Name) return ctrl.Result{}, err } + + // Update the TTL health check for the service. + // This is required because ServiceRegister() does not update the TTL if the service already exists. + r.Log.Info("updating ttl health check", "service", proxyServiceRegistration.Name) + status, reason, err := getReadyStatusAndReason(&pod) + if err != nil { + return ctrl.Result{}, err + } + err = client.Agent().UpdateTTL(getConsulHealthCheckID(&pod, serviceRegistration.ID), reason, status) + if err != nil { + return ctrl.Result{}, err + } } } } @@ -212,6 +230,11 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service tags = append(tags, strings.Split(raw, ",")...) } + status, _, err := getReadyStatusAndReason(&pod) + if err != nil { + return nil, nil, err + } + service := &api.AgentServiceRegistration{ ID: serviceID, Name: serviceName, @@ -219,6 +242,14 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Address: pod.Status.PodIP, Meta: meta, Namespace: "", // TODO: namespace support + Check: &api.AgentServiceCheck{ + CheckID: getConsulHealthCheckID(&pod, serviceID), + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: status, + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, } if len(tags) > 0 { service.Tags = tags @@ -292,6 +323,39 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service return service, proxyService, nil } +// getConsulHealthCheckID deterministically generates a health check ID that will be unique to the Agent +// where the health check is registered and deregistered. +func getConsulHealthCheckID(pod *corev1.Pod, serviceID string) string { + return fmt.Sprintf("%s/%s/kubernetes-health-check", pod.Namespace, serviceID) +} + +// getReadyStatusAndReason returns the formatted status string to pass to Consul based on the +// ready state of the pod along with the reason message which will be passed into the Notes +// field of the Consul health check. +func getReadyStatusAndReason(pod *corev1.Pod) (string, string, error) { + // A pod might be pending if the init containers have run but the non-init + // containers haven't reached running state. In this case we set a failing health + // check so the pod doesn't receive traffic before it's ready. + if pod.Status.Phase == corev1.PodPending { + return api.HealthCritical, podPendingReasonMsg, nil + } + + for _, cond := range pod.Status.Conditions { + var consulStatus, reason string + if cond.Type == corev1.PodReady { + if cond.Status != corev1.ConditionTrue { + consulStatus = api.HealthCritical + reason = cond.Message + } else { + consulStatus = api.HealthPassing + reason = kubernetesSuccessReasonMsg + } + return consulStatus, reason, nil + } + } + return "", "", fmt.Errorf("no ready status for pod: %s", pod.Name) +} + // deregisterServiceOnAllAgents queries all agents for service instances that have the metadata // "k8s-service-name"=k8sSvcName and "k8s-namespace"=k8sSvcNamespace. The k8s service name may or may not match the // consul service name, but the k8s service name will always match the metadata on the Consul service diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index 1ab3d5fa2f..b0cd999132 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -8,6 +8,8 @@ import ( "github.com/deckarep/golang-set" logrtest "github.com/go-logr/logr/testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/consul-k8s/subcommand/common" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" @@ -363,7 +365,7 @@ func TestProcessUpstreams(t *testing.T) { require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) httpAddr := consul.HTTPAddr if tt.consulUnavailable { httpAddr = "hostname.does.not.exist:8500" @@ -414,6 +416,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { expectedNumSvcInstances int expectedConsulSvcInstances []*api.CatalogService expectedProxySvcInstances []*api.CatalogService + expectedAgentHealthCheck *api.AgentCheck }{ { name: "Empty endpoints", @@ -436,6 +439,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { expectedNumSvcInstances: 0, expectedConsulSvcInstances: []*api.CatalogService{}, expectedProxySvcInstances: []*api.CatalogService{}, + expectedAgentHealthCheck: nil, }, { name: "Basic endpoints", @@ -493,6 +497,15 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceTags: []string{}, }, }, + expectedAgentHealthCheck: &api.AgentCheck{ + CheckID: "default/pod1-service-created/kubernetes-health-check", + ServiceName: "service-created", + ServiceID: "pod1-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, }, { name: "Endpoints with multiple addresses", @@ -582,6 +595,15 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceTags: []string{}, }, }, + expectedAgentHealthCheck: &api.AgentCheck{ + CheckID: "default/pod1-service-created/kubernetes-health-check", + ServiceName: "service-created", + ServiceID: "pod1-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, }, { name: "Every configurable field set: port, different Consul service name, meta, tags, upstreams, metrics", @@ -670,6 +692,15 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceTags: []string{"abc", "123", "def", "456"}, }, }, + expectedAgentHealthCheck: &api.AgentCheck{ + CheckID: "default/pod1-different-consul-svc-name/kubernetes-health-check", + ServiceName: "different-consul-svc-name", + ServiceID: "pod1-different-consul-svc-name", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, }, } for _, tt := range cases { @@ -691,7 +722,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { }) require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) cfg := &api.Config{ Address: consul.HTTPAddr, @@ -765,6 +796,17 @@ func TestReconcileCreateEndpoint(t *testing.T) { require.Contains(t, expectedChecks, checks[0].Name) require.Contains(t, expectedChecks, checks[1].Name) } + + // Check that the Consul health check was created for the k8s pod. + if tt.expectedAgentHealthCheck != nil { + filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthCheck.CheckID) + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, len(check), 1) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace"} + require.True(t, cmp.Equal(check[tt.expectedAgentHealthCheck.CheckID], tt.expectedAgentHealthCheck, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } }) } } @@ -773,6 +815,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { // - Tests updates via the register codepath: // - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated. // - When an address is added to an Endpoint, an additional service instance in Consul is registered. +// - When an address in an Endpoint is updated - via health check change - the corresponding service instance is updated. // - Tests updates via the deregister codepath: // - When an address is removed from an Endpoint, the corresponding service instance in Consul is deregistered. // - When an address is removed from an Endpoint *and there are no addresses left in the Endpoint*, the @@ -792,7 +835,168 @@ func TestReconcileUpdateEndpoint(t *testing.T) { expectedNumSvcInstances int expectedConsulSvcInstances []*api.CatalogService expectedProxySvcInstances []*api.CatalogService + expectedAgentHealthCheck *api.AgentCheck }{ + { + name: "Endpoints has an updated address because health check changes from unhealthy to healthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true) + pod1.Status.Conditions = []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }} + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Check: &api.AgentServiceCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: "passing", + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthCheck: &api.AgentCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + }, + }, + { + name: "Endpoints has an updated address because health check changes from healthy to unhealthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Check: &api.AgentServiceCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: "passing", + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthCheck: &api.AgentCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + }, { name: "Endpoints has an updated address (pod IP change).", consulSvcName: "service-updated", @@ -1277,6 +1481,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { expectedProxySvcInstances: []*api.CatalogService{}, }, } + // Each test is run with ACLs+TLS (secure) enabled and disabled. for _, secure := range []bool{true, false} { for _, tt := range cases { t.Run(fmt.Sprintf("%s - secure: %v", tt.name, secure), func(t *testing.T) { @@ -1308,12 +1513,15 @@ func TestReconcileUpdateEndpoint(t *testing.T) { require.NoError(t, err) defer consul.Stop() consul.WaitForSerfCheck(t) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] cfg := &api.Config{ Scheme: "http", Address: consul.HTTPAddr, } if secure { + consulPort = strings.Split(consul.HTTPSAddr, ":")[1] cfg.Address = consul.HTTPSAddr cfg.Scheme = "https" cfg.TLSConfig = api.TLSConfig{ @@ -1323,8 +1531,6 @@ func TestReconcileUpdateEndpoint(t *testing.T) { } consulClient, err := api.NewClient(cfg) require.NoError(t, err) - addr := strings.Split(cfg.Address, ":") - consulPort := addr[1] // Register service and proxy in consul for _, svc := range tt.initialConsulSvcs { @@ -1371,6 +1577,16 @@ func TestReconcileUpdateEndpoint(t *testing.T) { require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) } + // Check that the Consul health check was created for the k8s pod. + if tt.expectedAgentHealthCheck != nil { + filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthCheck.CheckID) + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, len(check), 1) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace"} + require.True(t, cmp.Equal(check[tt.expectedAgentHealthCheck.CheckID], tt.expectedAgentHealthCheck, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } }) } } @@ -1456,7 +1672,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) cfg := &api.Config{ Address: consul.HTTPAddr, } @@ -2196,7 +2412,7 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) { require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) consulClient, err := api.NewClient(&api.Config{ Address: consul.HTTPAddr, }) @@ -2219,6 +2435,11 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) { } } +const ( + testFailureMessage = "Kubernetes pod readiness probe failed" + ttl = "ttl" +) + func createPod(name, ip string, inject bool) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -2230,6 +2451,12 @@ func createPod(name, ip string, inject bool) *corev1.Pod { Status: corev1.PodStatus{ PodIP: ip, HostIP: "127.0.0.1", + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + Message: testFailureMessage, + }}, }, } if inject { @@ -2237,7 +2464,6 @@ func createPod(name, ip string, inject bool) *corev1.Pod { pod.Annotations[annotationStatus] = injected } return pod - } func toStringPtr(input string) *string { diff --git a/connect-inject/health_check_resource.go b/connect-inject/health_check_resource.go deleted file mode 100644 index b908064c2f..0000000000 --- a/connect-inject/health_check_resource.go +++ /dev/null @@ -1,340 +0,0 @@ -package connectinject - -import ( - "errors" - "fmt" - "strings" - "sync" - "time" - - "github.com/hashicorp/consul-k8s/consul" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" - "golang.org/x/net/context" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -const ( - // labelInject is the label which is applied by the connect-inject webhook to all pods. - // This is the key the controller will use on the label filter for its lister, watcher and reconciler. - labelInject = "consul.hashicorp.com/connect-inject-status" - - // kubernetesSuccessReasonMsg will be passed for passing health check's Reason to Consul. - kubernetesSuccessReasonMsg = "Kubernetes health checks passing" - - podPendingReasonMsg = "Pod is pending" -) - -// ServiceNotFoundErr is returned when a Consul service instance is not registered. -var ServiceNotFoundErr = errors.New("service is not registered in Consul") - -type HealthCheckResource struct { - Log hclog.Logger - KubernetesClientset kubernetes.Interface - - // ConsulScheme is the scheme to use when making API calls to Consul, - // i.e. "http" or "https". - ConsulScheme string - // ConsulPort is the port to make HTTP API calls to Consul agents on. - ConsulPort string - // ReconcilePeriod is the period by which reconcile gets called. - // default to 1 minute. - ReconcilePeriod time.Duration - - Ctx context.Context - lock sync.Mutex -} - -// Run is the long-running runloop for periodically running Reconcile. -// It initially reconciles at startup and is then invoked after every -// ReconcilePeriod expires. -func (h *HealthCheckResource) Run(stopCh <-chan struct{}) { - reconcileTimer := time.NewTimer(h.ReconcilePeriod) - defer reconcileTimer.Stop() - - for { - h.reconcile() - reconcileTimer.Reset(h.ReconcilePeriod) - - select { - case <-stopCh: - h.Log.Info("received stop signal, shutting down") - return - - case <-reconcileTimer.C: - // Fall through and continue the loop. - } - } -} - -// Delete is not implemented because it is handled by the preStop phase whereby all services -// related to the pod are deregistered which also deregisters health checks. -func (h *HealthCheckResource) Delete(string, interface{}) error { - return nil -} - -// Informer starts a sharedindex informer which watches and lists corev1.Pod objects -// which meet the filter of labelInject. -func (h *HealthCheckResource) Informer() cache.SharedIndexInformer { - return cache.NewSharedIndexInformer( - // ListWatch takes a List and Watch function which we filter based on label which was injected. - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return h.KubernetesClientset.CoreV1().Pods(metav1.NamespaceAll).List(h.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return h.KubernetesClientset.CoreV1().Pods(metav1.NamespaceAll).Watch(h.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - }, - }, - &corev1.Pod{}, // the target type (Pod) - 0, // no resync (period of 0) - cache.Indexers{}, - ) -} - -// Upsert processes a create or update event. -// Two primary use cases are handled, new pods will get a new consul TTL health check -// registered against their respective agent and service, and updates to pods will have -// this TTL health check updated to reflect the pod's readiness status. -func (h *HealthCheckResource) Upsert(_ string, raw interface{}) error { - pod, ok := raw.(*corev1.Pod) - if !ok { - return fmt.Errorf("failed to cast to a pod object") - } - err := h.reconcilePod(pod) - if err != nil { - h.Log.Error("unable to update pod", "err", err) - return err - } - return nil -} - -// reconcile iterates through all Pods with the appropriate label and compares the -// current health check status against that which is stored in Consul and updates -// the consul health check accordingly. If the health check doesn't yet exist it will create it. -func (h *HealthCheckResource) reconcile() { - h.lock.Lock() - defer h.lock.Unlock() - h.Log.Debug("starting reconcile") - // First grab the list of Pods which have the label labelInject. - podList, err := h.KubernetesClientset.CoreV1().Pods(corev1.NamespaceAll).List(h.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - if err != nil { - h.Log.Error("unable to get pods", "err", err) - return - } - // Reconcile the state of each pod in the podList. - for _, pod := range podList.Items { - err = h.reconcilePod(&pod) - if err != nil { - h.Log.Error("unable to update pod", "err", err) - } - } - h.Log.Debug("finished reconcile") -} - -// reconcilePod will reconcile a pod. This is the common work for both Upsert and Reconcile. -func (h *HealthCheckResource) reconcilePod(pod *corev1.Pod) error { - h.Log.Debug("processing pod", "name", pod.Name) - if !h.shouldProcess(pod) { - // Skip pods that are not running or have not been properly injected. - return nil - } - // Fetch the identifiers we will use to interact with the Consul agent for this pod. - serviceID := h.getConsulServiceID(pod) - healthCheckID := h.getConsulHealthCheckID(pod) - status, reason, err := h.getReadyStatusAndReason(pod) - if err != nil { - return fmt.Errorf("unable to get pod status: %s", err) - } - // Get a client connection to the correct agent. - client, err := h.getConsulClient(pod) - if err != nil { - return fmt.Errorf("unable to get Consul client connection for %s: %s", pod.Name, err) - } - // Retrieve the health check that would exist if the service had one registered for this pod. - serviceCheck, err := h.getServiceCheck(client, healthCheckID) - if err != nil { - return fmt.Errorf("unable to get agent health checks: serviceID=%s, checkID=%s, %s", serviceID, healthCheckID, err) - } - if serviceCheck == nil { - // Create a new health check. - h.Log.Debug("registering new health check", "name", pod.Name, "id", healthCheckID) - err = h.registerConsulHealthCheck(client, healthCheckID, serviceID, status) - if errors.Is(err, ServiceNotFoundErr) { - h.Log.Warn("skipping registration because service not registered with Consul - this may be because the pod is shutting down", "serviceID", serviceID) - return nil - } else if err != nil { - return fmt.Errorf("unable to register health check: %s", err) - } - h.Log.Debug("updating health check status", "name", pod.Name, "status", status, "reason", reason) - // Also update it, the reason this is separate is there is no way to set the Output field of the health check - // at creation time, and this is what is displayed on the UI as opposed to the Notes field. - err = h.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) - if err != nil { - return fmt.Errorf("error updating health check: %s", err) - } - } else if serviceCheck.Status != status { - // Update the healthCheck. - h.Log.Debug("updating health check status", "name", pod.Name, "status", status, "reason", reason) - err = h.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) - if err != nil { - return fmt.Errorf("error updating health check: %s", err) - } - } - return nil -} - -// updateConsulHealthCheckStatus updates the consul health check status. -func (h *HealthCheckResource) updateConsulHealthCheckStatus(client *api.Client, consulHealthCheckID, status, reason string) error { - h.Log.Debug("updating health check", "id", consulHealthCheckID) - return client.Agent().UpdateTTL(consulHealthCheckID, reason, status) -} - -// registerConsulHealthCheck registers a TTL health check for the service on this Agent. -// The Agent is local to the Pod which has a kubernetes health check. -// This has the effect of marking the service instance healthy/unhealthy for Consul service mesh traffic. -func (h *HealthCheckResource) registerConsulHealthCheck(client *api.Client, consulHealthCheckID, serviceID, status string) error { - h.Log.Debug("registering Consul health check", "id", consulHealthCheckID, "serviceID", serviceID) - - // Create a TTL health check in Consul associated with this service and pod. - // The TTL time is 100000h which should ensure that the check never fails due to timeout - // of the TTL check. - err := client.Agent().CheckRegister(&api.AgentCheckRegistration{ - ID: consulHealthCheckID, - Name: "Kubernetes Health Check", - ServiceID: serviceID, - AgentServiceCheck: api.AgentServiceCheck{ - TTL: "100000h", - Status: status, - SuccessBeforePassing: 1, - FailuresBeforeCritical: 1, - }, - }) - if err != nil { - // Full error looks like: - // Unexpected response code: 500 (ServiceID "consulnamespace/svc-id" does not exist) - if strings.Contains(err.Error(), fmt.Sprintf("%s\" does not exist", serviceID)) { - return ServiceNotFoundErr - } - return fmt.Errorf("registering health check for service %q: %s", serviceID, err) - } - return nil -} - -// getServiceCheck will return the health check for this pod and service if it exists. -func (h *HealthCheckResource) getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) { - filter := fmt.Sprintf("CheckID == `%s`", healthCheckID) - checks, err := client.Agent().ChecksWithFilter(filter) - if err != nil { - return nil, fmt.Errorf("getting check %q: %s", healthCheckID, err) - } - // This will be nil (does not exist) or an actual check. - return checks[healthCheckID], nil -} - -// getReadyStatusAndReason returns the formatted status string to pass to Consul based on the -// ready state of the pod along with the reason message which will be passed into the Notes -// field of the Consul health check. -func (h *HealthCheckResource) getReadyStatusAndReason(pod *corev1.Pod) (string, string, error) { - // A pod might be pending if the init containers have run but the non-init - // containers haven't reached running state. In this case we set a failing health - // check so the pod doesn't receive traffic before it's ready. - if pod.Status.Phase == corev1.PodPending { - return api.HealthCritical, podPendingReasonMsg, nil - } - - for _, cond := range pod.Status.Conditions { - var consulStatus, reason string - if cond.Type == corev1.PodReady { - if cond.Status != corev1.ConditionTrue { - consulStatus = api.HealthCritical - reason = cond.Message - } else { - consulStatus = api.HealthPassing - reason = kubernetesSuccessReasonMsg - } - return consulStatus, reason, nil - } - } - return "", "", fmt.Errorf("no ready status for pod: %s", pod.Name) -} - -// getConsulClient returns an *api.Client that points at the consul agent local to the pod. -func (h *HealthCheckResource) getConsulClient(pod *corev1.Pod) (*api.Client, error) { - newAddr := fmt.Sprintf("%s://%s:%s", h.ConsulScheme, pod.Status.HostIP, h.ConsulPort) - localConfig := api.DefaultConfig() - localConfig.Address = newAddr - if pod.Annotations[annotationConsulNamespace] != "" { - localConfig.Namespace = pod.Annotations[annotationConsulNamespace] - } - localClient, err := consul.NewClient(localConfig) - if err != nil { - h.Log.Error("unable to get Consul API Client", "addr", newAddr, "err", err) - return nil, err - } - h.Log.Debug("setting consul client to the following agent", "addr", newAddr) - return localClient, err -} - -// shouldProcess is a simple filter which determines if Upsert or Reconcile should attempt to process the pod. -// This is done without making any client api calls so it is fast. -func (h *HealthCheckResource) shouldProcess(pod *corev1.Pod) bool { - if pod.Annotations[annotationStatus] != injected { - return false - } - - // If the pod has been terminated, we don't want to try and modify its - // health check status because the preStop hook will have deregistered - // this pod and so we'll get errors making API calls to set the status - // of a check for a service that doesn't exist. - // We detect a terminated pod by looking to see if all the containers - // have their state set as "terminated". Kubernetes will only send - // an update to this reconciler when all containers have stopped so if - // the conditions below are satisfied we're guaranteed that the preStop - // hook has run. - if pod.Status.Phase == corev1.PodRunning && len(pod.Status.ContainerStatuses) > 0 { - allTerminated := true - for _, c := range pod.Status.ContainerStatuses { - if c.State.Terminated == nil { - allTerminated = false - break - } - } - if allTerminated { - return false - } - // Otherwise we fall through to checking if the service has been - // registered yet. - } - - // We process any pod that has had its injection init container completed because - // this means the service instance has been registered with Consul and so we can - // and should set its health check status. If we don't set the health check - // immediately after registration, the pod will start to receive traffic, - // even if its non-init containers haven't yet reached the running state. - for _, c := range pod.Status.InitContainerStatuses { - if c.Name == InjectInitContainerName { - return c.State.Terminated != nil && c.State.Terminated.Reason == "Completed" - } - } - return false -} - -// getConsulHealthCheckID deterministically generates a health check ID that will be unique to the Agent -// where the health check is registered and deregistered. -func (h *HealthCheckResource) getConsulHealthCheckID(pod *corev1.Pod) string { - return fmt.Sprintf("%s/%s/kubernetes-health-check", pod.Namespace, h.getConsulServiceID(pod)) -} - -// getConsulServiceID returns the serviceID of the connect service. -func (h *HealthCheckResource) getConsulServiceID(pod *corev1.Pod) string { - return fmt.Sprintf("%s-%s", pod.Name, pod.Annotations[annotationService]) -} diff --git a/connect-inject/health_check_resource_ent_test.go b/connect-inject/health_check_resource_ent_test.go deleted file mode 100644 index 65a5e5e645..0000000000 --- a/connect-inject/health_check_resource_ent_test.go +++ /dev/null @@ -1,288 +0,0 @@ -// +build enterprise - -package connectinject - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hashicorp/consul/api" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const ( - testNamespace = "testnamespace" - testNamespacedHealthCheckID = "testnamespace/test-pod-test-service/kubernetes-health-check" - - testAlternateNamespace = "testalternatenamespace" - testAlternateNamespacedHealthCheckID = "testalternatenamespace/test-pod-test-service/kubernetes-health-check" -) - -var ignoredFieldsEnterprise = []string{"Node", "Definition", "ServiceID", "ServiceName"} - -var testPodWithNamespace = corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, - Name: testPodName, - }, - Spec: corev1.PodSpec{}, -} - -// Test that when consulNamespaces are enabled, the health check is registered in the right namespace. -func TestReconcilePodWithNamespace(t *testing.T) { - t.Parallel() - cases := []struct { - Name string - PreCreateHealthCheck bool - InitialState string - Pod *corev1.Pod - Expected *api.AgentCheck - }{ - { - Name: "reconcilePod will create check and set passed", - PreCreateHealthCheck: false, - InitialState: "", // only used when precreating a health check - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthPassing, - Notes: "", - Output: kubernetesSuccessReasonMsg, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "reconcilePod will create check and set failed", - PreCreateHealthCheck: false, - InitialState: "", // only used when precreating a health check - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthCritical, - Notes: "", - Output: testFailureMessage, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate a passing pod and change to failed", - PreCreateHealthCheck: true, - InitialState: api.HealthPassing, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthCritical, - Output: testFailureMessage, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate failed pod and change to passing", - PreCreateHealthCheck: true, - InitialState: api.HealthCritical, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthPassing, - Output: testCheckNotesPassing, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate failed check, no pod changes results in no health check changes", - PreCreateHealthCheck: true, - InitialState: api.HealthCritical, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthCritical, - Output: "", // when there is no change in status, Consul doesnt set the Output field - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate failed pod and change to passing, k8s/consul namespaces different", - PreCreateHealthCheck: true, - InitialState: api.HealthCritical, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testAlternateNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testAlternateNamespacedHealthCheckID, - Status: api.HealthPassing, - Output: testCheckNotesPassing, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - } - for _, tt := range cases { - t.Run(tt.Name, func(t *testing.T) { - require := require.New(t) - // Get a server, client, and handler. - server, client, resource := testServerAgentResourceAndControllerWithConsulNS(t, tt.Pod, testNamespace) - defer server.Stop() - // Create the namespace in Consul. - _, _, err := client.Namespaces().Create(&api.Namespace{Name: testNamespace}, nil) - require.NoError(err) - - // Register the service with Consul. - err = client.Agent().ServiceRegister(&api.AgentServiceRegistration{ - ID: testServiceNameReg, - Name: testServiceNameAnnotation, - Namespace: testNamespace, - }) - require.NoError(err) - if tt.PreCreateHealthCheck { - // Register the health check if this is not an object create path. - registerHealthCheck(t, client, tt.InitialState) - } - // Upsert and Reconcile both use reconcilePod to reconcile a pod. - err = resource.reconcilePod(tt.Pod) - require.NoError(err) - // Get the agent checks if they were registered. - actual := getConsulAgentChecks(t, client, tt.Expected.CheckID) - require.True(cmp.Equal(actual, tt.Expected, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFieldsEnterprise...))) - }) - } -} diff --git a/connect-inject/health_check_resource_test.go b/connect-inject/health_check_resource_test.go deleted file mode 100644 index 97cdd41aba..0000000000 --- a/connect-inject/health_check_resource_test.go +++ /dev/null @@ -1,813 +0,0 @@ -package connectinject - -import ( - "context" - "fmt" - "net/url" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/freeport" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/hashicorp/go-hclog" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" -) - -const ( - testPodName = "test-pod" - testServiceNameAnnotation = "test-service" - testServiceNameReg = "test-pod-test-service" - testHealthCheckID = "default/test-pod-test-service/kubernetes-health-check" - testFailureMessage = "Kubernetes pod readiness probe failed" - testCheckNotesPassing = "Kubernetes health checks passing" - ttl = "ttl" - name = "Kubernetes Health Check" -) - -// Used by gocmp. -var ignoredFields = []string{"Node", "Namespace", "Definition", "ServiceID", "ServiceName"} - -var testPodSpec = corev1.PodSpec{ - Containers: []corev1.Container{ - corev1.Container{ - Name: testPodName, - }, - }, -} - -var completedInjectInitContainer = []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - Reason: "Completed", - }, - }, - Ready: true, - }, -} - -func registerHealthCheck(t *testing.T, client *api.Client, initialState string) { - require := require.New(t) - err := client.Agent().CheckRegister(&api.AgentCheckRegistration{ - Name: "Kubernetes Health Check", - ID: testHealthCheckID, - ServiceID: testServiceNameReg, - Notes: "", - AgentServiceCheck: api.AgentServiceCheck{ - TTL: "100000h", - Status: initialState, - Notes: "", - }, - }) - require.NoError(err) -} - -// We expect to already be pointed at the correct agent. -func getConsulAgentChecks(t *testing.T, client *api.Client, healthCheckID string) *api.AgentCheck { - require := require.New(t) - filter := fmt.Sprintf("CheckID == `%s`", healthCheckID) - checks, err := client.Agent().ChecksWithFilter(filter) - require.NoError(err) - return checks[healthCheckID] -} - -func TestReconcilePod(t *testing.T) { - t.Parallel() - cases := []struct { - Name string - PreCreateHealthCheck bool - InitialState string - Pod *corev1.Pod - Expected *api.AgentCheck - Err string - }{ - { - "inject init container has completed but containers not yet running", - false, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - Phase: corev1.PodPending, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Notes: "", - Output: "Pod is pending", - Type: ttl, - Name: name, - }, - "", - }, - { - "reconcilePod will create check and set passed", - false, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthPassing, - Notes: "", - Output: kubernetesSuccessReasonMsg, - Type: ttl, - Name: name, - }, - "", - }, - { - "reconcilePod will create check and set failed", - false, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Notes: "", - Output: testFailureMessage, - Type: ttl, - Name: name, - }, - "", - }, - { - "precreate a passing pod and change to failed", - true, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Output: testFailureMessage, - Type: ttl, - Name: name, - }, - "", - }, - { - "precreate failed pod and change to passing", - true, - api.HealthCritical, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthPassing, - Output: testCheckNotesPassing, - Type: ttl, - Name: name, - }, - "", - }, - { - "precreate failed check, no pod changes results in no healthcheck changes", - true, - api.HealthCritical, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Output: "", // when there is no change in status, Consul doesnt set the Output field - Type: ttl, - Name: name, - }, - "", - }, - { - "PodRunning no annotations will be ignored for processing", - false, - "", - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - nil, - "", - }, - { - "PodRunning no Ready Status will be ignored for processing", - false, - "", - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - }, - }, - nil, - "", - }, - } - for _, tt := range cases { - t.Run(tt.Name, func(t *testing.T) { - var err error - require := require.New(t) - // Get a server, client, and handler. - server, client, resource := testServerAgentResourceAndController(t, tt.Pod) - defer server.Stop() - server.WaitForLeader(t) - // Register the service with Consul. - server.AddService(t, testServiceNameReg, api.HealthPassing, nil) - if tt.PreCreateHealthCheck { - // Register the health check if this is not an object create path. - registerHealthCheck(t, client, tt.InitialState) - } - // Upsert and Reconcile both use reconcilePod to reconcile a pod. - err = resource.reconcilePod(tt.Pod) - // If we're expecting any error from reconcilePod. - if tt.Err != "" { - // used in the cases where we're expecting an error from - // the controller/handler, in which case do not check agent - // checks as they're not relevant/created. - require.Error(err, tt.Err) - return - } - require.NoError(err) - // Get the agent checks if they were registered. - actual := getConsulAgentChecks(t, client, testHealthCheckID) - - cmpOpts := cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...) - require.True(cmp.Equal(actual, tt.Expected, cmpOpts), - cmp.Diff(actual, tt.Expected, cmpOpts)) - require.True(cmp.Equal(actual, tt.Expected, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) - }) - } -} - -// Test that when we call upsert and the service hasn't been registered -// in Consul yet, we don't return an error. -func TestUpsert_PodWithNoService(t *testing.T) { - t.Parallel() - require := require.New(t) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - } - server, _, resource := testServerAgentResourceAndController(t, pod) - defer server.Stop() - err := resource.Upsert("", pod) - require.Nil(err) -} - -func TestReconcile_IgnorePodsWithoutInjectLabel(t *testing.T) { - t.Parallel() - require := require.New(t) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - } - server, client, resource := testServerAgentResourceAndController(t, pod) - defer server.Stop() - // Start the reconciler, it should not create a health check. - resource.reconcile() - actual := getConsulAgentChecks(t, client, testHealthCheckID) - require.Nil(actual) -} - -// Test pod statuses that the reconciler should ignore. -// These test cases are based on actual observed startup and termination phases. -func TestReconcile_IgnoreStatuses(t *testing.T) { - t.Parallel() - cases := map[string]corev1.PodStatus{ - "not scheduled": { - Phase: corev1.PodPending, - }, - "scheduled and pending": { - Phase: corev1.PodPending, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - }, - }, - "inject init container initializing": { - Phase: corev1.PodPending, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodInitialized, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.ContainersReady, - Status: corev1.ConditionFalse, - }, - }, - InitContainerStatuses: []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "Initializing", - }, - }, - Ready: false, - }, - }, - ContainerStatuses: unreadyAppContainers, - }, - "inject init container running (but not terminated)": { - Phase: corev1.PodPending, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodInitialized, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.ContainersReady, - Status: corev1.ConditionFalse, - }, - }, - InitContainerStatuses: []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Waiting: nil, - Running: &corev1.ContainerStateRunning{StartedAt: metav1.Now()}, - }, - Ready: false, - }, - }, - ContainerStatuses: unreadyAppContainers, - }, - "pod is terminating": { - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodInitialized, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.ContainersReady, - Status: corev1.ConditionFalse, - }, - }, - InitContainerStatuses: []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{StartedAt: metav1.Now()}, - }, - Ready: true, - }, - }, - ContainerStatuses: []corev1.ContainerStatus{ - { - Name: "envoy-sidecar", - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 0, - Reason: "Completed", - StartedAt: metav1.Time{}, - FinishedAt: metav1.Time{}, - }, - }, - Ready: false, - }, - { - Name: "consul-sidecar", - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 2, - Reason: "Error", - StartedAt: metav1.Time{}, - FinishedAt: metav1.Time{}, - }, - }, - Ready: false, - }, - { - Name: "app", - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 137, - Reason: "Error", - StartedAt: metav1.Time{}, - FinishedAt: metav1.Time{}, - }, - }, - Ready: false, - }, - }, - }, - } - for name, podStatus := range cases { - t.Run(name, func(t *testing.T) { - require := require.New(t) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: podStatus, - } - server, _, resource := testServerAgentResourceAndController(t, pod) - defer server.Stop() - - // We would expect an error if the reconciler actually tried to - // register a health check because the underlying service hasn't - // been created. - require.NoError(resource.reconcilePod(pod)) - }) - } -} - -// Test that stopch works for Reconciler. -func TestReconcilerShutdown(t *testing.T) { - t.Parallel() - require := require.New(t) - k8sclientset := fake.NewSimpleClientset() - healthResource := HealthCheckResource{ - Log: hclog.Default().Named("healthCheckResource"), - KubernetesClientset: k8sclientset, - ReconcilePeriod: 5 * time.Second, - } - - reconcilerRunningCtx := make(chan struct{}) - reconcilerShutdownSuccess := make(chan bool) - go func() { - // Starting the reconciler. - healthResource.Run(reconcilerRunningCtx) - close(reconcilerShutdownSuccess) - }() - // Trigger shutdown of the reconciler. - close(reconcilerRunningCtx) - - select { - case <-reconcilerShutdownSuccess: - // The function is expected to exit gracefully so no assertion needed. - return - case <-time.After(time.Second * 1): - // Fail if the stopCh was not caught. - require.Fail("timeout waiting for reconciler to shutdown") - } -} - -// Test that if the agent is unavailable reconcile will fail on the pod -// and once the agent becomes available reconcile will correctly -// update the checks after its loop timer passes. -func TestReconcileRun(t *testing.T) { - t.Parallel() - var err error - require := require.New(t) - - // Start the clientset with a Pod that is failed. - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - } - k8sclientset := fake.NewSimpleClientset(pod) - randomPorts := freeport.MustTake(6) - schema := "http://" - serverAddress := fmt.Sprintf("%s%s:%d", schema, "127.0.0.1", randomPorts[1]) - - // Setup consul client connection. - clientConfig := &api.Config{Address: serverAddress} - require.NoError(err) - client, err := api.NewClient(clientConfig) - require.NoError(err) - consulURL, err := url.Parse(serverAddress) - require.NoError(err) - - healthResource := HealthCheckResource{ - Log: hclog.Default().Named("healthCheckResource"), - KubernetesClientset: k8sclientset, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - ReconcilePeriod: 100 * time.Millisecond, - } - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - - // Start the reconciler. - go func() { - healthResource.Run(ctx.Done()) - }() - // Let reconcile run at least once. - time.Sleep(time.Millisecond * 300) - - var srv *testutil.TestServer - srv, err = testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { - c.Ports = &testutil.TestPortConfig{ - DNS: randomPorts[0], - HTTP: randomPorts[1], - HTTPS: randomPorts[2], - SerfLan: randomPorts[3], - SerfWan: randomPorts[4], - Server: randomPorts[5], - } - }) - require.NoError(err) - // Validate that there is no health check created by reconciler. - check := getConsulAgentChecks(t, client, testHealthCheckID) - require.Nil(check) - // Add the service - only now will a health check have a service to register against. - srv.AddService(t, testServiceNameReg, api.HealthPassing, nil) - - // Retry so we can cover time period when reconciler is already running vs - // when it will run next based on the loop. - timer := &retry.Timer{Timeout: 5 * time.Second, Wait: 1 * time.Second} - var actual *api.AgentCheck - retry.RunWith(timer, t, func(r *retry.R) { - actual = getConsulAgentChecks(t, client, testHealthCheckID) - // The assertion is not on actual != nil, but below - // against an expected check. - if actual == nil || actual.Output == "" { - r.Error("check = nil") - } - }) - - expectedCheck := &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Output: testFailureMessage, - Type: ttl, - Name: name, - } - // Validate the checks are set. - require.True(cmp.Equal(actual, expectedCheck, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) -} - -func testServerAgentResourceAndController(t *testing.T, pod *corev1.Pod) (*testutil.TestServer, *api.Client, *HealthCheckResource) { - return testServerAgentResourceAndControllerWithConsulNS(t, pod, "") -} - -func testServerAgentResourceAndControllerWithConsulNS(t *testing.T, pod *corev1.Pod, consulNS string) (*testutil.TestServer, *api.Client, *HealthCheckResource) { - require := require.New(t) - // Setup server & client. - s, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(err) - - clientConfig := &api.Config{Address: s.HTTPAddr, Namespace: consulNS} - client, err := api.NewClient(clientConfig) - require.NoError(err) - - schema := "http://" - consulURL, err := url.Parse(schema + s.HTTPAddr) - require.NoError(err) - - healthResource := HealthCheckResource{ - Log: hclog.Default().Named("healthCheckResource"), - KubernetesClientset: fake.NewSimpleClientset(pod), - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - ReconcilePeriod: 0, - } - return s, client, &healthResource -} - -// unreadyAppContainers are the container statuses of an example connect pod's -// non-init containers when init containers are still running. -var unreadyAppContainers = []corev1.ContainerStatus{ - { - Name: "envoy-sidecar", - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "PodInitializing", - }, - }, - Ready: false, - }, - { - Name: "consul-sidecar", - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "PodInitializing", - }, - }, - Ready: false, - }, - { - Name: "app", - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "PodInitializing", - }, - }, - Ready: false, - }, -} diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 9610fce4ac..5228273d63 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -8,18 +8,14 @@ import ( "io/ioutil" "net/http" "net/url" - "os" - "os/signal" "strconv" "strings" "sync" "sync/atomic" - "syscall" "time" connectinject "github.com/hashicorp/consul-k8s/connect-inject" "github.com/hashicorp/consul-k8s/consul" - "github.com/hashicorp/consul-k8s/helper/controller" "github.com/hashicorp/consul-k8s/subcommand/common" "github.com/hashicorp/consul-k8s/subcommand/flags" "github.com/hashicorp/consul/api" @@ -107,10 +103,9 @@ type Command struct { consulClient *api.Client clientset kubernetes.Interface - sigCh chan os.Signal - once sync.Once - help string - cert atomic.Value + once sync.Once + help string + cert atomic.Value } var ( @@ -208,12 +203,6 @@ func (c *Command) init() { flags.Merge(c.flagSet, flag.CommandLine) c.help = flags.Usage(help, c.flagSet) - // Wait on an interrupt or terminate for exit, be sure to init it before running - // the controller so that we don't receive an interrupt before it's ready. - if c.sigCh == nil { - c.sigCh = make(chan os.Signal, 1) - signal.Notify(c.sigCh, syscall.SIGINT, syscall.SIGTERM) - } } func (c *Command) Run(args []string) int { @@ -373,180 +362,97 @@ func (c *Command) Run(args []string) int { allowK8sNamespaces := flags.ToSet(c.flagAllowK8sNamespacesList) denyK8sNamespaces := flags.ToSet(c.flagDenyK8sNamespacesList) - // Start the cleanup controller that cleans up Consul service instances - // still registered after the pod has been deleted (usually due to a force delete). - ctrlExitCh := make(chan error) - // Start the endpoints controller - { - zapLogger := zap.New(zap.UseDevMode(true), zap.Level(zapcore.InfoLevel)) - ctrl.SetLogger(zapLogger) - klog.SetLogger(zapLogger) - listenSplits := strings.SplitN(c.flagListen, ":", 2) - if len(listenSplits) < 2 { - c.UI.Error(fmt.Sprintf("missing port in address: %s", c.flagListen)) - return 1 - } - port, err := strconv.Atoi(listenSplits[1]) - if err != nil { - c.UI.Error(fmt.Sprintf("unable to parse port string: %s", err)) - return 1 - } - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - LeaderElection: false, - Host: listenSplits[0], - Port: port, - Logger: zapLogger, - MetricsBindAddress: "0.0.0.0:9444", - }) - if err != nil { - setupLog.Error(err, "unable to start manager") - return 1 - } - - metricsConfig := connectinject.MetricsConfig{ - DefaultEnableMetrics: c.flagDefaultEnableMetrics, - DefaultEnableMetricsMerging: c.flagDefaultEnableMetricsMerging, - DefaultMergedMetricsPort: c.flagDefaultMergedMetricsPort, - DefaultPrometheusScrapePort: c.flagDefaultPrometheusScrapePort, - DefaultPrometheusScrapePath: c.flagDefaultPrometheusScrapePath, - } - - if err = (&connectinject.EndpointsController{ - Client: mgr.GetClient(), - ConsulClient: c.consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - AllowK8sNamespacesSet: allowK8sNamespaces, - DenyK8sNamespacesSet: denyK8sNamespaces, - Log: ctrl.Log.WithName("controller").WithName("endpoints-controller"), - Scheme: mgr.GetScheme(), - ReleaseName: c.flagReleaseName, - ReleaseNamespace: c.flagReleaseNamespace, - MetricsConfig: metricsConfig, - Context: ctx, - ConsulClientCfg: cfg, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{}) - return 1 - } + zapLogger := zap.New(zap.UseDevMode(true), zap.Level(zapcore.InfoLevel)) + ctrl.SetLogger(zapLogger) + klog.SetLogger(zapLogger) + listenSplits := strings.SplitN(c.flagListen, ":", 2) + if len(listenSplits) < 2 { + c.UI.Error(fmt.Sprintf("missing port in address: %s", c.flagListen)) + return 1 + } + port, err := strconv.Atoi(listenSplits[1]) + if err != nil { + c.UI.Error(fmt.Sprintf("unable to parse port string: %s", err)) + return 1 + } - mgr.GetWebhookServer().CertDir = c.flagCertDir - - mgr.GetWebhookServer().Register("/mutate", - &webhook.Admission{Handler: &connectinject.Handler{ - ConsulClient: c.consulClient, - ImageConsul: c.flagConsulImage, - ImageEnvoy: c.flagEnvoyImage, - EnvoyExtraArgs: c.flagEnvoyExtraArgs, - ImageConsulK8S: c.flagConsulK8sImage, - RequireAnnotation: !c.flagDefaultInject, - AuthMethod: c.flagACLAuthMethod, - ConsulCACert: string(consulCACert), - DefaultProxyCPURequest: sidecarProxyCPURequest, - DefaultProxyCPULimit: sidecarProxyCPULimit, - DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, - DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, - InitContainerResources: initResources, - ConsulSidecarResources: consulSidecarResources, - EnableNamespaces: c.flagEnableNamespaces, - AllowK8sNamespacesSet: allowK8sNamespaces, - DenyK8sNamespacesSet: denyK8sNamespaces, - ConsulDestinationNamespace: c.flagConsulDestinationNamespace, - EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, - K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, - CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, - MetricsConfig: metricsConfig, - Log: logger.Named("handler"), - }}) - - // todo: Add tests in case it's not refactored to not have any signal handling - // (In the future, we plan to only have the manager and rely on it to do signal handling for us). - go func() { - // Pass existing context's done channel so that the controller - // will stop when this context is canceled. - // This could be due to an interrupt signal or if any other component did not start - // successfully. In those cases, we want to make sure that this controller is no longer - // running. - if err := mgr.Start(ctx); err != nil { - setupLog.Error(err, "problem running manager") - // Use an existing channel for ctrl exists in case manager failed to start properly. - ctrlExitCh <- fmt.Errorf("endpoints controller exited unexpectedly") - } - }() - } - - if c.flagEnableCleanupController { - cleanupResource := connectinject.CleanupResource{ - Log: logger.Named("cleanupResource"), - KubernetesClient: c.clientset, - Ctx: ctx, - ReconcilePeriod: c.flagCleanupControllerReconcilePeriod, - ConsulClient: c.consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - EnableConsulNamespaces: c.flagEnableNamespaces, - } - cleanupCtrl := &controller.Controller{ - Log: logger.Named("cleanupController"), - Resource: &cleanupResource, - } - go func() { - cleanupCtrl.Run(ctx.Done()) - if ctx.Err() == nil { - ctrlExitCh <- fmt.Errorf("cleanup controller exited unexpectedly") - } - }() - } - - if c.flagEnableHealthChecks { - healthResource := connectinject.HealthCheckResource{ - Log: logger.Named("healthCheckResource"), - KubernetesClientset: c.clientset, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - Ctx: ctx, - ReconcilePeriod: c.flagHealthChecksReconcilePeriod, - } + metricsConfig := connectinject.MetricsConfig{ + DefaultEnableMetrics: c.flagDefaultEnableMetrics, + DefaultEnableMetricsMerging: c.flagDefaultEnableMetricsMerging, + DefaultMergedMetricsPort: c.flagDefaultMergedMetricsPort, + DefaultPrometheusScrapePort: c.flagDefaultPrometheusScrapePort, + DefaultPrometheusScrapePath: c.flagDefaultPrometheusScrapePath, + } - healthChecksCtrl := &controller.Controller{ - Log: logger.Named("healthCheckController"), - Resource: &healthResource, - } + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + LeaderElection: false, + Host: listenSplits[0], + Port: port, + Logger: zapLogger, + MetricsBindAddress: "0.0.0.0:9444", + }) + if err != nil { + setupLog.Error(err, "unable to start manager") + return 1 + } - // Start the health check controller, reconcile is started at the same time - // and new events will queue in the informer. - go func() { - healthChecksCtrl.Run(ctx.Done()) - // If ctl.Run() exits before ctx is cancelled, then our health checks - // controller isn't running. In that case we need to shutdown since - // this is unrecoverable. - if ctx.Err() == nil { - ctrlExitCh <- fmt.Errorf("health checks controller exited unexpectedly") - } - }() - } - - // Block until we get a signal or something errors. - select { - case sig := <-c.sigCh: - c.UI.Info(fmt.Sprintf("%s received, shutting down", sig)) - return 0 - - case err := <-ctrlExitCh: - c.UI.Error(fmt.Sprintf("controller error: %v", err)) + if err = (&connectinject.EndpointsController{ + Client: mgr.GetClient(), + ConsulClient: c.consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + Log: ctrl.Log.WithName("controller").WithName("endpoints-controller"), + Scheme: mgr.GetScheme(), + ReleaseName: c.flagReleaseName, + ReleaseNamespace: c.flagReleaseNamespace, + MetricsConfig: metricsConfig, + Context: ctx, + ConsulClientCfg: cfg, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{}) return 1 } -} -func (c *Command) interrupt() { - c.sendSignal(syscall.SIGINT) -} + mgr.GetWebhookServer().CertDir = c.flagCertDir + + mgr.GetWebhookServer().Register("/mutate", + &webhook.Admission{Handler: &connectinject.Handler{ + ConsulClient: c.consulClient, + ImageConsul: c.flagConsulImage, + ImageEnvoy: c.flagEnvoyImage, + EnvoyExtraArgs: c.flagEnvoyExtraArgs, + ImageConsulK8S: c.flagConsulK8sImage, + RequireAnnotation: !c.flagDefaultInject, + AuthMethod: c.flagACLAuthMethod, + ConsulCACert: string(consulCACert), + DefaultProxyCPURequest: sidecarProxyCPURequest, + DefaultProxyCPULimit: sidecarProxyCPULimit, + DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, + DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, + InitContainerResources: initResources, + ConsulSidecarResources: consulSidecarResources, + EnableNamespaces: c.flagEnableNamespaces, + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, + K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, + CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, + MetricsConfig: metricsConfig, + Log: logger.Named("handler"), + }}) + + if err := mgr.Start(ctx); err != nil { + setupLog.Error(err, "problem running manager") + return 1 + } + c.UI.Info("shutting down") + return 0 -func (c *Command) sendSignal(sig os.Signal) { - c.sigCh <- sig } func (c *Command) handleReady(rw http.ResponseWriter, req *http.Request) { diff --git a/subcommand/inject-connect/command_test.go b/subcommand/inject-connect/command_test.go index dc9c0a0e3c..b2fa324cf4 100644 --- a/subcommand/inject-connect/command_test.go +++ b/subcommand/inject-connect/command_test.go @@ -1,14 +1,10 @@ package connectinject import ( - "fmt" "os" - "syscall" "testing" - "time" "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/freeport" "github.com/mitchellh/cli" "github.com/stretchr/testify/require" "k8s.io/client-go/kubernetes/fake" @@ -218,64 +214,3 @@ func TestRun_ValidationConsulHTTPAddr(t *testing.T) { require.Equal(t, 1, code) require.Contains(t, ui.ErrorWriter.String(), "error parsing consul address \"http://%\": parse \"http://%\": invalid URL escape \"%") } - -// Test that when healthchecks are enabled that SIGINT/SIGTERM exits the -// command cleanly. -func TestRun_CommandExitsCleanlyAfterSignal(t *testing.T) { - // TODO: fix this skip - t.Skip("This test will be rewritten when the manager handles all signal handling") - t.Run("SIGINT", testSignalHandling(syscall.SIGINT)) - t.Run("SIGTERM", testSignalHandling(syscall.SIGTERM)) -} - -func testSignalHandling(sig os.Signal) func(*testing.T) { - return func(t *testing.T) { - k8sClient := fake.NewSimpleClientset() - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8sClient, - } - ports := freeport.MustTake(1) - - // NOTE: This url doesn't matter because Consul is never called. - os.Setenv(api.HTTPAddrEnvName, "http://0.0.0.0:9999") - defer os.Unsetenv(api.HTTPAddrEnvName) - - // Start the command asynchronously and then we'll send an interrupt. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true", - "-listen", fmt.Sprintf(":%d", ports[0]), - }) - - // Send the signal - cmd.sendSignal(sig) - - // Assert that it exits cleanly or timeout. - select { - case exitCode := <-exitChan: - require.Equal(t, 0, exitCode, ui.ErrorWriter.String()) - case <-time.After(time.Second * 2): - // Fail if the stopCh was not caught. - require.Fail(t, "timeout waiting for command to exit") - } - } -} - -// This function starts the command asynchronously and returns a non-blocking chan. -// When finished, the command will send its exit code to the channel. -// Note that it's the responsibility of the caller to terminate the command by calling stopCommand, -// otherwise it can run forever. -func runCommandAsynchronously(cmd *Command, args []string) chan int { - // We have to run cmd.init() to ensure that the channel the command is - // using to watch for os interrupts is initialized. If we don't do this, - // then if stopCommand is called immediately, it will block forever - // because it calls interrupt() which will attempt to send on a nil channel. - cmd.init() - exitChan := make(chan int, 1) - go func() { - exitChan <- cmd.Run(args) - }() - return exitChan -}