Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync LB endpoints #257

Merged
merged 11 commits into from
Jun 5, 2020
160 changes: 91 additions & 69 deletions catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"sync"

"github.com/deckarep/golang-set"
mapset "github.com/deckarep/golang-set"
ltagliamonte-dd marked this conversation as resolved.
Show resolved Hide resolved
"github.com/hashicorp/consul-k8s/helper/controller"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -78,6 +78,9 @@ type ServiceResource struct {
// Setting this to false will ignore ClusterIP services during the sync.
ClusterIPSync bool

// LoadBalancerEndpointsSync set to true (default false) will sync ServiceTypeLoadBalancer endpoints.
LoadBalancerEndpointsSync bool

// NodeExternalIPSync set to true (the default) syncs NodePort services
// using the node's external ip address. When false, the node's internal
// ip address will be used instead.
Expand Down Expand Up @@ -295,7 +298,9 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool {
return false
}

return svc.Spec.Type == apiv1.ServiceTypeNodePort || svc.Spec.Type == apiv1.ServiceTypeClusterIP
return svc.Spec.Type == apiv1.ServiceTypeNodePort ||
svc.Spec.Type == apiv1.ServiceTypeClusterIP ||
(t.LoadBalancerEndpointsSync && svc.Spec.Type == apiv1.ServiceTypeLoadBalancer)
}

// generateRegistrations generates the necessary Consul registrations for
Expand Down Expand Up @@ -468,28 +473,34 @@ func (t *ServiceResource) generateRegistrations(key string) {
// For LoadBalancer type services, we create a service instance for
// each LoadBalancer entry. We only support entries that have an IP
// address assigned (not hostnames).
// If LoadBalancerEndpointsSync is true sync LB endpoints instead of loadbalancer ingress.
case apiv1.ServiceTypeLoadBalancer:
seen := map[string]struct{}{}
for _, ingress := range svc.Status.LoadBalancer.Ingress {
addr := ingress.IP
if addr == "" {
addr = ingress.Hostname
}
if addr == "" {
continue
}
if t.LoadBalancerEndpointsSync {
t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, false)
} else {
seen := map[string]struct{}{}
for _, ingress := range svc.Status.LoadBalancer.Ingress {
addr := ingress.IP
if addr == "" {
addr = ingress.Hostname
}
if addr == "" {
continue
}

if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
t.consulMap[key] = append(t.consulMap[key], &r)
r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr

t.consulMap[key] = append(t.consulMap[key], &r)
}
}

// For NodePort services, we create a service instance for each
Expand Down Expand Up @@ -566,63 +577,74 @@ func (t *ServiceResource) generateRegistrations(key string) {
// For ClusterIP services, we register a service instance
// for each endpoint.
case apiv1.ServiceTypeClusterIP:
if t.endpointsMap == nil {
return
}
t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, true)
}
}

endpoints := t.endpointsMap[key]
if endpoints == nil {
return
}
func (t *ServiceResource) registerServiceInstance(
baseNode consulapi.CatalogRegistration,
baseService consulapi.AgentService,
key string,
overridePortName string,
overridePortNumber int,
useHostname bool) {

seen := map[string]struct{}{}
for _, subset := range endpoints.Subsets {
// For ClusterIP services, we use the endpoint port instead
// of the service port because we're registering each endpoint
// as a separate service instance.
epPort := baseService.Port
if overridePortName != "" {
// If we're supposed to use a specific named port, find it.
for _, p := range subset.Ports {
if overridePortName == p.Name {
epPort = int(p.Port)
break
}
}
} else if overridePortNumber == 0 {
// Otherwise we'll just use the first port in the list
// (unless the port number was overridden by an annotation).
for _, p := range subset.Ports {
if t.endpointsMap == nil {
return
}

endpoints := t.endpointsMap[key]
if endpoints == nil {
return
}

seen := map[string]struct{}{}
for _, subset := range endpoints.Subsets {
// For ClusterIP services and if LoadBalancerEndpointsSync is true, we use the endpoint port instead
// of the service port because we're registering each endpoint
// as a separate service instance.
epPort := baseService.Port
if overridePortName != "" {
// If we're supposed to use a specific named port, find it.
for _, p := range subset.Ports {
if overridePortName == p.Name {
epPort = int(p.Port)
break
}
}
for _, subsetAddr := range subset.Addresses {
addr := subsetAddr.IP
if addr == "" {
addr = subsetAddr.Hostname
}
if addr == "" {
continue
}
} else if overridePortNumber == 0 {
// Otherwise we'll just use the first port in the list
// (unless the port number was overridden by an annotation).
for _, p := range subset.Ports {
epPort = int(p.Port)
break
}
}
for _, subsetAddr := range subset.Addresses {
addr := subsetAddr.IP
if addr == "" && useHostname {
addr = subsetAddr.Hostname
}
if addr == "" {
continue
}

// Its not clear whether K8S guarantees ready addresses to
// be unique so we maintain a set to prevent duplicates just
// in case.
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}
// Its not clear whether K8S guarantees ready addresses to
// be unique so we maintain a set to prevent duplicates just
// in case.
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
r.Service.Port = epPort
r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
r.Service.Port = epPort

t.consulMap[key] = append(t.consulMap[key], &r)
}
t.consulMap[key] = append(t.consulMap[key], &r)
}
}
}
Expand Down
54 changes: 53 additions & 1 deletion catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"
"time"

"github.com/deckarep/golang-set"
mapset "github.com/deckarep/golang-set"
"github.com/hashicorp/consul-k8s/helper/controller"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -589,6 +589,58 @@ func TestServiceResource_lbAnnotatedMeta(t *testing.T) {
require.Equal("bar", actual[0].Service.Meta["foo"])
}

// Test that with LoadBalancerEndpointsSync set to true we track the IP of the endpoints not the LB IP/name
func TestServiceResource_lbRegisterEndpoints(t *testing.T) {
t.Parallel()
require := require.New(t)
client := fake.NewSimpleClientset()
syncer := &TestSyncer{}
serviceResource := defaultServiceResource(client, syncer)
serviceResource.LoadBalancerEndpointsSync = true

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

node1, _ := createNodes(t, client)

// Insert the endpoints
_, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},

Subsets: []apiv1.EndpointSubset{
{
Addresses: []apiv1.EndpointAddress{
{NodeName: &node1.Name, IP: "8.8.8.8"},
},
Ports: []apiv1.EndpointPort{
{Name: "http", Port: 8080},
{Name: "rpc", Port: 2000},
},
},
},
})
require.NoError(err)

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(svc)
require.NoError(err)
time.Sleep(300 * time.Millisecond)

// Verify what we got
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(actual, 1)
require.Equal("foo", actual[0].Service.Service)
require.Equal("8.8.8.8", actual[0].Service.Address)
require.Equal(8080, actual[0].Service.Port)
require.Equal("k8s-sync", actual[0].Node)
}

// Test that the proper registrations are generated for a NodePort type.
func TestServiceResource_nodePort(t *testing.T) {
t.Parallel()
Expand Down
5 changes: 5 additions & 0 deletions subcommand/sync-catalog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Command struct {
flagK8SWriteNamespace string
flagConsulWritePeriod flags.DurationValue
flagSyncClusterIPServices bool
flagSyncLBEndpoints bool
flagNodePortSyncType string
flagAddK8SNamespaceSuffix bool
flagLogLevel string
Expand Down Expand Up @@ -102,6 +103,9 @@ func (c *Command) init() {
c.flags.BoolVar(&c.flagSyncClusterIPServices, "sync-clusterip-services", true,
"If true, all valid ClusterIP services in K8S are synced by default. If false, "+
"ClusterIP services are not synced to Consul.")
c.flags.BoolVar(&c.flagSyncLBEndpoints, "sync-lb-services-endpoints", false,
"If true, LoadBalancer service endpoints instead of ingress addresses will be synced to Consul. If false, "+
"LoadBalancer endpoints are not synced to Consul.")
c.flags.StringVar(&c.flagNodePortSyncType, "node-port-sync-type", "ExternalOnly",
"Defines the type of sync for NodePort services. Valid options are ExternalOnly, "+
"InternalOnly and ExternalFirst.")
Expand Down Expand Up @@ -260,6 +264,7 @@ func (c *Command) Run(args []string) int {
DenyK8sNamespacesSet: denySet,
ExplicitEnable: !c.flagK8SDefault,
ClusterIPSync: c.flagSyncClusterIPServices,
LoadBalancerEndpointsSync: c.flagSyncLBEndpoints,
NodePortSync: catalogtoconsul.NodePortSyncType(c.flagNodePortSyncType),
ConsulK8STag: c.flagConsulK8STag,
ConsulServicePrefix: c.flagConsulServicePrefix,
Expand Down