diff --git a/catalog/to-consul/resource.go b/catalog/to-consul/resource.go index 921b130ab7..77de73f5b4 100644 --- a/catalog/to-consul/resource.go +++ b/catalog/to-consul/resource.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-k8s/helper/controller" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" @@ -78,6 +78,9 @@ type ServiceResource struct { // Setting this to false will ignore ClusterIP services during the sync. ClusterIPSync bool + // LoadBalancerEndpointsSync set to true (default false) will sync ServiceTypeLoadBalancer endpoints. + LoadBalancerEndpointsSync bool + // NodeExternalIPSync set to true (the default) syncs NodePort services // using the node's external ip address. When false, the node's internal // ip address will be used instead. @@ -295,7 +298,9 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool { return false } - return svc.Spec.Type == apiv1.ServiceTypeNodePort || svc.Spec.Type == apiv1.ServiceTypeClusterIP + return svc.Spec.Type == apiv1.ServiceTypeNodePort || + svc.Spec.Type == apiv1.ServiceTypeClusterIP || + (t.LoadBalancerEndpointsSync && svc.Spec.Type == apiv1.ServiceTypeLoadBalancer) } // generateRegistrations generates the necessary Consul registrations for @@ -468,28 +473,34 @@ func (t *ServiceResource) generateRegistrations(key string) { // For LoadBalancer type services, we create a service instance for // each LoadBalancer entry. We only support entries that have an IP // address assigned (not hostnames). + // If LoadBalancerEndpointsSync is true sync LB endpoints instead of loadbalancer ingress. case apiv1.ServiceTypeLoadBalancer: - seen := map[string]struct{}{} - for _, ingress := range svc.Status.LoadBalancer.Ingress { - addr := ingress.IP - if addr == "" { - addr = ingress.Hostname - } - if addr == "" { - continue - } + if t.LoadBalancerEndpointsSync { + t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, false) + } else { + seen := map[string]struct{}{} + for _, ingress := range svc.Status.LoadBalancer.Ingress { + addr := ingress.IP + if addr == "" { + addr = ingress.Hostname + } + if addr == "" { + continue + } - if _, ok := seen[addr]; ok { - continue - } - seen[addr] = struct{}{} + if _, ok := seen[addr]; ok { + continue + } + seen[addr] = struct{}{} - r := baseNode - rs := baseService - r.Service = &rs - r.Service.ID = serviceID(r.Service.Service, addr) - r.Service.Address = addr - t.consulMap[key] = append(t.consulMap[key], &r) + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, addr) + r.Service.Address = addr + + t.consulMap[key] = append(t.consulMap[key], &r) + } } // For NodePort services, we create a service instance for each @@ -566,63 +577,74 @@ func (t *ServiceResource) generateRegistrations(key string) { // For ClusterIP services, we register a service instance // for each endpoint. case apiv1.ServiceTypeClusterIP: - if t.endpointsMap == nil { - return - } + t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, true) + } +} - endpoints := t.endpointsMap[key] - if endpoints == nil { - return - } +func (t *ServiceResource) registerServiceInstance( + baseNode consulapi.CatalogRegistration, + baseService consulapi.AgentService, + key string, + overridePortName string, + overridePortNumber int, + useHostname bool) { - seen := map[string]struct{}{} - for _, subset := range endpoints.Subsets { - // For ClusterIP services, we use the endpoint port instead - // of the service port because we're registering each endpoint - // as a separate service instance. - epPort := baseService.Port - if overridePortName != "" { - // If we're supposed to use a specific named port, find it. - for _, p := range subset.Ports { - if overridePortName == p.Name { - epPort = int(p.Port) - break - } - } - } else if overridePortNumber == 0 { - // Otherwise we'll just use the first port in the list - // (unless the port number was overridden by an annotation). - for _, p := range subset.Ports { + if t.endpointsMap == nil { + return + } + + endpoints := t.endpointsMap[key] + if endpoints == nil { + return + } + + seen := map[string]struct{}{} + for _, subset := range endpoints.Subsets { + // For ClusterIP services and if LoadBalancerEndpointsSync is true, we use the endpoint port instead + // of the service port because we're registering each endpoint + // as a separate service instance. + epPort := baseService.Port + if overridePortName != "" { + // If we're supposed to use a specific named port, find it. + for _, p := range subset.Ports { + if overridePortName == p.Name { epPort = int(p.Port) break } } - for _, subsetAddr := range subset.Addresses { - addr := subsetAddr.IP - if addr == "" { - addr = subsetAddr.Hostname - } - if addr == "" { - continue - } + } else if overridePortNumber == 0 { + // Otherwise we'll just use the first port in the list + // (unless the port number was overridden by an annotation). + for _, p := range subset.Ports { + epPort = int(p.Port) + break + } + } + for _, subsetAddr := range subset.Addresses { + addr := subsetAddr.IP + if addr == "" && useHostname { + addr = subsetAddr.Hostname + } + if addr == "" { + continue + } - // Its not clear whether K8S guarantees ready addresses to - // be unique so we maintain a set to prevent duplicates just - // in case. - if _, ok := seen[addr]; ok { - continue - } - seen[addr] = struct{}{} + // Its not clear whether K8S guarantees ready addresses to + // be unique so we maintain a set to prevent duplicates just + // in case. + if _, ok := seen[addr]; ok { + continue + } + seen[addr] = struct{}{} - r := baseNode - rs := baseService - r.Service = &rs - r.Service.ID = serviceID(r.Service.Service, addr) - r.Service.Address = addr - r.Service.Port = epPort + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, addr) + r.Service.Address = addr + r.Service.Port = epPort - t.consulMap[key] = append(t.consulMap[key], &r) - } + t.consulMap[key] = append(t.consulMap[key], &r) } } } diff --git a/catalog/to-consul/resource_test.go b/catalog/to-consul/resource_test.go index 1d86f844e1..96a063dc38 100644 --- a/catalog/to-consul/resource_test.go +++ b/catalog/to-consul/resource_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-k8s/helper/controller" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" @@ -589,6 +589,58 @@ func TestServiceResource_lbAnnotatedMeta(t *testing.T) { require.Equal("bar", actual[0].Service.Meta["foo"]) } +// Test that with LoadBalancerEndpointsSync set to true we track the IP of the endpoints not the LB IP/name +func TestServiceResource_lbRegisterEndpoints(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + serviceResource := defaultServiceResource(client, syncer) + serviceResource.LoadBalancerEndpointsSync = true + + // Start the controller + closer := controller.TestControllerRun(&serviceResource) + defer closer() + + node1, _ := createNodes(t, client) + + // Insert the endpoints + _, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + { + Addresses: []apiv1.EndpointAddress{ + {NodeName: &node1.Name, IP: "8.8.8.8"}, + }, + Ports: []apiv1.EndpointPort{ + {Name: "http", Port: 8080}, + {Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Insert an LB service + svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4") + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(svc) + require.NoError(err) + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 1) + require.Equal("foo", actual[0].Service.Service) + require.Equal("8.8.8.8", actual[0].Service.Address) + require.Equal(8080, actual[0].Service.Port) + require.Equal("k8s-sync", actual[0].Node) +} + // Test that the proper registrations are generated for a NodePort type. func TestServiceResource_nodePort(t *testing.T) { t.Parallel() diff --git a/subcommand/sync-catalog/command.go b/subcommand/sync-catalog/command.go index b9738f2145..af844b7b53 100644 --- a/subcommand/sync-catalog/command.go +++ b/subcommand/sync-catalog/command.go @@ -45,6 +45,7 @@ type Command struct { flagK8SWriteNamespace string flagConsulWritePeriod flags.DurationValue flagSyncClusterIPServices bool + flagSyncLBEndpoints bool flagNodePortSyncType string flagAddK8SNamespaceSuffix bool flagLogLevel string @@ -102,6 +103,9 @@ func (c *Command) init() { c.flags.BoolVar(&c.flagSyncClusterIPServices, "sync-clusterip-services", true, "If true, all valid ClusterIP services in K8S are synced by default. If false, "+ "ClusterIP services are not synced to Consul.") + c.flags.BoolVar(&c.flagSyncLBEndpoints, "sync-lb-services-endpoints", false, + "If true, LoadBalancer service endpoints instead of ingress addresses will be synced to Consul. If false, "+ + "LoadBalancer endpoints are not synced to Consul.") c.flags.StringVar(&c.flagNodePortSyncType, "node-port-sync-type", "ExternalOnly", "Defines the type of sync for NodePort services. Valid options are ExternalOnly, "+ "InternalOnly and ExternalFirst.") @@ -260,6 +264,7 @@ func (c *Command) Run(args []string) int { DenyK8sNamespacesSet: denySet, ExplicitEnable: !c.flagK8SDefault, ClusterIPSync: c.flagSyncClusterIPServices, + LoadBalancerEndpointsSync: c.flagSyncLBEndpoints, NodePortSync: catalogtoconsul.NodePortSyncType(c.flagNodePortSyncType), ConsulK8STag: c.flagConsulK8STag, ConsulServicePrefix: c.flagConsulServicePrefix,