From e01bd7b07ee43ff5e1d195b4a8c4313a6e6d2360 Mon Sep 17 00:00:00 2001 From: mmamczur Date: Fri, 2 Jun 2023 16:57:13 +0200 Subject: [PATCH] NetLB controller support for MultiNIC This commit adds support for multi networked NetLB services. These services will use NEG backends instead of instance groups. --- pkg/l4lb/l4netlbcontroller.go | 90 +++++++++---- pkg/l4lb/l4netlbcontroller_test.go | 199 +++++++++++++++++++++++++++-- pkg/loadbalancers/l4netlb.go | 5 +- pkg/loadbalancers/l4netlbipv6.go | 4 +- 4 files changed, 254 insertions(+), 44 deletions(-) diff --git a/pkg/l4lb/l4netlbcontroller.go b/pkg/l4lb/l4netlbcontroller.go index 148e380eac..bcb99a9e75 100644 --- a/pkg/l4lb/l4netlbcontroller.go +++ b/pkg/l4lb/l4netlbcontroller.go @@ -36,6 +36,7 @@ import ( "k8s.io/ingress-gce/pkg/instancegroups" "k8s.io/ingress-gce/pkg/l4lb/metrics" "k8s.io/ingress-gce/pkg/loadbalancers" + negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/network" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/common" @@ -49,11 +50,13 @@ const ( ) type L4NetLBController struct { - ctx *context.ControllerContext - svcQueue utils.TaskQueue - serviceLister cache.Indexer - nodeLister listers.NodeLister - stopCh chan struct{} + ctx *context.ControllerContext + svcQueue utils.TaskQueue + serviceLister cache.Indexer + nodeLister listers.NodeLister + networkLister cache.Indexer + gkeNetworkParamSetLister cache.Indexer + stopCh chan struct{} translator *translator.Translator namer namer.L4ResourcesNamer @@ -65,6 +68,7 @@ type L4NetLBController struct { backendPool *backends.Backends instancePool instancegroups.Manager igLinker *backends.RegionalInstanceGroupLinker + negLinker backends.Linker forwardingRules ForwardingRulesGetter enableDualStack bool } @@ -92,6 +96,13 @@ func NewL4NetLBController( forwardingRules: forwardingrules.New(ctx.Cloud, meta.VersionGA, meta.Regional), enableDualStack: ctx.EnableL4NetLBDualStack, } + if ctx.NetworkInformer != nil { + l4netLBc.networkLister = ctx.NetworkInformer.GetIndexer() + } + if ctx.GKENetworkParamsInformer != nil { + l4netLBc.gkeNetworkParamSetLister = ctx.GKENetworkParamsInformer.GetIndexer() + } + l4netLBc.negLinker = backends.NewNEGLinker(l4netLBc.backendPool, negtypes.NewAdapter(ctx.Cloud), ctx.Cloud, ctx.SvcNegInformer.GetIndexer()) l4netLBc.svcQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers("l4netLB", "services", ctx.NumL4NetLBWorkers, l4netLBc.sync) ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -427,27 +438,32 @@ func (lc *L4NetLBController) sync(key string) error { // syncInternal ensures load balancer resources for the given service, as needed. // Returns an error if processing the service update failed. func (lc *L4NetLBController) syncInternal(service *v1.Service) *loadbalancers.L4NetLBSyncResult { - l4NetLBParams := &loadbalancers.L4NetLBParams{ - Service: service, - Cloud: lc.ctx.Cloud, - Namer: lc.namer, - Recorder: lc.ctx.Recorder(service.Namespace), - DualStackEnabled: lc.enableDualStack, - NetworkInfo: *network.DefaultNetwork(lc.ctx.Cloud), - } - l4netlb := loadbalancers.NewL4NetLB(l4NetLBParams) // check again that rbs is enabled. if !lc.isRBSBasedService(service) { klog.Infof("Skipping syncInternal. Service %s does not have RBS enabled", service.Name) return nil } - + networkInfo, err := network.ServiceNetwork(service, lc.networkLister, lc.gkeNetworkParamSetLister, lc.ctx.Cloud, klog.TODO()) + if err != nil { + klog.Errorf("Failed to get network for service %s/%s, err: %v", service.Namespace, service.Name, err) + return &loadbalancers.L4NetLBSyncResult{Error: err} + } startTime := time.Now() klog.Infof("Syncing L4 NetLB RBS service %s/%s", service.Namespace, service.Name) defer func() { klog.Infof("Finished syncing L4 NetLB RBS service %s/%s, time taken: %v", service.Namespace, service.Name, time.Since(startTime)) }() + l4NetLBParams := &loadbalancers.L4NetLBParams{ + Service: service, + Cloud: lc.ctx.Cloud, + Namer: lc.namer, + Recorder: lc.ctx.Recorder(service.Namespace), + DualStackEnabled: lc.enableDualStack, + NetworkInfo: *networkInfo, + } + l4netlb := loadbalancers.NewL4NetLB(l4NetLBParams) + if err := common.EnsureServiceFinalizer(service, common.NetLBFinalizerV2, lc.ctx.KubeClient); err != nil { return &loadbalancers.L4NetLBSyncResult{Error: fmt.Errorf("Failed to attach L4 External LoadBalancer finalizer to service %s/%s, err %w", service.Namespace, service.Name, err)} } @@ -457,10 +473,12 @@ func (lc *L4NetLBController) syncInternal(service *v1.Service) *loadbalancers.L4 return &loadbalancers.L4NetLBSyncResult{Error: err} } - if err := lc.ensureInstanceGroups(service, nodeNames); err != nil { - lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncInstanceGroupsFailed", - "Error syncing instance group, err: %v", err) - return &loadbalancers.L4NetLBSyncResult{Error: err} + if networkInfo.IsDefault { + if err := lc.ensureInstanceGroups(service, nodeNames); err != nil { + lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncInstanceGroupsFailed", + "Error syncing instance group, err: %v", err) + return &loadbalancers.L4NetLBSyncResult{Error: err} + } } // Use the same function for both create and updates. If controller crashes and restarts, @@ -472,7 +490,7 @@ func (lc *L4NetLBController) syncInternal(service *v1.Service) *loadbalancers.L4 return syncResult } - if err = lc.ensureBackendLinking(service); err != nil { + if err = lc.ensureBackendLinking(service, networkInfo); err != nil { lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncExternalLoadBalancerFailed", "Error linking instance groups to backend service, err: %v", err) syncResult.Error = err @@ -519,12 +537,22 @@ func (lc *L4NetLBController) emitEnsuredDualStackEvent(service *v1.Service) { "Successfully ensured %v External LoadBalancer resources", strings.Join(ipFamilies, " ")) } -func (lc *L4NetLBController) ensureBackendLinking(service *v1.Service) error { +func (lc *L4NetLBController) ensureBackendLinking(service *v1.Service, networkInfo *network.NetworkInfo) error { start := time.Now() - klog.V(2).Infof("Linking backend service with instance groups for service %s/%s", service.Namespace, service.Name) - defer func() { - klog.V(2).Infof("Finished linking backend service with instance groups for service %s/%s, time taken: %v", service.Namespace, service.Name, time.Since(start)) - }() + // NEG backends should only be used for multinetwork services on the non default network. + linkWithNEGs := !networkInfo.IsDefault + + if linkWithNEGs { + klog.V(2).Infof("Linking backend service with NEGs for service %s/%s", service.Namespace, service.Name) + defer func() { + klog.V(2).Infof("Finished linking backend service with NEGs for service %s/%s, time taken: %v", service.Namespace, service.Name, time.Since(start)) + }() + } else { + klog.V(2).Infof("Linking backend service with instance groups for service %s/%s", service.Namespace, service.Name) + defer func() { + klog.V(2).Infof("Finished linking backend service with instance groups for service %s/%s, time taken: %v", service.Namespace, service.Name, time.Since(start)) + }() + } zones, err := lc.translator.ListZones(utils.CandidateNodesPredicate) if err != nil { @@ -538,8 +566,16 @@ func (lc *L4NetLBController) ensureBackendLinking(service *v1.Service) error { BackendNamer: lc.namer, L4RBSEnabled: true, } - - return lc.igLinker.Link(servicePort, lc.ctx.Cloud.ProjectID(), zones) + if !linkWithNEGs { + return lc.igLinker.Link(servicePort, lc.ctx.Cloud.ProjectID(), zones) + } else { + servicePort.VMIPNEGEnabled = true + var groupKeys []backends.GroupKey + for _, zone := range zones { + groupKeys = append(groupKeys, backends.GroupKey{Zone: zone}) + } + return lc.negLinker.Link(servicePort, groupKeys) + } } func (lc *L4NetLBController) ensureInstanceGroups(service *v1.Service, nodeNames []string) error { diff --git a/pkg/l4lb/l4netlbcontroller_test.go b/pkg/l4lb/l4netlbcontroller_test.go index 37227fb612..dfd2ca04d7 100644 --- a/pkg/l4lb/l4netlbcontroller_test.go +++ b/pkg/l4lb/l4netlbcontroller_test.go @@ -30,6 +30,9 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock" + "github.com/google/go-cmp/cmp" + computebeta "google.golang.org/api/compute/v0.beta" + compute "google.golang.org/api/compute/v1" ga "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" v1 "k8s.io/api/core/v1" @@ -38,13 +41,18 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" + networkv1 "k8s.io/cloud-provider-gcp/crd/apis/network/v1" + gkenetworkparamsetv1alpha1 "k8s.io/cloud-provider-gcp/crd/apis/network/v1alpha1" + netfake "k8s.io/cloud-provider-gcp/crd/client/network/clientset/versioned/fake" "k8s.io/cloud-provider-gcp/providers/gce" "k8s.io/cloud-provider/service/helpers" "k8s.io/ingress-gce/pkg/annotations" + "k8s.io/ingress-gce/pkg/backends" "k8s.io/ingress-gce/pkg/composite" ingctx "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/healthchecksl4" "k8s.io/ingress-gce/pkg/loadbalancers" + "k8s.io/ingress-gce/pkg/network" "k8s.io/ingress-gce/pkg/test" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/common" @@ -162,18 +170,9 @@ func createAndSyncLegacyNetLBSvc(t *testing.T) (svc *v1.Service, lc *L4NetLBCont } func checkBackendService(lc *L4NetLBController, svc *v1.Service) error { - backendServiceLink, bs, err := getBackend(lc, svc) + bs, err := checkBackendServiceCommon(lc, svc) if err != nil { - return fmt.Errorf("Failed to fetch backend service, err %v", err) - } - if bs.SelfLink != backendServiceLink { - return fmt.Errorf("Backend Service SelfLink mismatch: %s != %s", bs.SelfLink, backendServiceLink) - } - if bs.LoadBalancingScheme != string(cloud.SchemeExternal) { - return fmt.Errorf("Load Balancing Scheme mismatch: EXTERNAL != %s", bs.LoadBalancingScheme) - } - if len(bs.Backends) == 0 { - return fmt.Errorf("Error no backends in BackendService") + return err } igName := lc.namer.InstanceGroup() for _, b := range bs.Backends { @@ -191,6 +190,45 @@ func checkBackendService(lc *L4NetLBController, svc *v1.Service) error { return nil } +func checkBackendServiceWithNEG(lc *L4NetLBController, svc *v1.Service) error { + bs, err := checkBackendServiceCommon(lc, svc) + if err != nil { + return err + } + negName := lc.namer.L4Backend(svc.Namespace, svc.Name) + for _, b := range bs.Backends { + if !strings.Contains(b.Group, negName) { + return fmt.Errorf("Backend NEG Link mismatch: %s != %s", negName, b.Group) + } + } + neg, err := lc.ctx.Cloud.GetNetworkEndpointGroup(negName, testGCEZone) + if err != nil { + return fmt.Errorf("Error getting NEG, err %v", err) + } + if neg == nil { + return fmt.Errorf("NEG does not exist") + } + return nil +} + +// checkBackendServiceCommon verifies attributes common to InstanceGroup and NEG backed BackendServices. +func checkBackendServiceCommon(lc *L4NetLBController, svc *v1.Service) (*composite.BackendService, error) { + backendServiceLink, bs, err := getBackend(lc, svc) + if err != nil { + return nil, fmt.Errorf("Failed to fetch backend service, err %v", err) + } + if bs.SelfLink != backendServiceLink { + return nil, fmt.Errorf("Backend Service SelfLink mismatch: %s != %s", bs.SelfLink, backendServiceLink) + } + if bs.LoadBalancingScheme != string(cloud.SchemeExternal) { + return nil, fmt.Errorf("Load Balancing Scheme mismatch: EXTERNAL != %s", bs.LoadBalancingScheme) + } + if len(bs.Backends) == 0 { + return nil, fmt.Errorf("Error no backends in BackendService") + } + return bs, nil +} + func updateRegionBackendServiceWithLockHook(ctx context.Context, key *meta.Key, obj *ga.BackendService, m *cloud.MockRegionBackendServices) error { _, err := m.Get(ctx, key) if err != nil { @@ -224,6 +262,8 @@ func getFakeGCECloud(vals gce.TestClusterValues) *gce.Cloud { func buildContext(vals gce.TestClusterValues) *ingctx.ControllerContext { fakeGCE := getFakeGCECloud(vals) kubeClient := fake.NewSimpleClientset() + networkClient := netfake.NewSimpleClientset() + namer := namer.NewNamer(clusterUID, "") ctxConfig := ingctx.ControllerContextConfig{ @@ -232,7 +272,7 @@ func buildContext(vals gce.TestClusterValues) *ingctx.ControllerContext { NumL4NetLBWorkers: 5, MaxIGSize: 1000, } - return ingctx.NewControllerContext(nil, kubeClient, nil, nil, nil, nil, nil, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig) + return ingctx.NewControllerContext(nil, kubeClient, nil, nil, nil, nil, nil, nil, networkClient, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig) } func newL4NetLBServiceController() *L4NetLBController { @@ -438,6 +478,76 @@ func TestProcessServiceCreate(t *testing.T) { deleteNetLBService(lc, svc) } +func TestProcessMultinetServiceCreate(t *testing.T) { + lc := newL4NetLBServiceController() + + net := &networkv1.Network{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secondary-network", + }, + Spec: networkv1.NetworkSpec{ + Type: "L3", + ParametersRef: &networkv1.NetworkParametersReference{ + Group: gkenetworkparamsetv1alpha1.GroupName, + Kind: "GKENetworkParamSet", + Name: "secondary-network-params", + }, + }, + } + lc.networkLister.Add(net) + gkeParamSet := &gkenetworkparamsetv1alpha1.GKENetworkParamSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secondary-network-params", + }, + Spec: gkenetworkparamsetv1alpha1.GKENetworkParamSetSpec{ + VPC: "vpc", + VPCSubnet: "subnet", + }, + } + lc.gkeNetworkParamSetLister.Add(gkeParamSet) + + svc := test.NewL4NetLBRBSService(8080) + // create the NEG that would be created by the NEG controller. + neg := &computebeta.NetworkEndpointGroup{ + Name: lc.namer.L4Backend(svc.Namespace, svc.Name), + } + lc.ctx.Cloud.CreateNetworkEndpointGroup(neg, "us-central1-b") + + svc.Spec.Selector = make(map[string]string) + svc.Spec.Selector[networkv1.NetworkAnnotationKey] = "secondary-network" + addNetLBService(lc, svc) + prevMetrics, err := test.GetL4NetLBLatencyMetric() + if err != nil { + t.Errorf("Error getting L4 NetLB latency metrics err: %v", err) + } + if prevMetrics == nil { + t.Fatalf("Cannot get prometheus metrics for L4NetLB latency") + } + key, _ := common.KeyFunc(svc) + err = lc.sync(key) + if err != nil { + t.Errorf("Failed to sync newly added service %s, err %v", svc.Name, err) + } + svc, err = lc.ctx.KubeClient.CoreV1().Services(svc.Namespace).Get(context.TODO(), svc.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Failed to lookup service %s, err %v", svc.Name, err) + } + currMetrics, metricErr := test.GetL4NetLBLatencyMetric() + if metricErr != nil { + t.Errorf("Error getting L4 NetLB latency metrics err: %v", metricErr) + } + prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t) + + validateNetLBSvcStatus(svc, t) + if err := checkBackendServiceWithNEG(lc, svc); err != nil { + t.Errorf("UnexpectedError %v", err) + } + if err := validateAnnotations(svc); err != nil { + t.Errorf("%v", err) + } + deleteNetLBService(lc, svc) +} + func TestProcessServiceCreateWithUsersProvidedIP(t *testing.T) { lc := newL4NetLBServiceController() @@ -1494,3 +1604,68 @@ func TestCreateDeleteDualStackNetLBService(t *testing.T) { }) } } + +// fakeNEGLinker is a fake to be used in tests in place of the NEGLinker. +type fakeNEGLinker struct { + called bool + sp utils.ServicePort + groups []backends.GroupKey +} + +func (l *fakeNEGLinker) Link(sp utils.ServicePort, groups []backends.GroupKey) error { + l.called = true + l.sp = sp + l.groups = groups + return nil +} + +func TestEnsureBackendLinkingWithNEGs(t *testing.T) { + controller := newL4NetLBServiceController() + linker := &fakeNEGLinker{} + controller.negLinker = linker + svc := test.NewL4NetLBRBSService(8080) + + networkInfo := &network.NetworkInfo{IsDefault: false} + err := controller.ensureBackendLinking(svc, networkInfo) + if err != nil { + t.Fatalf("ensureBackendLinking() failed, err=%v", err) + } + namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} + spID := utils.ServicePortID{Service: namespacedName} + + if diff := cmp.Diff(linker.sp.ID, spID); diff != "" { + t.Errorf("ServicePort.ID mismatch (-want +got):\n%s", diff) + } + + if !linker.sp.L4RBSEnabled { + t.Errorf("RBS was not enabled in the Service Port, got=%+v", linker.sp) + } + + if !linker.sp.VMIPNEGEnabled { + t.Errorf("VMIPNEGEnabled was not enabled in the Service Port, got=%+v", linker.sp) + } +} + +func TestEnsureBackendLinkingWithInstanceGroups(t *testing.T) { + controller := newL4NetLBServiceController() + svc := test.NewL4NetLBRBSService(8080) + // set the fake NEG linker just to verify that it's not used in this scenario. + negLinker := &fakeNEGLinker{} + controller.negLinker = negLinker + + backendService := &compute.BackendService{Name: controller.namer.L4Backend(svc.Namespace, svc.Name)} + err := controller.ctx.Cloud.CreateRegionBackendService(backendService, "us-central1") + if err != nil { + t.Fatalf("CreateRegionBackendService() failed, err=%v", err) + } + + networkInfo := &network.NetworkInfo{IsDefault: true} + err = controller.ensureBackendLinking(svc, networkInfo) + if err != nil { + t.Fatalf("ensureBackendLinking() failed, err=%v", err) + } + + if negLinker.called { + t.Errorf("IG linking should not use NEG linker") + } +} diff --git a/pkg/loadbalancers/l4netlb.go b/pkg/loadbalancers/l4netlb.go index 88a153ab98..4283fef7b7 100644 --- a/pkg/loadbalancers/l4netlb.go +++ b/pkg/loadbalancers/l4netlb.go @@ -53,6 +53,7 @@ type L4NetLB struct { healthChecks healthchecksl4.L4HealthChecks forwardingRules ForwardingRulesProvider enableDualStack bool + networkInfo network.NetworkInfo } // L4NetLBSyncResult contains information about the outcome of an L4 NetLB sync. It stores the list of resource name annotations, @@ -109,6 +110,7 @@ func NewL4NetLB(params *L4NetLBParams) *L4NetLB { healthChecks: healthchecksl4.NewL4HealthChecks(params.Cloud, params.Recorder, params.NetworkInfo), forwardingRules: forwardingrules.New(params.Cloud, meta.VersionGA, meta.Regional), enableDualStack: params.DualStackEnabled, + networkInfo: params.NetworkInfo, } return l4netlb } @@ -266,7 +268,6 @@ func (l4netlb *L4NetLB) ensureIPv4NodesFirewall(nodeNames []string, ipAddress st } // Add firewall rule for L4 External LoadBalancer traffic to nodes - defaultNetwork := network.DefaultNetwork(l4netlb.cloud) nodesFWRParams := firewalls.FirewallParams{ PortRanges: portRanges, SourceRanges: sourceRanges, @@ -275,7 +276,7 @@ func (l4netlb *L4NetLB) ensureIPv4NodesFirewall(nodeNames []string, ipAddress st Name: firewallName, IP: l4netlb.Service.Spec.LoadBalancerIP, NodeNames: nodeNames, - Network: *defaultNetwork, + Network: l4netlb.networkInfo, } result.Error = firewalls.EnsureL4LBFirewallForNodes(l4netlb.Service, &nodesFWRParams, l4netlb.cloud, l4netlb.recorder) if result.Error != nil { diff --git a/pkg/loadbalancers/l4netlbipv6.go b/pkg/loadbalancers/l4netlbipv6.go index 4dd1c8bbea..3308556647 100644 --- a/pkg/loadbalancers/l4netlbipv6.go +++ b/pkg/loadbalancers/l4netlbipv6.go @@ -17,7 +17,6 @@ limitations under the License. package loadbalancers import ( - "k8s.io/ingress-gce/pkg/network" "strings" "time" @@ -123,7 +122,6 @@ func (l4netlb *L4NetLB) ensureIPv6NodesFirewall(ipAddress string, nodeNames []st return } - defaultNetwork := network.DefaultNetwork(l4netlb.cloud) ipv6nodesFWRParams := firewalls.FirewallParams{ PortRanges: portRanges, SourceRanges: ipv6SourceRanges, @@ -132,7 +130,7 @@ func (l4netlb *L4NetLB) ensureIPv6NodesFirewall(ipAddress string, nodeNames []st Name: firewallName, NodeNames: nodeNames, L4Type: utils.XLB, - Network: *defaultNetwork, + Network: l4netlb.networkInfo, } err = firewalls.EnsureL4LBFirewallForNodes(l4netlb.Service, &ipv6nodesFWRParams, l4netlb.cloud, l4netlb.recorder)