From ada2adc623df427fb6bdb631886ca3f2c8cf3ac1 Mon Sep 17 00:00:00 2001 From: JP Phillips Date: Thu, 18 Oct 2018 10:15:49 -0500 Subject: [PATCH 1/4] catalog/from-k8s: support ClusterIP Services, fixes NodePort port --- catalog/from-k8s/resource.go | 22 +- catalog/from-k8s/resource_test.go | 372 +++++++++++++++++++++++++++++- 2 files changed, 380 insertions(+), 14 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index 18e18dad76dc..e14df71dbd82 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -193,7 +193,11 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool { return false } - return svc.Spec.Type == apiv1.ServiceTypeNodePort + switch svc.Spec.Type { + case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP: + return true + } + return false } // generateRegistrations generates the necessary Consul registrations for @@ -243,10 +247,10 @@ func (t *ServiceResource) generateRegistrations(key string) { baseService.Service = strings.TrimSpace(v) } + var main string // Determine the default port if len(svc.Spec.Ports) > 0 { - nodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort - main := svc.Spec.Ports[0].Name + main = svc.Spec.Ports[0].Name // If a specific port is specified, then use that port value if target, ok := svc.Annotations[annotationServicePort]; ok { @@ -260,9 +264,6 @@ func (t *ServiceResource) generateRegistrations(key string) { // also use this opportunity to find our default port. for _, p := range svc.Spec.Ports { target := p.Port - if nodePort && p.NodePort > 0 { - target = p.NodePort - } // Set the tag baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(target), 10) @@ -343,7 +344,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // For NodePort services, we create a service instance for each // endpoint of the service. This way we don't register _every_ K8S // node as part of the service. - case apiv1.ServiceTypeNodePort: + case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP: if t.endpointsMap == nil { return } @@ -372,6 +373,13 @@ func (t *ServiceResource) generateRegistrations(key string) { } seen[addr] = struct{}{} + for _, port := range subset.Ports { + if port.Name == main { + baseService.Port = int(port.Port) + break + } + } + r := baseNode rs := baseService r.Service = &rs diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index a6e95fe1ff3a..eed05724117d 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -1,9 +1,11 @@ package catalog import ( + "io/ioutil" "testing" "time" + "github.com/Sirupsen/logrus" "github.com/hashicorp/consul-k8s/helper/controller" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" @@ -14,6 +16,7 @@ import ( ) func init() { + logrus.SetOutput(ioutil.Discard) hclog.DefaultOptions.Level = hclog.Debug } @@ -538,8 +541,8 @@ func TestServiceResource_nodePort(t *testing.T) { Spec: apiv1.ServiceSpec{ Type: apiv1.ServiceTypeNodePort, Ports: []apiv1.ServicePort{ - apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)}, - apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000)}, + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, }, }, }) @@ -548,6 +551,8 @@ func TestServiceResource_nodePort(t *testing.T) { // Wait a bit time.Sleep(300 * time.Millisecond) + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" // Insert the endpoints _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -557,13 +562,21 @@ func TestServiceResource_nodePort(t *testing.T) { Subsets: []apiv1.EndpointSubset{ apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{IP: "1.2.3.4"}, + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, }, }, apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{IP: "2.3.4.5"}, + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, }, }, }, @@ -580,8 +593,12 @@ func TestServiceResource_nodePort(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(8080, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(8080, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } @@ -602,6 +619,8 @@ func TestServiceResource_nodePortInitial(t *testing.T) { defer closer() time.Sleep(100 * time.Millisecond) + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" // Insert the endpoints _, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -611,13 +630,21 @@ func TestServiceResource_nodePortInitial(t *testing.T) { Subsets: []apiv1.EndpointSubset{ apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{IP: "1.2.3.4"}, + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, }, }, apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{IP: "2.3.4.5"}, + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, }, }, }, @@ -633,6 +660,219 @@ func TestServiceResource_nodePortInitial(t *testing.T) { Spec: apiv1.ServiceSpec{ Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(400 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(8080, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(8080, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) +} + +// Test that the proper registrations are generated for a NodePort with annotated port. +func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{annotationServicePort: "rpc"}, + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(2000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(2000, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + +// Test that the proper registrations are generated for a ClusterIP type. +func TestServiceResource_clusterIP(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(8080, actual[0].Service.Port) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(8080, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + +// Test that the proper registrations are generated for a ClusterIP type with multiple ports. +func TestServiceResource_clusterIPMultiEndpoint(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, Ports: []apiv1.ServicePort{ apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)}, apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000)}, @@ -642,7 +882,40 @@ func TestServiceResource_nodePortInitial(t *testing.T) { require.NoError(err) // Wait a bit - time.Sleep(400 * time.Millisecond) + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) // Verify what we got syncer.Lock() @@ -651,8 +924,93 @@ func TestServiceResource_nodePortInitial(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(8080, actual[0].Service.Port) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(8080, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + +// Test that the proper registrations are generated for a ClusterIP type with annotated override. +func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{annotationServicePort: "rpc"}, + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(2000, actual[0].Service.Port) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(2000, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } // testService returns a service that will result in a registration. From 6443fae723b027f9a13f41ff49e30de3da8bae7c Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Thu, 6 Dec 2018 18:50:15 -0800 Subject: [PATCH 2/4] Make clusterip syncing optional Adds a command line flag to `consul-k8s` that allows the user to enable or disable the syncing of ClusterIP services. Setting the flag to `false` keeps the current behavior, while the default is `true`, allowing the services to be synced. Updates the ClusterIP functionality tests, including adding a test with the functionality disabled. Removes an added log dependency within the tests. --- catalog/from-k8s/resource.go | 17 +++-- catalog/from-k8s/resource_test.go | 99 ++++++++++++++++++++++++++---- subcommand/sync-catalog/command.go | 27 ++++---- 3 files changed, 114 insertions(+), 29 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index e14df71dbd82..a23d4c9b977e 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -43,6 +43,10 @@ type ServiceResource struct { // enabled (aka default enabled). ExplicitEnable bool + // ClusterIPSync set to true (the default) syncs ClusterIP-type services. + // Setting this to false will ignore ClusterIP services during the sync. + ClusterIPSync bool + // serviceMap is a mapping of unique key (given by controller) to // the service structure. endpointsMap is the mapping of the same // uniqueKey to a set of endpoints. @@ -157,6 +161,13 @@ func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool { return false } + // Ignore ClusterIP services if ClusterIP sync is disabled + if svc.Spec.Type == apiv1.ServiceTypeClusterIP { + if !t.ClusterIPSync { + return false + } + } + raw, ok := svc.Annotations[annotationServiceSync] if !ok { // If there is no explicit value, then set it to our current default. @@ -193,11 +204,7 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool { return false } - switch svc.Spec.Type { - case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP: - return true - } - return false + return svc.Spec.Type == apiv1.ServiceTypeNodePort || svc.Spec.Type == apiv1.ServiceTypeClusterIP } // generateRegistrations generates the necessary Consul registrations for diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index eed05724117d..ffe29f4ed07d 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -1,11 +1,9 @@ package catalog import ( - "io/ioutil" "testing" "time" - "github.com/Sirupsen/logrus" "github.com/hashicorp/consul-k8s/helper/controller" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" @@ -16,7 +14,6 @@ import ( ) func init() { - logrus.SetOutput(ioutil.Discard) hclog.DefaultOptions.Level = hclog.Debug } @@ -704,7 +701,7 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { // Insert the service _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + Name: "foo", Annotations: map[string]string{annotationServicePort: "rpc"}, }, @@ -781,9 +778,10 @@ func TestServiceResource_clusterIP(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + ClusterIPSync: true, }) defer closer() @@ -859,9 +857,10 @@ func TestServiceResource_clusterIPMultiEndpoint(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + ClusterIPSync: true, }) defer closer() @@ -940,9 +939,10 @@ func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + ClusterIPSync: true, }) defer closer() @@ -1013,6 +1013,79 @@ func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } +// Test that the ClusterIP services aren't synced when ClusterIPSync +// is disabled. +func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + ClusterIPSync: false, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 0) +} + // testService returns a service that will result in a registration. func testService(name string) *apiv1.Service { return &apiv1.Service{ diff --git a/subcommand/sync-catalog/command.go b/subcommand/sync-catalog/command.go index 5b83bfb244cb..f549ca0a5239 100644 --- a/subcommand/sync-catalog/command.go +++ b/subcommand/sync-catalog/command.go @@ -27,17 +27,18 @@ import ( type Command struct { UI cli.Ui - flags *flag.FlagSet - http *flags.HTTPFlags - k8s *k8sflags.K8SFlags - flagToConsul bool - flagToK8S bool - flagConsulDomain string - flagK8SDefault bool - flagK8SServicePrefix string - flagK8SSourceNamespace string - flagK8SWriteNamespace string - flagConsulWritePeriod flags.DurationValue + flags *flag.FlagSet + http *flags.HTTPFlags + k8s *k8sflags.K8SFlags + flagToConsul bool + flagToK8S bool + flagConsulDomain string + flagK8SDefault bool + flagK8SServicePrefix string + flagK8SSourceNamespace string + flagK8SWriteNamespace string + flagConsulWritePeriod flags.DurationValue + flagSyncClusterIPServices bool once sync.Once help string @@ -69,6 +70,9 @@ func (c *Command) init() { "The interval to perform syncing operations creating Consul services. "+ "All changes are merged and write calls are only made on this "+ "interval. Defaults to 30 seconds.") + c.flags.BoolVar(&c.flagSyncClusterIPServices, "sync-clusterip-services", true, + "If true, all valid ClusterIP services in K8S are synced by default. If false, "+ + "ClusterIP services are not synced to Consul.") c.http = &flags.HTTPFlags{} c.k8s = &k8sflags.K8SFlags{} @@ -137,6 +141,7 @@ func (c *Command) Run(args []string) int { Syncer: syncer, Namespace: c.flagK8SSourceNamespace, ExplicitEnable: !c.flagK8SDefault, + ClusterIPSync: c.flagSyncClusterIPServices, }, } From bc37d904f91f6e2d6761f81a3ec798b81db3fde0 Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Fri, 7 Dec 2018 14:58:21 -0800 Subject: [PATCH 3/4] Switched back to previous port calculations to minimize changes There are some complications to how we determine the ports that will need to be revisited. This matches the current behavior until those oddities can be sorted out. --- catalog/from-k8s/resource.go | 14 +++----- catalog/from-k8s/resource_test.go | 55 +++++++++---------------------- 2 files changed, 20 insertions(+), 49 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index a23d4c9b977e..d1fb488c91b6 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -254,10 +254,10 @@ func (t *ServiceResource) generateRegistrations(key string) { baseService.Service = strings.TrimSpace(v) } - var main string // Determine the default port if len(svc.Spec.Ports) > 0 { - main = svc.Spec.Ports[0].Name + nodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort + main := svc.Spec.Ports[0].Name // If a specific port is specified, then use that port value if target, ok := svc.Annotations[annotationServicePort]; ok { @@ -271,6 +271,9 @@ func (t *ServiceResource) generateRegistrations(key string) { // also use this opportunity to find our default port. for _, p := range svc.Spec.Ports { target := p.Port + if nodePort && p.NodePort > 0 { + target = p.NodePort + } // Set the tag baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(target), 10) @@ -380,13 +383,6 @@ func (t *ServiceResource) generateRegistrations(key string) { } seen[addr] = struct{}{} - for _, port := range subset.Ports { - if port.Name == main { - baseService.Port = int(port.Port) - break - } - } - r := baseNode rs := baseService r.Service = &rs diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index ffe29f4ed07d..7fcef471b3a2 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -590,11 +590,11 @@ func TestServiceResource_nodePort(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) - require.Equal(8080, actual[0].Service.Port) + require.Equal(30000, actual[0].Service.Port) require.Equal(node1, actual[0].Node) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) - require.Equal(8080, actual[1].Service.Port) + require.Equal(30000, actual[1].Service.Port) require.Equal(node2, actual[1].Node) require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } @@ -675,11 +675,11 @@ func TestServiceResource_nodePortInitial(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) - require.Equal(8080, actual[0].Service.Port) + require.Equal(30000, actual[0].Service.Port) require.Equal(node1, actual[0].Node) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) - require.Equal(8080, actual[1].Service.Port) + require.Equal(30000, actual[1].Service.Port) require.Equal(node2, actual[1].Node) } @@ -760,11 +760,14 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) - require.Equal(2000, actual[0].Service.Port) + + // This is an odd case-- currently if there are multiple NodePorts configured + // for a service, we'll take the last one. + require.Equal(30001, actual[0].Service.Port) require.Equal(node1, actual[0].Node) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) - require.Equal(2000, actual[1].Service.Port) + require.Equal(30001, actual[1].Service.Port) require.Equal(node2, actual[1].Node) require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } @@ -814,18 +817,12 @@ func TestServiceResource_clusterIP(t *testing.T) { Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "1.2.3.4"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Port: 8080}, - }, }, apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "2.3.4.5"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Port: 8080}, - }, }, }, }) @@ -841,10 +838,10 @@ func TestServiceResource_clusterIP(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) - require.Equal(8080, actual[0].Service.Port) + require.Equal(80, actual[0].Service.Port) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) - require.Equal(8080, actual[1].Service.Port) + require.Equal(80, actual[1].Service.Port) require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } @@ -894,20 +891,12 @@ func TestServiceResource_clusterIPMultiEndpoint(t *testing.T) { Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "1.2.3.4"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, }, apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "2.3.4.5"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, }, }, }) @@ -923,10 +912,10 @@ func TestServiceResource_clusterIPMultiEndpoint(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) - require.Equal(8080, actual[0].Service.Port) + require.Equal(80, actual[0].Service.Port) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) - require.Equal(8080, actual[1].Service.Port) + require.Equal(80, actual[1].Service.Port) require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } @@ -977,20 +966,12 @@ func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "1.2.3.4"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, }, apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "2.3.4.5"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, }, }, }) @@ -1006,10 +987,10 @@ func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) - require.Equal(2000, actual[0].Service.Port) + require.Equal(8500, actual[0].Service.Port) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) - require.Equal(2000, actual[1].Service.Port) + require.Equal(8500, actual[1].Service.Port) require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } @@ -1059,18 +1040,12 @@ func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "1.2.3.4"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Port: 8080}, - }, }, apiv1.EndpointSubset{ Addresses: []apiv1.EndpointAddress{ apiv1.EndpointAddress{IP: "2.3.4.5"}, }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Port: 8080}, - }, }, }, }) From 65d717fe5afa84d11b95ff24e55e448035b90bdf Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Fri, 7 Dec 2018 15:01:04 -0800 Subject: [PATCH 4/4] Streamline if statement based on PR feedback --- catalog/from-k8s/resource.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index d1fb488c91b6..508c7166bcee 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -162,10 +162,8 @@ func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool { } // Ignore ClusterIP services if ClusterIP sync is disabled - if svc.Spec.Type == apiv1.ServiceTypeClusterIP { - if !t.ClusterIPSync { - return false - } + if svc.Spec.Type == apiv1.ServiceTypeClusterIP && !t.ClusterIPSync { + return false } raw, ok := svc.Annotations[annotationServiceSync]