diff --git a/pkg/annotations/service.go b/pkg/annotations/service.go index 1e4a013671..1977207391 100644 --- a/pkg/annotations/service.go +++ b/pkg/annotations/service.go @@ -41,6 +41,16 @@ const ( // 3. Adding this annotation on ingress. NetworkEndpointGroupAlphaAnnotation = "alpha.cloud.google.com/load-balancer-neg" + // ExposeNEGAnnotationKey is the annotation key to specify standalone NEGs associated + // with the service. This should be a valid JSON string, as defined in + // ExposeNegAnnotation. + // example: {"80":{},"443":{}} + ExposeNEGAnnotationKey = "cloud.google.com/neg" + + // NEGStatusKey is the annotation key whose value is the status of the NEGs + // on the Service, and is applied by the NEG Controller. + NEGStatusKey = "cloud.google.com/neg-status" + // BackendConfigKey is a stringified JSON with two fields: // - "ports": a map of port names or port numbers to backendConfig names // - "default": denotes the default backendConfig name for all ports except @@ -58,6 +68,19 @@ const ( ProtocolHTTP2 AppProtocol = "HTTP2" ) +// ExposeNegAnnotation is the format of the annotation associated with the +// ExposeNEGAnnotationKey key, and maps ServicePort to attributes of the NEG that should be +// associated with the ServicePort. ServicePorts in this map will be NEG-enabled. +type ExposeNegAnnotation map[int32]NegAttributes + +// NegAttributes houses the attributes of the NEGs that are associated with the +// service. Future extensions to the Expose NEGs annotation should be added here. +type NegAttributes struct { + // Note - in the future, this will be used for custom naming of NEGs. + // Currently has no effect. + Name string `json:"name,omitempty"` +} + // AppProtocol describes the service protocol. type AppProtocol string @@ -73,7 +96,7 @@ func FromService(obj *v1.Service) *Service { // ApplicationProtocols returns a map of port (name or number) to the protocol // on the port. -func (svc Service) ApplicationProtocols() (map[string]AppProtocol, error) { +func (svc *Service) ApplicationProtocols() (map[string]AppProtocol, error) { val, ok := svc.v[ServiceApplicationProtocolKey] if !ok { return map[string]AppProtocol{}, nil @@ -98,8 +121,9 @@ func (svc Service) ApplicationProtocols() (map[string]AppProtocol, error) { return portToProtos, err } -// NEGEnabled is true if the service uses NEGs. -func (svc Service) NEGEnabled() bool { +// NEGEnabledForIngress returns true if the annotation is to be applied on +// Ingress-referenced ports +func (svc *Service) NEGEnabledForIngress() bool { v, ok := svc.v[NetworkEndpointGroupAlphaAnnotation] return ok && v == "true" } @@ -110,13 +134,49 @@ var ( ErrBackendConfigAnnotationMissing = errors.New("annotation is missing") ) +// NEGExposed is true if the service exposes NEGs +func (svc *Service) NEGExposed() bool { + if !flags.F.Features.NEGExposed { + return false + } + + v, ok := svc.v[ExposeNEGAnnotationKey] + return ok && len(v) > 0 +} + +var ( + ErrExposeNegAnnotationMissing = errors.New("No NEG ServicePorts specified") + ErrExposeNegAnnotationInvalid = errors.New("Expose NEG annotation is invalid") +) + +// ExposeNegAnnotation returns the value of the Expose NEG annotation key +func (svc *Service) ExposeNegAnnotation() (ExposeNegAnnotation, error) { + annotation, ok := svc.v[ExposeNEGAnnotationKey] + if !ok { + return nil, ErrExposeNegAnnotationMissing + } + + // TODO: add link to Expose NEG documentation when complete + var exposedNegPortMap ExposeNegAnnotation + if err := json.Unmarshal([]byte(annotation), &exposedNegPortMap); err != nil { + return nil, ErrExposeNegAnnotationInvalid + } + + return exposedNegPortMap, nil +} + +// NEGEnabled is true if the service uses NEGs. +func (svc *Service) NEGEnabled() bool { + return svc.NEGExposed() || svc.NEGEnabledForIngress() +} + type BackendConfigs struct { Default string `json:"default,omitempty"` Ports map[string]string `json:"ports,omitempty"` } // GetBackendConfigs returns BackendConfigs for the service. -func (svc Service) GetBackendConfigs() (*BackendConfigs, error) { +func (svc *Service) GetBackendConfigs() (*BackendConfigs, error) { val, ok := svc.v[BackendConfigKey] if !ok { return nil, ErrBackendConfigAnnotationMissing diff --git a/pkg/annotations/service_test.go b/pkg/annotations/service_test.go index e37ea7a830..22a6f60630 100644 --- a/pkg/annotations/service_test.go +++ b/pkg/annotations/service_test.go @@ -25,12 +25,77 @@ import ( "k8s.io/ingress-gce/pkg/flags" ) +func TestNEGService(t *testing.T) { + for _, tc := range []struct { + svc *v1.Service + neg bool + ingress bool + exposed bool + }{ + { + svc: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + NetworkEndpointGroupAlphaAnnotation: "true", + }, + }, + }, + neg: true, + ingress: true, + exposed: false, + }, + { + svc: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + ExposeNEGAnnotationKey: `"{"80":{}}"`, + }, + }, + }, + neg: true, + ingress: false, + exposed: true, + }, + { + svc: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + NetworkEndpointGroupAlphaAnnotation: "true", + ExposeNEGAnnotationKey: `"{"80":{}}"`, + }, + }, + }, + neg: true, + ingress: true, + exposed: true, + }, + { + svc: &v1.Service{}, + neg: false, + ingress: false, + exposed: false, + }, + } { + svc := FromService(tc.svc) + if neg := svc.NEGEnabled(); neg != tc.neg { + t.Errorf("for service %+v; svc.NEGEnabled() = %v; want %v", tc.svc, neg, tc.neg) + } + + if ing := svc.NEGEnabledForIngress(); ing != tc.ingress { + t.Errorf("for service %+v; svc.NEGEnabledForIngress() = %v; want %v", tc.svc, ing, tc.ingress) + } + + if exposed := svc.NEGExposed(); exposed != tc.exposed { + t.Errorf("for service %+v; svc.NEGExposed() = %v; want %v", tc.svc, exposed, tc.exposed) + } + } +} + func TestService(t *testing.T) { for _, tc := range []struct { svc *v1.Service appProtocolsErr bool appProtocols map[string]AppProtocol - neg bool http2 bool }{ { @@ -69,17 +134,6 @@ func TestService(t *testing.T) { appProtocols: map[string]AppProtocol{"443": "HTTP2"}, http2: true, }, - { - svc: &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - NetworkEndpointGroupAlphaAnnotation: "true", - }, - }, - }, - appProtocols: map[string]AppProtocol{}, - neg: true, - }, { svc: &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -113,9 +167,6 @@ func TestService(t *testing.T) { if err != nil || !reflect.DeepEqual(ap, tc.appProtocols) { t.Errorf("for service %+v; svc.ApplicationProtocols() = %v, %v; want %v, nil", tc.svc, ap, err, tc.appProtocols) } - if b := svc.NEGEnabled(); b != tc.neg { - t.Errorf("for service %+v; svc.NEGEnabled() = %v; want %v", tc.svc, b, tc.neg) - } } } @@ -210,3 +261,63 @@ func TestBackendConfigs(t *testing.T) { } } } + +func TestExposeNegAnnotation(t *testing.T) { + testcases := []struct { + desc string + annotation string + expected ExposeNegAnnotation + expectedErr error + }{ + { + desc: "no expose NEG annotation", + annotation: "", + expectedErr: ErrExposeNegAnnotationMissing, + }, + { + desc: "invalid expose NEG annotation", + annotation: "invalid", + expectedErr: ErrExposeNegAnnotationInvalid, + }, + { + desc: "NEG annotation references existing service ports", + expected: ExposeNegAnnotation{80: NegAttributes{}, 443: NegAttributes{}}, + annotation: `{"80":{},"443":{}}`, + }, + + { + desc: "NEGServicePort takes the union of known ports and ports referenced in the annotation", + annotation: `{"80":{}}`, + expected: ExposeNegAnnotation{80: NegAttributes{}}, + }, + } + + for _, tc := range testcases { + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + } + + t.Run(tc.desc, func(t *testing.T) { + if len(tc.annotation) > 0 { + service.Annotations[ExposeNEGAnnotationKey] = tc.annotation + } + + svc := FromService(service) + exposeNegStruct, err := svc.ExposeNegAnnotation() + + if tc.expectedErr == nil && err != nil { + t.Errorf("ExpectedNEGServicePorts to not return an error, got: %v", err) + } + + if !reflect.DeepEqual(exposeNegStruct, tc.expected) { + t.Errorf("Expected NEGServicePorts to equal: %v; got: %v", tc.expected, exposeNegStruct) + } + + if tc.expectedErr != nil && err != tc.expectedErr { + t.Errorf("Expected NEGServicePorts to return a %v error, got: %v", tc.expectedErr, err) + } + }) + } +} diff --git a/pkg/backends/backends.go b/pkg/backends/backends.go index 083db01856..be4c2e2956 100644 --- a/pkg/backends/backends.go +++ b/pkg/backends/backends.go @@ -530,7 +530,7 @@ func (b *Backends) Link(sp utils.ServicePort, zones []string) error { if !sp.NEGEnabled { return nil } - negName := b.namer.NEG(sp.ID.Service.Namespace, sp.ID.Service.Name, sp.SvcTargetPort) + negName := sp.BackendName(b.namer) var negs []*computealpha.NetworkEndpointGroup var err error for _, zone := range zones { diff --git a/pkg/backends/backends_test.go b/pkg/backends/backends_test.go index 536d8102c9..5120667fac 100644 --- a/pkg/backends/backends_test.go +++ b/pkg/backends/backends_test.go @@ -552,39 +552,39 @@ func TestBackendPoolSyncQuota(t *testing.T) { true, "New set of ports not including the same port", }, - // Need to fill the SvcTargetPort field on ServicePort to make sure + // Need to fill the TargetPort field on ServicePort to make sure // NEG Backend naming is unique { []utils.ServicePort{{NodePort: 8080}, {NodePort: 443}}, []utils.ServicePort{ - {NodePort: 8080, SvcTargetPort: "testport8080", NEGEnabled: true}, - {NodePort: 443, SvcTargetPort: "testport443", NEGEnabled: true}, + {Port: 8080, NodePort: 8080, NEGEnabled: true}, + {Port: 443, NodePort: 443, NEGEnabled: true}, }, true, "Same port converted to NEG, plus one new NEG port", }, { []utils.ServicePort{ - {NodePort: 80, SvcTargetPort: "testport80", NEGEnabled: true}, - {NodePort: 90, SvcTargetPort: "testport90"}, + {Port: 80, NodePort: 80, NEGEnabled: true}, + {Port: 90, NodePort: 90}, }, []utils.ServicePort{ - {NodePort: 80, SvcTargetPort: "testport80"}, - {NodePort: 90, SvcTargetPort: "testport90", NEGEnabled: true}, + {Port: 80}, + {Port: 90, NEGEnabled: true}, }, true, "Mixed NEG and non-NEG ports", }, { []utils.ServicePort{ - {NodePort: 100, SvcTargetPort: "testport100", NEGEnabled: true}, - {NodePort: 110, SvcTargetPort: "testport110", NEGEnabled: true}, - {NodePort: 120, SvcTargetPort: "testport120", NEGEnabled: true}, + {Port: 100, NodePort: 100, NEGEnabled: true}, + {Port: 110, NodePort: 110, NEGEnabled: true}, + {Port: 120, NodePort: 120, NEGEnabled: true}, }, []utils.ServicePort{ - {NodePort: 100, SvcTargetPort: "testport100"}, - {NodePort: 110, SvcTargetPort: "testport110"}, - {NodePort: 120, SvcTargetPort: "testport120"}, + {Port: 100, NodePort: 100}, + {Port: 110, NodePort: 110}, + {Port: 120, NodePort: 120}, }, true, "Same ports as NEG, then non-NEG", @@ -854,12 +854,12 @@ func TestLinkBackendServiceToNEG(t *testing.T) { Namespace: namespace, Name: name, }, - Port: intstr.FromInt(80), }, - NodePort: 30001, - Protocol: annotations.ProtocolHTTP, - SvcTargetPort: port, - NEGEnabled: true, + Port: 80, + NodePort: 30001, + Protocol: annotations.ProtocolHTTP, + TargetPort: port, + NEGEnabled: true, } if err := bp.Ensure([]utils.ServicePort{svcPort}, nil); err != nil { t.Fatalf("Failed to ensure backend service: %v", err) @@ -867,7 +867,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) { for _, zone := range zones { err := fakeNEG.CreateNetworkEndpointGroup(&computealpha.NetworkEndpointGroup{ - Name: defaultNamer.NEG(namespace, name, port), + Name: defaultNamer.NEG(namespace, name, svcPort.Port), }, zone) if err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/controller/translator/translator.go b/pkg/controller/translator/translator.go index f6ab2b068f..90380e51fb 100644 --- a/pkg/controller/translator/translator.go +++ b/pkg/controller/translator/translator.go @@ -87,10 +87,11 @@ func (t *Translator) getServicePort(id utils.ServicePortID) (*utils.ServicePort, return nil, errors.ErrBadSvcType{Service: id.Service, ServiceType: svc.Spec.Type} } svcPort = &utils.ServicePort{ - ID: id, - NodePort: int64(port.NodePort), - SvcTargetPort: port.TargetPort.String(), - NEGEnabled: t.ctx.NEGEnabled && negEnabled, + ID: id, + NodePort: int64(port.NodePort), + Port: int32(port.Port), + TargetPort: port.TargetPort.String(), + NEGEnabled: t.ctx.NEGEnabled && negEnabled, } appProtocols, err := annotations.FromService(svc).ApplicationProtocols() @@ -279,7 +280,7 @@ func (t *Translator) GatherEndpointPorts(svcPorts []utils.ServicePort) []string // For NEG backend, need to open firewall to all endpoint target ports // TODO(mixia): refactor firewall syncing into a separate go routine with different trigger. // With NEG, endpoint changes may cause firewall ports to be different if user specifies inconsistent backends. - endpointPorts := listEndpointTargetPorts(t.ctx.EndpointInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.SvcTargetPort) + endpointPorts := listEndpointTargetPorts(t.ctx.EndpointInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.TargetPort) for _, ep := range endpointPorts { portMap[int64(ep)] = true } diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go index b997575637..930354feb1 100644 --- a/pkg/controller/translator/translator_test.go +++ b/pkg/controller/translator/translator_test.go @@ -493,16 +493,16 @@ func TestGatherEndpointPorts(t *testing.T) { {NodePort: int64(30001)}, {NodePort: int64(30002)}, { - ID: utils.ServicePortID{Service: types.NamespacedName{Namespace: "ns", Name: ep1}}, - NodePort: int64(30003), - NEGEnabled: true, - SvcTargetPort: "80", + ID: utils.ServicePortID{Service: types.NamespacedName{Namespace: "ns", Name: ep1}}, + NodePort: int64(30003), + NEGEnabled: true, + TargetPort: "80", }, { - ID: utils.ServicePortID{Service: types.NamespacedName{Namespace: "ns", Name: ep2}}, - NodePort: int64(30004), - NEGEnabled: true, - SvcTargetPort: "named-port", + ID: utils.ServicePortID{Service: types.NamespacedName{Namespace: "ns", Name: ep2}}, + NodePort: int64(30004), + NEGEnabled: true, + TargetPort: "named-port", }, } diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 80723ceec0..28a8a42de8 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -124,11 +124,14 @@ type Features struct { Http2 bool // NEG enables using NetworkEndpointGroups instead of IGs as backends NEG bool + // NEGExposed enables using standalone (exposed) NEGs + NEGExposed bool } var DefaultFeatures = &Features{ - Http2: true, - NEG: true, + Http2: true, + NEG: true, + NEGExposed: true, } func EnabledFeatures() *Features { diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 10c317f6e3..1d1c9990b4 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -17,6 +17,7 @@ limitations under the License. package neg import ( + "encoding/json" "fmt" "reflect" "strconv" @@ -62,6 +63,8 @@ type Controller struct { // serviceQueue takes service key as work item. Service key with format "namespace/name". serviceQueue workqueue.RateLimitingInterface + zoneGetter zoneGetter + namer networkEndpointGroupNamer } // NewController returns a network endpoint group controller. @@ -99,6 +102,8 @@ func NewController( ingressLister: ctx.IngressInformer.GetIndexer(), serviceLister: ctx.ServiceInformer.GetIndexer(), serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + zoneGetter: zoneGetter, + namer: namer, } ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -210,13 +215,83 @@ func (c *Controller) processService(key string) error { if !enabled { c.manager.StopSyncer(namespace, name) - return nil + // delete the annotation + return c.syncNegAnnotation(namespace, name, service, make(PortNameMap)) } glog.V(2).Infof("Syncing service %q", key) - // Only service ports referenced by ingress are synced for NEG - ings := getIngressServicesFromStore(c.ingressLister, service) - return c.manager.EnsureSyncers(namespace, name, gatherSerivceTargetPortUsedByIngress(ings, service)) + // map of ServicePort (int) to TargetPort + svcPortMap := make(PortNameMap) + + if annotations.FromService(service).NEGEnabledForIngress() { + // Only service ports referenced by ingress are synced for NEG + ings := getIngressServicesFromStore(c.ingressLister, service) + svcPortMap = gatherPortMappingUsedByIngress(ings, service) + } + + if annotations.FromService(service).NEGExposed() { + knownPorts := make(PortNameMap) + for _, sp := range service.Spec.Ports { + knownPorts[sp.Port] = sp.TargetPort.String() + } + + annotation, err := annotations.FromService(service).ExposeNegAnnotation() + if err != nil { + return err + } + + negSvcPorts, err := NEGServicePorts(annotation, knownPorts) + if err != nil { + return err + } + + svcPortMap = svcPortMap.Union(negSvcPorts) + } + + err = c.syncNegAnnotation(namespace, name, service, svcPortMap) + if err != nil { + return err + } + return c.manager.EnsureSyncers(namespace, name, svcPortMap) +} + +func (c *Controller) syncNegAnnotation(namespace, name string, service *apiv1.Service, portMap PortNameMap) error { + zones, err := c.zoneGetter.ListZones() + if err != nil { + return err + } + + if len(portMap) == 0 { + if _, ok := service.Annotations[annotations.NEGStatusKey]; ok { + // TODO: use PATCH to remove annotation + delete(service.Annotations, annotations.NEGStatusKey) + glog.V(2).Infof("Removing expose NEG annotation from service: %s/%s", namespace, name) + return c.serviceLister.Update(service) + } + // service doesn't have the expose NEG annotation and doesn't need update + return nil + } + + portToNegs := make(PortNameMap) + for svcPort, _ := range portMap { + portToNegs[svcPort] = c.namer.NEG(namespace, name, svcPort) + } + negSvcState := GetNegStatus(zones, portToNegs) + formattedAnnotation, err := json.Marshal(negSvcState) + if err != nil { + return err + } + + annotation := string(formattedAnnotation) + + existingAnnotation, ok := service.Annotations[annotations.NEGStatusKey] + if ok && existingAnnotation == annotation { + return nil + } + + service.Annotations[annotations.NEGStatusKey] = annotation + glog.V(2).Infof("Updating NEG visibility annotation %q on service %s/%s.", annotation, namespace, name) + return c.serviceLister.Update(service) } func (c *Controller) handleErr(err error, key interface{}) { @@ -270,10 +345,11 @@ func (c *Controller) synced() bool { c.ingressSynced() } -// gatherSerivceTargetPortUsedByIngress returns all target ports of the service that are referenced by ingresses -func gatherSerivceTargetPortUsedByIngress(ings []extensions.Ingress, svc *apiv1.Service) sets.String { +// gatherPortMappingUsedByIngress returns a map containing port:targetport +// of all service ports of the service that are referenced by ingresses +func gatherPortMappingUsedByIngress(ings []extensions.Ingress, svc *apiv1.Service) PortNameMap { servicePorts := sets.NewString() - targetPorts := sets.NewString() + ingressSvcPorts := make(PortNameMap) for _, ing := range ings { if ing.Spec.Backend != nil && ing.Spec.Backend.ServiceName == svc.Name { servicePorts.Insert(ing.Spec.Backend.ServicePort.String()) @@ -308,11 +384,11 @@ func gatherSerivceTargetPortUsedByIngress(ings []extensions.Ingress, svc *apiv1. } } if found { - targetPorts.Insert(svcPort.TargetPort.String()) + ingressSvcPorts[svcPort.Port] = svcPort.TargetPort.String() } } - return targetPorts + return ingressSvcPorts } // gatherIngressServiceKeys returns all service key (formatted as namespace/name) referenced in the ingress diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 27150abea3..5d3fe990da 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -17,12 +17,16 @@ limitations under the License. package neg import ( + "encoding/json" + "fmt" + "strings" "testing" "time" apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/ingress-gce/pkg/annotations" @@ -47,9 +51,11 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { } func TestNewNonNEGService(t *testing.T) { + t.Parallel() + controller := newTestController(fake.NewSimpleClientset()) defer controller.stop() - controller.serviceLister.Add(newTestService(false)) + controller.serviceLister.Add(newTestService(false, []int32{})) controller.ingressLister.Add(newTestIngress()) err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) if err != nil { @@ -60,41 +66,124 @@ func TestNewNonNEGService(t *testing.T) { } func TestNewNEGService(t *testing.T) { - controller := newTestController(fake.NewSimpleClientset()) - defer controller.stop() - controller.serviceLister.Add(newTestService(true)) - controller.ingressLister.Add(newTestIngress()) - err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) - if err != nil { - t.Fatalf("Failed to process service: %v", err) + t.Parallel() + + testCases := []struct { + svcPorts []int32 + ingress bool + desc string + }{ + { + []int32{80, 443, 8081, 8080}, + true, + "With ingress, 3 ports same as in ingress, 1 new port", + }, + { + []int32{80, 443, 8081, 8080, 1234, 5678}, + true, + "With ingress, 3 ports same as ingress and 3 new ports", + }, + { + []int32{80, 1234, 5678}, + true, + "With ingress, 1 port same as ingress and 2 new ports", + }, + { + []int32{}, + false, + "With ingress, no additional ports", + }, + { + []int32{80, 443, 8081, 8080}, + false, + "No ingress, 4 ports", + }, + { + []int32{80}, + false, + "No ingress, 1 port", + }, + { + []int32{}, + false, + "No ingress, no ports", + }, } - validateSyncers(t, controller, 3, false) + testIngressPorts := sets.NewString([]string{"80", "443", "8081"}...) + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + controller := newTestController(fake.NewSimpleClientset()) + defer controller.stop() + svcKey := serviceKeyFunc(testServiceNamespace, testServiceName) + controller.serviceLister.Add(newTestService(tc.ingress, tc.svcPorts)) + + if tc.ingress { + controller.ingressLister.Add(newTestIngress()) + } + + err := controller.processService(svcKey) + if err != nil { + t.Fatalf("Failed to process service: %v", err) + } + + expectedSyncers := len(tc.svcPorts) + if tc.ingress { + svcPorts := sets.NewString() + for _, port := range tc.svcPorts { + svcPorts.Insert(fmt.Sprintf("%v", port)) + } + expectedSyncers = len(svcPorts.Union(testIngressPorts)) + } + validateSyncers(t, controller, expectedSyncers, false) + svc, exists, _ := controller.serviceLister.GetByKey(svcKey) + if !exists || err != nil { + t.Fatalf("Service was not created successfully, err: %v", err) + } + validateServiceStateAnnotation(t, svc.(*apiv1.Service), tc.svcPorts) + }) + } } -func TestEnableNEGService(t *testing.T) { +func TestEnableNEGServiceWithIngress(t *testing.T) { + t.Parallel() + controller := newTestController(fake.NewSimpleClientset()) defer controller.stop() - controller.serviceLister.Add(newTestService(false)) + controller.serviceLister.Add(newTestService(false, []int32{})) controller.ingressLister.Add(newTestIngress()) - err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) + svcKey := serviceKeyFunc(testServiceNamespace, testServiceName) + err := controller.processService(svcKey) if err != nil { t.Fatalf("Failed to process service: %v", err) } validateSyncers(t, controller, 0, true) + svc, exists, _ := controller.serviceLister.GetByKey(svcKey) + if !exists || err != nil { + t.Fatalf("Service was not created successfully, err: %v", err) + } - controller.serviceLister.Update(newTestService(true)) - err = controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) + controller.serviceLister.Update(newTestService(true, []int32{})) + err = controller.processService(svcKey) if err != nil { t.Fatalf("Failed to process service: %v", err) } validateSyncers(t, controller, 3, false) + svc, exists, _ = controller.serviceLister.GetByKey(svcKey) + svcPorts := []int32{80, 8081, 443} + if !exists || err != nil { + t.Fatalf("Service was not created successfully, err: %v", err) + } + validateServiceStateAnnotation(t, svc.(*apiv1.Service), svcPorts) } -func TestDisableNEGService(t *testing.T) { +func TestDisableNEGServiceWithIngress(t *testing.T) { + t.Parallel() + controller := newTestController(fake.NewSimpleClientset()) defer controller.stop() - controller.serviceLister.Add(newTestService(true)) + controller.serviceLister.Add(newTestService(true, []int32{})) controller.ingressLister.Add(newTestIngress()) err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) if err != nil { @@ -102,7 +191,7 @@ func TestDisableNEGService(t *testing.T) { } validateSyncers(t, controller, 3, false) - controller.serviceLister.Update(newTestService(false)) + controller.serviceLister.Update(newTestService(false, []int32{})) err = controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) if err != nil { t.Fatalf("Failed to process service: %v", err) @@ -110,12 +199,14 @@ func TestDisableNEGService(t *testing.T) { validateSyncers(t, controller, 3, true) } -func TestGatherServiceTargetPortUsedByIngress(t *testing.T) { +func TestGatherPortMappingUsedByIngress(t *testing.T) { + t.Parallel() + testCases := []struct { ings []extensions.Ingress - expect []string + expect []int32 + desc string }{ - // no match { []extensions.Ingress{{ ObjectMeta: metav1.ObjectMeta{ @@ -129,9 +220,9 @@ func TestGatherServiceTargetPortUsedByIngress(t *testing.T) { }, }, }}, - []string{}, + []int32{}, + "no match", }, - // ingress spec point to non-existed service port { []extensions.Ingress{{ ObjectMeta: metav1.ObjectMeta{ @@ -145,9 +236,9 @@ func TestGatherServiceTargetPortUsedByIngress(t *testing.T) { }, }, }}, - []string{}, + []int32{}, + "ingress spec point to non-existed service port", }, - // ingress point to multiple services { []extensions.Ingress{{ ObjectMeta: metav1.ObjectMeta{ @@ -181,35 +272,102 @@ func TestGatherServiceTargetPortUsedByIngress(t *testing.T) { }, }, }}, - []string{"8080"}, + []int32{80}, + "ingress point to multiple services", }, - // two ingresses with multiple different references to service { []extensions.Ingress{*newTestIngress(), *newTestIngress()}, - []string{"8080", "8081", testNamedPort}, + []int32{80, 443, 8081}, + "two ingresses with multiple different references to service", }, - // one ingress with multiple different references to service { []extensions.Ingress{*newTestIngress()}, - []string{"8080", "8081", testNamedPort}, + []int32{80, 443, 8081}, + "one ingress with multiple different references to service", }, } for _, tc := range testCases { - ports := gatherSerivceTargetPortUsedByIngress(tc.ings, newTestService(true)) - if len(ports) != len(tc.expect) { - t.Errorf("Expect %v ports, but got %v.", len(tc.expect), len(ports)) + portMap := gatherPortMappingUsedByIngress(tc.ings, newTestService(true, []int32{})) + if len(portMap) != len(tc.expect) { + t.Errorf("Expect %v ports, but got %v.", len(tc.expect), len(portMap)) } for _, exp := range tc.expect { - if !ports.Has(exp) { - t.Errorf("Expect ports to include %q.", exp) + if _, ok := portMap[exp]; !ok { + t.Errorf("Expect ports to include %v.", exp) } } } } +func TestSyncNegAnnotation(t *testing.T) { + t.Parallel() + // TODO: test that c.serviceLister.Update is called whenever the annotation + // is changed. When there is no change, Update should not be called. + controller := newTestController(fake.NewSimpleClientset()) + defer controller.stop() + controller.serviceLister.Add(newTestService(false, []int32{})) + svcKey := serviceKeyFunc(testServiceNamespace, testServiceName) + + testCases := []struct { + desc string + previousPortMap PortNameMap + portMap PortNameMap + }{ + { + desc: "apply new annotation with no previous annotation", + portMap: PortNameMap{80: "named_port", 443: "other_port"}, + }, + { + desc: "same annotation applied twice", + previousPortMap: PortNameMap{80: "named_port", 4040: "other_port"}, + portMap: PortNameMap{80: "named_port", 4040: "other_port"}, + }, + { + desc: "apply new annotation and override previous annotation", + previousPortMap: PortNameMap{80: "named_port", 4040: "other_port"}, + portMap: PortNameMap{3000: "6000", 4000: "8000"}, + }, + { + desc: "remove previous annotation", + previousPortMap: PortNameMap{80: "named_port", 4040: "other_port"}, + }, + { + desc: "remove annotation with no previous annotation", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + svc, exists, err := controller.serviceLister.GetByKey(svcKey) + if !exists || err != nil { + t.Fatalf("Service was not retrieved successfully, err: %v", err) + } + + controller.syncNegAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.previousPortMap) + svc, _, _ = controller.serviceLister.GetByKey(svcKey) + + var oldSvcPorts []int32 + for port, _ := range tc.previousPortMap { + oldSvcPorts = append(oldSvcPorts, port) + } + validateServiceStateAnnotation(t, svc.(*apiv1.Service), oldSvcPorts) + + controller.syncNegAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.portMap) + svc, _, _ = controller.serviceLister.GetByKey(svcKey) + + var svcPorts []int32 + for port, _ := range tc.portMap { + svcPorts = append(svcPorts, port) + } + validateServiceStateAnnotation(t, svc.(*apiv1.Service), svcPorts) + }) + } +} + func validateSyncers(t *testing.T, controller *Controller, num int, stopped bool) { + t.Helper() if len(controller.manager.(*syncerManager).syncerMap) != num { t.Errorf("got %v syncer, want %v.", len(controller.manager.(*syncerManager).syncerMap), num) } @@ -220,6 +378,48 @@ func validateSyncers(t *testing.T, controller *Controller, num int, stopped bool } } +func validateServiceStateAnnotation(t *testing.T, svc *apiv1.Service, svcPorts []int32) { + t.Helper() + if len(svcPorts) == 0 { + v, ok := svc.Annotations[annotations.NEGStatusKey] + if ok { + t.Fatalf("Expected no NEG service state annotation when there are no servicePorts, got: %v", v) + } + return + } + + v, ok := svc.Annotations[annotations.NEGStatusKey] + if !ok { + t.Fatalf("Failed to apply the NEG service state annotation, got %+v", svc.Annotations) + } + + for _, port := range svcPorts { + if !strings.Contains(v, fmt.Sprintf("%v", port)) { + t.Fatalf("Expected NEG service state annotation to contain port %v, got %v", port, v) + } + } + + zoneGetter := NewFakeZoneGetter() + zones, _ := zoneGetter.ListZones() + for _, zone := range zones { + if !strings.Contains(v, zone) { + t.Fatalf("Expected NEG service state annotation to contain zone %v, got %v", zone, v) + } + } +} + +func generateNegAnnotation(svcPorts []int32) string { + if len(svcPorts) == 0 { + return "" + } + annotation := make(map[int32]annotations.NegAttributes) + for _, port := range svcPorts { + annotation[port] = annotations.NegAttributes{} + } + formattedAnnotation, _ := json.Marshal(annotation) + return string(formattedAnnotation) +} + func newTestIngress() *extensions.Ingress { return &extensions.Ingress{ ObjectMeta: metav1.ObjectMeta{ @@ -266,11 +466,43 @@ func newTestIngress() *extensions.Ingress { } } -func newTestService(negEnabled bool) *apiv1.Service { +func newTestService(negIngress bool, negSvcPorts []int32) *apiv1.Service { svcAnnotations := map[string]string{} - if negEnabled { - svcAnnotations[annotations.NetworkEndpointGroupAlphaAnnotation] = "true" + svcAnnotations[annotations.NetworkEndpointGroupAlphaAnnotation] = fmt.Sprint(negIngress) + if len(negSvcPorts) > 0 { + svcAnnotations[annotations.ExposeNEGAnnotationKey] = generateNegAnnotation(negSvcPorts) + } + + ports := []apiv1.ServicePort{ + { + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + { + Port: 443, + TargetPort: intstr.FromString(testNamedPort), + }, + { + Name: testNamedPort, + Port: 8081, + TargetPort: intstr.FromInt(8081), + }, + { + Port: 8888, + TargetPort: intstr.FromInt(8888), + }, } + + for _, port := range negSvcPorts { + ports = append( + ports, + apiv1.ServicePort{ + Port: port, + TargetPort: intstr.FromString(fmt.Sprintf("%v", port)), + }, + ) + } + return &apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: testServiceName, @@ -278,25 +510,7 @@ func newTestService(negEnabled bool) *apiv1.Service { Annotations: svcAnnotations, }, Spec: apiv1.ServiceSpec{ - Ports: []apiv1.ServicePort{ - { - Port: 80, - TargetPort: intstr.FromInt(8080), - }, - { - Port: 443, - TargetPort: intstr.FromString(testNamedPort), - }, - { - Name: testNamedPort, - Port: 8081, - TargetPort: intstr.FromInt(8081), - }, - { - Port: 8888, - TargetPort: intstr.FromInt(8888), - }, - }, + Ports: ports, }, } } diff --git a/pkg/neg/interfaces.go b/pkg/neg/interfaces.go index 6e43d6729d..d30dc3bbf9 100644 --- a/pkg/neg/interfaces.go +++ b/pkg/neg/interfaces.go @@ -18,7 +18,6 @@ package neg import ( computealpha "google.golang.org/api/compute/v0.alpha" - "k8s.io/apimachinery/pkg/util/sets" ) // networkEndpointGroupCloud is an interface for managing gce network endpoint group. @@ -37,7 +36,7 @@ type networkEndpointGroupCloud interface { // networkEndpointGroupNamer is an interface for generating network endpoint group name. type networkEndpointGroupNamer interface { - NEG(namespace, name, port string) string + NEG(namespace, name string, port int32) string IsNEG(name string) bool } @@ -64,7 +63,8 @@ type negSyncer interface { // negSyncerManager is an interface for controllers to manage syncer type negSyncerManager interface { // EnsureSyncer ensures corresponding syncers are started and stops any unnecessary syncer - EnsureSyncers(namespace, name string, targetPorts sets.String) error + // portMap is a map of ServicePort Port to TargetPort + EnsureSyncers(namespace, name string, portMap PortNameMap) error // StopSyncer stops all syncers related to the service. This call is asynchronous. It will not wait for all syncers to stop. StopSyncer(namespace, name string) // Sync signals all syncers related to the service to sync. This call is asynchronous. diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index d2d7f18e6a..b8a27619d3 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -45,8 +45,9 @@ type syncerManager struct { // TODO: lock per service instead of global lock mu sync.Mutex // svcPortMap is the canonical indicator for whether a service needs NEG. - // key consists of service namespace and name. Value is the list of target ports that requires NEG - svcPortMap map[serviceKey]sets.String + // key consists of service namespace and name. Value is a map of ServicePort + // Port:TargetPort, which represents ports that require NEG + svcPortMap map[serviceKey]PortNameMap // syncerMap stores the NEG syncer // key consists of service namespace, name and targetPort. Value is the corresponding syncer. syncerMap map[servicePort]negSyncer @@ -60,29 +61,29 @@ func newSyncerManager(namer networkEndpointGroupNamer, recorder record.EventReco zoneGetter: zoneGetter, serviceLister: serviceLister, endpointLister: endpointLister, - svcPortMap: make(map[serviceKey]sets.String), + svcPortMap: make(map[serviceKey]PortNameMap), syncerMap: make(map[servicePort]negSyncer), } } // EnsureSyncer starts and stops syncers based on the input service ports. -func (manager *syncerManager) EnsureSyncers(namespace, name string, targetPorts sets.String) error { +func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts PortNameMap) error { manager.mu.Lock() defer manager.mu.Unlock() key := getServiceKey(namespace, name) currentPorts, ok := manager.svcPortMap[key] if !ok { - currentPorts = sets.NewString() + currentPorts = make(PortNameMap) } - removes := currentPorts.Difference(targetPorts).List() - adds := targetPorts.Difference(currentPorts).List() - manager.svcPortMap[key] = targetPorts + removes := currentPorts.Difference(newPorts) + adds := newPorts.Difference(currentPorts) + + manager.svcPortMap[key] = newPorts glog.V(3).Infof("EnsureSyncer %v/%v: removing %v ports, adding %v ports", namespace, name, removes, adds) - // Stop syncer for removed ports - for _, port := range removes { - syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, port)] + for svcPort, targetPort := range removes { + syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, targetPort)] if ok { syncer.Stop() } @@ -90,23 +91,24 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, targetPorts errList := []error{} // Ensure a syncer is running for each port that is being added. - for _, port := range adds { - syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, port)] + for svcPort, targetPort := range adds { + syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, targetPort)] if !ok { syncer = newSyncer( servicePort{ namespace: namespace, name: name, - targetPort: port, + port: svcPort, + targetPort: targetPort, }, - manager.namer.NEG(namespace, name, port), + manager.namer.NEG(namespace, name, svcPort), manager.recorder, manager.cloud, manager.zoneGetter, manager.serviceLister, manager.endpointLister, ) - manager.syncerMap[getSyncerKey(namespace, name, port)] = syncer + manager.syncerMap[getSyncerKey(namespace, name, svcPort, targetPort)] = syncer } if syncer.IsStopped() { @@ -115,6 +117,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, targetPorts } } } + return utilerrors.NewAggregate(errList) } @@ -124,8 +127,8 @@ func (manager *syncerManager) StopSyncer(namespace, name string) { defer manager.mu.Unlock() key := getServiceKey(namespace, name) if ports, ok := manager.svcPortMap[key]; ok { - for _, port := range ports.List() { - if syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, port)]; ok { + for svcPort, targetPort := range ports { + if syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, targetPort)]; ok { syncer.Stop() } } @@ -140,8 +143,8 @@ func (manager *syncerManager) Sync(namespace, name string) { defer manager.mu.Unlock() key := getServiceKey(namespace, name) if portList, ok := manager.svcPortMap[key]; ok { - for _, port := range portList.List() { - if syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, port)]; ok { + for svcPort, targetPort := range portList { + if syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, targetPort)]; ok { if !syncer.IsStopped() { syncer.Sync() } @@ -215,8 +218,8 @@ func (manager *syncerManager) garbageCollectNEG() error { manager.mu.Lock() defer manager.mu.Unlock() for key, ports := range manager.svcPortMap { - for _, port := range ports.List() { - name := manager.namer.NEG(key.namespace, key.name, port) + for sp, _ := range ports { + name := manager.namer.NEG(key.namespace, key.name, sp) negNames.Delete(name) } } @@ -248,11 +251,12 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string } // getSyncerKey encodes a service namespace, name and targetPort into a string key -func getSyncerKey(namespace, name, port string) servicePort { +func getSyncerKey(namespace, name string, port int32, targetPort string) servicePort { return servicePort{ namespace: namespace, name: name, - targetPort: port, + port: port, + targetPort: targetPort, } } diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index c6bd50cf02..982acde65e 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -22,7 +22,6 @@ import ( compute "google.golang.org/api/compute/v0.alpha" apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -54,48 +53,48 @@ func TestEnsureAndStopSyncer(t *testing.T) { testCases := []struct { namespace string name string - ports sets.String + ports PortNameMap stop bool expect []servicePort // keys of running syncers }{ { "ns1", "n1", - sets.NewString("80", "443"), + PortNameMap{1000: "80", 2000: "443"}, false, []servicePort{ - getSyncerKey("ns1", "n1", "80"), - getSyncerKey("ns1", "n1", "443"), + getSyncerKey("ns1", "n1", 1000, "80"), + getSyncerKey("ns1", "n1", 2000, "443"), }, }, { "ns1", "n1", - sets.NewString("80", "namedport"), + PortNameMap{3000: "80", 4000: "namedport"}, false, []servicePort{ - getSyncerKey("ns1", "n1", "80"), - getSyncerKey("ns1", "n1", "namedport"), + getSyncerKey("ns1", "n1", 3000, "80"), + getSyncerKey("ns1", "n1", 4000, "namedport"), }, }, { "ns2", "n1", - sets.NewString("80"), + PortNameMap{3000: "80"}, false, []servicePort{ - getSyncerKey("ns1", "n1", "80"), - getSyncerKey("ns1", "n1", "namedport"), - getSyncerKey("ns2", "n1", "80"), + getSyncerKey("ns1", "n1", 3000, "80"), + getSyncerKey("ns1", "n1", 4000, "namedport"), + getSyncerKey("ns2", "n1", 3000, "80"), }, }, { "ns1", "n1", - sets.NewString(), + PortNameMap{}, true, []servicePort{ - getSyncerKey("ns2", "n1", "80"), + getSyncerKey("ns2", "n1", 3000, "80"), }, }, } @@ -113,11 +112,11 @@ func TestEnsureAndStopSyncer(t *testing.T) { for _, key := range tc.expect { syncer, ok := manager.syncerMap[key] if !ok { - t.Errorf("Expect syncer key %q to be present.", key) + t.Errorf("Expect syncer key %+v to be present.", key) continue } if syncer.IsStopped() || syncer.IsShuttingDown() { - t.Errorf("Expect syncer %q to be running.", key) + t.Errorf("Expect syncer %+v to be running.", key) } } for key, syncer := range manager.syncerMap { @@ -132,7 +131,7 @@ func TestEnsureAndStopSyncer(t *testing.T) { continue } if !syncer.IsStopped() { - t.Errorf("Expect syncer %q to be stopped.", key) + t.Errorf("Expect syncer %+v to be stopped.", key) } } } @@ -144,13 +143,17 @@ func TestEnsureAndStopSyncer(t *testing.T) { func TestGarbageCollectionSyncer(t *testing.T) { manager := NewTestSyncerManager(fake.NewSimpleClientset()) - if err := manager.EnsureSyncers("ns1", "n1", sets.NewString("80", "namedport")); err != nil { + portMap := make(PortNameMap) + portMap[3000] = "80" + portMap[4000] = "namedport" + + if err := manager.EnsureSyncers("ns1", "n1", portMap); err != nil { t.Fatalf("Failed to ensure syncer: %v", err) } manager.StopSyncer("ns1", "n1") - syncer1 := manager.syncerMap[getSyncerKey("ns1", "n1", "80")] - syncer2 := manager.syncerMap[getSyncerKey("ns1", "n1", "namedport")] + syncer1 := manager.syncerMap[getSyncerKey("ns1", "n1", 3000, "80")] + syncer2 := manager.syncerMap[getSyncerKey("ns1", "n1", 4000, "namedport")] if err := wait.PollImmediate(time.Second, 30*time.Second, func() (bool, error) { return !syncer1.IsShuttingDown() && syncer1.IsStopped() && !syncer2.IsShuttingDown() && syncer2.IsStopped(), nil @@ -173,11 +176,13 @@ func TestGarbageCollectionNEG(t *testing.T) { t.Fatalf("Failed to create endpoint: %v", err) } manager := NewTestSyncerManager(kubeClient) - if err := manager.EnsureSyncers(testServiceNamespace, testServiceName, sets.NewString("80")); err != nil { + ports := make(PortNameMap) + ports[80] = "namedport" + if err := manager.EnsureSyncers(testServiceNamespace, testServiceName, ports); err != nil { t.Fatalf("Failed to ensure syncer: %v", err) } - negName := manager.namer.NEG("test", "test", "80") + negName := manager.namer.NEG("test", "test", 80) manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ Name: negName, }, TestZone1) diff --git a/pkg/neg/syncer.go b/pkg/neg/syncer.go index f3e1385ed5..0e1846f300 100644 --- a/pkg/neg/syncer.go +++ b/pkg/neg/syncer.go @@ -45,9 +45,9 @@ const ( // servicePort includes information to uniquely identify a NEG type servicePort struct { - namespace string - name string - // Serivice target port + namespace string + name string + port int32 targetPort string } @@ -101,13 +101,13 @@ func (s *syncer) init() { // Start starts the syncer go routine if it has not been started. func (s *syncer) Start() error { if !s.IsStopped() { - return fmt.Errorf("NEG syncer for %s/%s-%s is already running.", s.namespace, s.name, s.targetPort) + return fmt.Errorf("NEG syncer for %s/%s-%v/%s is already running.", s.namespace, s.name, s.port, s.targetPort) } if s.IsShuttingDown() { - return fmt.Errorf("NEG syncer for %s/%s-%s is shutting down. ", s.namespace, s.name, s.targetPort) + return fmt.Errorf("NEG syncer for %s/%s-%v/%s is shutting down. ", s.namespace, s.name, s.port, s.targetPort) } - glog.V(2).Infof("Starting NEG syncer for service port %s/%s-%s", s.namespace, s.name, s.targetPort) + glog.V(2).Infof("Starting NEG syncer for service port %s/%s-%v/%s", s.namespace, s.name, s.port, s.targetPort) s.init() go func() { for { @@ -136,7 +136,7 @@ func (s *syncer) Start() error { s.stateLock.Lock() s.shuttingDown = false s.stateLock.Unlock() - glog.V(2).Infof("Stopping NEG syncer for %s/%s-%s", s.namespace, s.name, s.targetPort) + glog.V(2).Infof("Stopping NEG syncer for %s/%s-%v-%s", s.namespace, s.name, s.port, s.targetPort) return } case <-retryCh: @@ -152,7 +152,7 @@ func (s *syncer) Stop() { s.stateLock.Lock() defer s.stateLock.Unlock() if !s.stopped { - glog.V(2).Infof("Stopping NEG syncer for service port %s/%s-%s", s.namespace, s.name, s.targetPort) + glog.V(2).Infof("Stopping NEG syncer for service port %s/%s-%v/%s", s.namespace, s.name, s.port, s.targetPort) s.stopped = true s.shuttingDown = true close(s.syncCh) diff --git a/pkg/neg/syncer_test.go b/pkg/neg/syncer_test.go index 7146e5556d..c82a297e42 100644 --- a/pkg/neg/syncer_test.go +++ b/pkg/neg/syncer_test.go @@ -29,6 +29,7 @@ func NewTestSyncer() *syncer { svcPort := servicePort{ namespace: testServiceNamespace, name: testServiceName, + port: 80, targetPort: "80", } diff --git a/pkg/neg/utils.go b/pkg/neg/utils.go new file mode 100644 index 0000000000..8249c3add7 --- /dev/null +++ b/pkg/neg/utils.go @@ -0,0 +1,92 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package neg + +import ( + "fmt" + + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/ingress-gce/pkg/annotations" +) + +// PortNameMap is a map of ServicePort:TargetPort. +type PortNameMap map[int32]string + +// Union returns the union of the Port:TargetPort mappings +func (p1 PortNameMap) Union(p2 PortNameMap) PortNameMap { + result := make(PortNameMap) + for svcPort, targetPort := range p1 { + result[svcPort] = targetPort + } + + for svcPort, targetPort := range p2 { + result[svcPort] = targetPort + } + + return result +} + +// Difference returns the set of Port:TargetPorts in p2 that aren't present in p1 +func (p1 PortNameMap) Difference(p2 PortNameMap) PortNameMap { + result := make(PortNameMap) + for svcPort, targetPort := range p1 { + if _, ok := p2[svcPort]; !ok { + result[svcPort] = targetPort + } + } + + return result +} + +// NEGServicePorts returns the parsed ServicePorts from the annotation. +// knownPorts represents the known Port:TargetPort attributes of servicePorts +// that already exist on the service. This function returns an error if +// any of the parsed ServicePorts from the annotation is not in knownPorts. +func NEGServicePorts(exposedNegPortMap annotations.ExposeNegAnnotation, knownPorts PortNameMap) (PortNameMap, error) { + portSet := make(PortNameMap) + var errList []error + for port, _ := range exposedNegPortMap { + // TODO: also validate ServicePorts in the exposed NEG annotation via webhook + if _, ok := knownPorts[port]; !ok { + errList = append(errList, fmt.Errorf("ServicePort %v doesn't exist on Service", port)) + } + portSet[port] = knownPorts[port] + } + + return portSet, utilerrors.NewAggregate(errList) +} + +// NegStatus contains name and zone of the Network Endpoint Group +// resources associated with this service +type NegStatus struct { + // NetworkEndpointGroups returns the mapping between service port and NEG + // resource. key is service port, value is the name of the NEG resource. + NetworkEndpointGroups PortNameMap `json:"network_endpoint_groups,omitempty"` + Zones []string `json:"zones,omitempty"` +} + +// GetNegStatus generates a NegStatus denoting the current NEGs +// associated with the given ports. +// NetworkEndpointGroups is a mapping between ServicePort and NEG name +// Zones is a list of zones where the NEGs exist. +func GetNegStatus(zones []string, portToNegs PortNameMap) NegStatus { + res := NegStatus{} + res.NetworkEndpointGroups = make(PortNameMap) + res.Zones = zones + res.NetworkEndpointGroups = portToNegs + return res +} diff --git a/pkg/neg/utils_test.go b/pkg/neg/utils_test.go new file mode 100644 index 0000000000..a66727769f --- /dev/null +++ b/pkg/neg/utils_test.go @@ -0,0 +1,185 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package neg + +import ( + "fmt" + "reflect" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/ingress-gce/pkg/annotations" +) + +func TestPortNameMapUnion(t *testing.T) { + testcases := []struct { + desc string + p1 PortNameMap + p2 PortNameMap + expectedMap PortNameMap + }{ + { + "empty map union empty map", + PortNameMap{}, + PortNameMap{}, + PortNameMap{}, + }, + { + "empty map union a non-empty map is the non-empty map", + PortNameMap{}, + PortNameMap{80: "namedport", 443: "3000"}, + PortNameMap{80: "namedport", 443: "3000"}, + }, + { + "union of two non-empty maps", + PortNameMap{443: "3000", 5000: "6000"}, + PortNameMap{80: "namedport", 8080: "9000"}, + PortNameMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, + }, + } + + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + result := tc.p1.Union(tc.p2) + if !reflect.DeepEqual(result, tc.expectedMap) { + t.Errorf("Expected p1.Union(p2) to equal: %v; got: %v", tc.expectedMap, result) + } + }) + } +} + +func TestPortNameMapDifference(t *testing.T) { + testcases := []struct { + desc string + p1 PortNameMap + p2 PortNameMap + expectedMap PortNameMap + }{ + { + "empty map difference empty map", + PortNameMap{}, + PortNameMap{}, + PortNameMap{}, + }, + { + "empty map difference a non-empty map is empty map", + PortNameMap{}, + PortNameMap{80: "namedport", 443: "3000"}, + PortNameMap{}, + }, + { + "non-empty map difference a non-empty map is the non-empty map", + PortNameMap{80: "namedport", 443: "3000"}, + PortNameMap{}, + PortNameMap{80: "namedport", 443: "3000"}, + }, + { + "difference of two non-empty maps with the same elements", + PortNameMap{80: "namedport", 443: "3000"}, + PortNameMap{80: "namedport", 443: "3000"}, + PortNameMap{}, + }, + { + "difference of two non-empty maps with no elements in common returns p1", + PortNameMap{443: "3000", 5000: "6000"}, + PortNameMap{80: "namedport", 8080: "9000"}, + PortNameMap{443: "3000", 5000: "6000"}, + }, + { + "difference of two non-empty maps with elements in common", + PortNameMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, + PortNameMap{80: "namedport", 8080: "9000"}, + PortNameMap{443: "3000", 5000: "6000"}, + }, + } + + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + result := tc.p1.Difference(tc.p2) + if !reflect.DeepEqual(result, tc.expectedMap) { + t.Errorf("Expected p1.Difference(p2) to equal: %v; got: %v", tc.expectedMap, result) + } + }) + } +} + +func TestNEGServicePorts(t *testing.T) { + testcases := []struct { + desc string + annotation string + knownPortMap PortNameMap + expectedPortMap PortNameMap + expectedErr error + }{ + { + desc: "NEG annotation references port that Service does not have", + annotation: `{"3000":{}}`, + expectedErr: utilerrors.NewAggregate([]error{ + fmt.Errorf("ServicePort %v doesn't exist on Service", 3000), + }), + knownPortMap: PortNameMap{80: "some_port", 443: "another_port"}, + expectedPortMap: PortNameMap{3000: ""}, + }, + { + desc: "NEG annotation references existing service ports", + annotation: `{"80":{},"443":{}}`, + knownPortMap: PortNameMap{80: "namedport", 443: "3000"}, + expectedPortMap: PortNameMap{80: "namedport", 443: "3000"}, + }, + + { + desc: "NEGServicePort takes the union of known ports and ports referenced in the annotation", + annotation: `{"80":{}}`, + knownPortMap: PortNameMap{80: "8080", 3000: "3030", 4000: "4040"}, + expectedPortMap: PortNameMap{80: "8080"}, + }, + } + + for _, tc := range testcases { + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + } + + if len(tc.annotation) > 0 { + service.Annotations[annotations.ExposeNEGAnnotationKey] = tc.annotation + } + + svc := annotations.FromService(service) + exposeNegStruct, _ := svc.ExposeNegAnnotation() + + t.Run(tc.desc, func(t *testing.T) { + svcPorts, err := NEGServicePorts(exposeNegStruct, tc.knownPortMap) + if tc.expectedErr == nil && err != nil { + t.Errorf("ExpectedNEGServicePorts to not return an error, got: %v", err) + } + + if !reflect.DeepEqual(svcPorts, tc.expectedPortMap) { + t.Errorf("Expected NEGServicePorts to equal: %v; got: %v", tc.expectedPortMap, svcPorts) + } + + if tc.expectedErr != nil { + if !reflect.DeepEqual(err, tc.expectedErr) { + t.Errorf("Expected NEGServicePorts to return a %v error, got: %v", tc.expectedErr, err) + } + } + }) + } +} diff --git a/pkg/utils/namer.go b/pkg/utils/namer.go index e9f7379976..c7f479a2af 100644 --- a/pkg/utils/namer.go +++ b/pkg/utils/namer.go @@ -371,7 +371,7 @@ func (n *Namer) NamedPort(port int64) string { // NEG returns the gce neg name based on the service namespace, name // and target port. NEG naming convention: // -// {prefix}{version}-{clusterid}-{namespace}-{name}-{target port}-{hash} +// {prefix}{version}-{clusterid}-{namespace}-{name}-{service port}-{hash} // // Output name is at most 63 characters. NEG tries to keep as much // information as possible. @@ -379,12 +379,13 @@ func (n *Namer) NamedPort(port int64) string { // WARNING: Controllers depend on the naming pattern to get the list // of all NEGs associated with the current cluster. Any modifications // must be backward compatible. -func (n *Namer) NEG(namespace, name, port string) string { - truncFields := trimFieldsEvenly(maxNEGDescriptiveLabel, namespace, name, port) +func (n *Namer) NEG(namespace, name string, port int32) string { + portStr := fmt.Sprintf("%v", port) + truncFields := trimFieldsEvenly(maxNEGDescriptiveLabel, namespace, name, portStr) truncNamespace := truncFields[0] truncName := truncFields[1] truncPort := truncFields[2] - return fmt.Sprintf("%s-%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, truncPort, negSuffix(n.shortUID(), namespace, name, port)) + return fmt.Sprintf("%s-%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, truncPort, negSuffix(n.shortUID(), namespace, name, portStr)) } // IsNEG returns true if the name is a NEG owned by this cluster. diff --git a/pkg/utils/namer_test.go b/pkg/utils/namer_test.go index a421bd2687..9f8d26693d 100644 --- a/pkg/utils/namer_test.go +++ b/pkg/utils/namer_test.go @@ -390,28 +390,28 @@ func TestNamerNEG(t *testing.T) { desc string namespace string name string - port string + port int32 expect string }{ { "simple case", "namespace", "name", - "80", + 80, "k8s1-01234567-namespace-name-80-5104b449", }, { "63 characters", longstring[:10], longstring[:10], - longstring[:10], - "k8s1-01234567-0123456789-0123456789-0123456789-6d4e657b", + 1234567890, + "k8s1-01234567-0123456789-0123456789-1234567890-ed141b14", }, { "long namespace", longstring, "0", - "0", + 0, "k8s1-01234567-0123456789012345678901234567890123456-0--72142e04", }, @@ -419,15 +419,15 @@ func TestNamerNEG(t *testing.T) { "long name and namespace", longstring, longstring, - "0", + 0, "k8s1-01234567-0123456789012345678-0123456789012345678--9129e3d2", }, { - " long name, namespace and port", + "long name, namespace and port", longstring, longstring[:40], - longstring[:30], - "k8s1-01234567-0123456789012345-0123456789012-012345678-a7dff5e0", + 2147483647, + "k8s1-01234567-0123456789012345678-0123456789012345-214-ed1f2a2f", }, } @@ -444,8 +444,8 @@ func TestNamerNEG(t *testing.T) { // Different prefix. namer = NewNamerWithPrefix("mci", clusterId, "fw") - name := namer.NEG("ns", "svc", "port") - const want = "mci1-01234567-ns-svc-port-fe7dd054" + name := namer.NEG("ns", "svc", 80) + const want = "mci1-01234567-ns-svc-80-4890871b" if name != want { t.Errorf(`with prefix %q, namer.NEG("ns", "svc", 80) = %q, want %q`, "mci", name, want) } diff --git a/pkg/utils/serviceport.go b/pkg/utils/serviceport.go index eca6ab9e7c..ffcb1397c8 100644 --- a/pkg/utils/serviceport.go +++ b/pkg/utils/serviceport.go @@ -33,11 +33,14 @@ type ServicePortID struct { // ServicePort maintains configuration for a single backend. type ServicePort struct { + // Ingress backend-specified service name and port ID ServicePortID - NodePort int64 + NodePort int64 + // Numerical port of the Service, retrieved from the Service + Port int32 Protocol annotations.AppProtocol - SvcTargetPort string + TargetPort string NEGEnabled bool BackendConfig *backendconfigv1beta1.BackendConfig } @@ -63,7 +66,7 @@ func (sp ServicePort) BackendName(namer *Namer) string { return namer.IGBackend(sp.NodePort) } - return namer.NEG(sp.ID.Service.Namespace, sp.ID.Service.Name, sp.SvcTargetPort) + return namer.NEG(sp.ID.Service.Namespace, sp.ID.Service.Name, sp.Port) } // BackendToServicePortID creates a ServicePortID from a given IngressBackend and namespace.