Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
panslava committed Aug 16, 2022
1 parent b889888 commit e6d94b6
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pkg/forwardingrules/forwarding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ForwardingRules struct {
scope meta.KeyType
}

func NewForwardingRules(cloud *gce.Cloud, version meta.Version, scope meta.KeyType) *ForwardingRules {
func New(cloud *gce.Cloud, version meta.Version, scope meta.KeyType) *ForwardingRules {
return &ForwardingRules{
cloud: cloud,
version: version,
Expand Down
87 changes: 49 additions & 38 deletions pkg/forwardingrules/forwarding_rules_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package forwardingrules

import (
"fmt"
"testing"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
Expand All @@ -19,16 +20,14 @@ func TestCreateForwardingRule(t *testing.T) {
}{
{
frRule: &composite.ForwardingRule{
Name: "elb",
Description: "elb description",
Name: "NetLB",
LoadBalancingScheme: string(cloud.SchemeExternal),
},
desc: "Test creating external forwarding rule",
},
{
frRule: &composite.ForwardingRule{
Name: "ilb",
Description: "ilb description",
Name: "ILB",
LoadBalancingScheme: string(cloud.SchemeInternal),
},
desc: "Test creating internal forwarding rule",
Expand All @@ -38,27 +37,30 @@ func TestCreateForwardingRule(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
frc := NewForwardingRules(fakeGCE, meta.VersionGA, meta.Regional)
frc := New(fakeGCE, meta.VersionGA, meta.Regional)

err := frc.Create(tc.frRule)
if err != nil {
t.Fatalf("frc.Create(%v), returned error %v, want nil", tc.frRule, err)
}

verifyForwardingRuleExists(t, fakeGCE, tc.frRule.Name)
err = verifyForwardingRuleExists(fakeGCE, tc.frRule.Name)
if err != nil {
t.Errorf("verifyForwardingRuleExists(_, %s) returned error %v, want nil", tc.frRule.Name, err)
}
})
}
}

func TestGetForwardingRule(t *testing.T) {
elbForwardingRule := &composite.ForwardingRule{
Name: "elb",
Name: "NetLB",
Version: meta.VersionGA,
Scope: meta.Regional,
LoadBalancingScheme: string(cloud.SchemeExternal),
}
ilbForwardingRule := &composite.ForwardingRule{
Name: "ilb",
Name: "ILB",
Version: meta.VersionGA,
Scope: meta.Regional,
LoadBalancingScheme: string(cloud.SchemeInternal),
Expand Down Expand Up @@ -93,10 +95,8 @@ func TestGetForwardingRule(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
for _, fr := range tc.existingFwdRules {
mustCreateForwardingRule(t, fakeGCE, fr)
}
frc := NewForwardingRules(fakeGCE, meta.VersionGA, meta.Regional)
frc := New(fakeGCE, meta.VersionGA, meta.Regional)
mustCreateForwardingRules(t, fakeGCE, tc.existingFwdRules)

fr, err := frc.Get(tc.getFwdRuleName)
if err != nil {
Expand All @@ -114,11 +114,11 @@ func TestGetForwardingRule(t *testing.T) {

func TestDeleteForwardingRule(t *testing.T) {
elbForwardingRule := &composite.ForwardingRule{
Name: "elb",
Name: "NetLB",
LoadBalancingScheme: string(cloud.SchemeExternal),
}
ilbForwardingRule := &composite.ForwardingRule{
Name: "ilb",
Name: "ILB",
LoadBalancingScheme: string(cloud.SchemeInternal),
}

Expand Down Expand Up @@ -157,53 +157,64 @@ func TestDeleteForwardingRule(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
for _, fr := range tc.existingFwdRules {
mustCreateForwardingRule(t, fakeGCE, fr)
}
frc := NewForwardingRules(fakeGCE, meta.VersionGA, meta.Regional)
frc := New(fakeGCE, meta.VersionGA, meta.Regional)
mustCreateForwardingRules(t, fakeGCE, tc.existingFwdRules)

err := frc.Delete(tc.deleteFwdRuleName)
if err != nil {
t.Fatalf("frc.Delete(%v), returned error %v, want nil", tc.deleteFwdRuleName, err)
}

verifyForwardingRuleNotExists(t, fakeGCE, tc.deleteFwdRuleName)
err = verifyForwardingRuleNotExists(fakeGCE, tc.deleteFwdRuleName)
if err != nil {
t.Errorf("verifyForwardingRuleNotExists(_, %s) returned error %v, want nil", tc.deleteFwdRuleName, err)
}
for _, fw := range tc.shouldExistFwdRules {
verifyForwardingRuleExists(t, fakeGCE, fw.Name)
err = verifyForwardingRuleExists(fakeGCE, fw.Name)
if err != nil {
t.Errorf("verifyForwardingRuleExists(_, %s) returned error %v, want nil", fw.Name, err)
}
}
})
}
}

func verifyForwardingRuleExists(t *testing.T, cloud *gce.Cloud, name string) {
t.Helper()
verifyForwardingRuleShouldExist(t, cloud, name, true)
}
func verifyForwardingRuleExists(cloud *gce.Cloud, name string) error {
key, err := composite.CreateKey(cloud, name, meta.Regional)
if err != nil {
return fmt.Errorf("failed to create key for fetching forwarding rule %s, err: %w", name, err)
}

func verifyForwardingRuleNotExists(t *testing.T, cloud *gce.Cloud, name string) {
t.Helper()
verifyForwardingRuleShouldExist(t, cloud, name, false)
_, err = composite.GetForwardingRule(cloud, key, meta.VersionGA)
if err != nil {
if utils.IsNotFoundError(err) {
return fmt.Errorf("forwarding rule %s was not found, expected to exist", name)
}
return fmt.Errorf("composite.GetForwardingRule(_, %v, %v) returned error %w, want nil", key, meta.VersionGA, err)
}
return nil
}

func verifyForwardingRuleShouldExist(t *testing.T, cloud *gce.Cloud, name string, shouldExist bool) {
t.Helper()

func verifyForwardingRuleNotExists(cloud *gce.Cloud, name string) error {
key, err := composite.CreateKey(cloud, name, meta.Regional)
if err != nil {
t.Fatalf("Failed to create key for fetching forwarding rule %s, err: %v", name, err)
return fmt.Errorf("failed to create key for fetching forwarding rule %s, err: %w", name, err)
}

_, err = composite.GetForwardingRule(cloud, key, meta.VersionGA)
if err != nil {
if utils.IsNotFoundError(err) {
if shouldExist {
t.Errorf("Forwarding rule %s was not found, expected to exist", name)
}
return
return nil
}
t.Fatalf("composite.GetForwardingRule(_, %v, %v) returned error %v, want nil", key, meta.VersionGA, err)
return fmt.Errorf("composite.GetForwardingRule(_, %v, %v) returned error %w, want nil", key, meta.VersionGA, err)
}
if !shouldExist {
t.Errorf("Forwarding rule %s exists, expected to be not found", name)
return fmt.Errorf("forwarding rule %s exists, expected to be not found", name)
}

func mustCreateForwardingRules(t *testing.T, cloud *gce.Cloud, frs []*composite.ForwardingRule) {
t.Helper()
for _, fr := range frs {
mustCreateForwardingRule(t, cloud, fr)
}
}

Expand Down
12 changes: 0 additions & 12 deletions pkg/forwardingrules/interfaces.go

This file was deleted.

8 changes: 8 additions & 0 deletions pkg/l4lb/intefaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package l4lb

import "k8s.io/ingress-gce/pkg/composite"

// ForwardingRulesGetter is an interface which allows getting Google Cloud Forwarding Rules
type ForwardingRulesGetter interface {
Get(name string) (*composite.ForwardingRule, error)
}
27 changes: 15 additions & 12 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/forwardingrules"
l4metrics "k8s.io/ingress-gce/pkg/l4lb/metrics"
"k8s.io/ingress-gce/pkg/loadbalancers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
Expand Down Expand Up @@ -68,6 +69,7 @@ type L4Controller struct {
enqueueTracker utils.TimeTracker
// syncTracker tracks the latest time an enqueued service was synced
syncTracker utils.TimeTracker
forwardingRules ForwardingRulesGetter
sharedResourcesLock sync.Mutex
}

Expand All @@ -78,14 +80,15 @@ func NewILBController(ctx *context.ControllerContext, stopCh chan struct{}) *L4C
ctx.NumL4Workers = 1
}
l4c := &L4Controller{
ctx: ctx,
client: ctx.KubeClient,
serviceLister: ctx.ServiceInformer.GetIndexer(),
nodeLister: listers.NewNodeLister(ctx.NodeInformer.GetIndexer()),
stopCh: stopCh,
numWorkers: ctx.NumL4Workers,
namer: ctx.L4Namer,
translator: ctx.Translator,
ctx: ctx,
client: ctx.KubeClient,
serviceLister: ctx.ServiceInformer.GetIndexer(),
nodeLister: listers.NewNodeLister(ctx.NodeInformer.GetIndexer()),
stopCh: stopCh,
numWorkers: ctx.NumL4Workers,
namer: ctx.L4Namer,
translator: ctx.Translator,
forwardingRules: forwardingrules.New(ctx.Cloud, meta.VersionGA, meta.Regional),
}
l4c.backendPool = backends.NewPool(ctx.Cloud, l4c.namer)
l4c.NegLinker = backends.NewNEGLinker(l4c.backendPool, negtypes.NewAdapter(ctx.Cloud), ctx.Cloud, ctx.SvcNegInformer.GetIndexer())
Expand Down Expand Up @@ -169,7 +172,7 @@ func (l4c *L4Controller) shutdown() {
// If the service has either the v1 finalizer or the forwarding rule created by v1 implementation(service controller),
// the subsetting controller will not process it. Processing it will fail forwarding rule creation with the same IP anyway.
// This check prevents processing of v1-implemented services whose finalizer field got wiped out.
func (l4c *L4Controller) shouldProcessService(service *v1.Service, l4 *loadbalancers.L4) bool {
func (l4c *L4Controller) shouldProcessService(service *v1.Service) bool {
// skip services that are being handled by the legacy service controller.
if utils.IsLegacyL4ILBService(service) {
klog.Warningf("Ignoring update for service %s:%s managed by service controller", service.Namespace, service.Name)
Expand All @@ -178,7 +181,7 @@ func (l4c *L4Controller) shouldProcessService(service *v1.Service, l4 *loadbalan
frName := utils.LegacyForwardingRuleName(service)
// Processing should continue if an external forwarding rule exists. This can happen if the service is transitioning from External to Internal.
// The external forwarding rule might not be deleted by the time this controller starts processing the service.
fr, err := l4.GetForwardingRule(frName)
fr, err := l4c.forwardingRules.Get(frName)
if err != nil {
klog.Errorf("Error getting l4 forwarding rule %s", frName)
}
Expand All @@ -192,8 +195,7 @@ func (l4c *L4Controller) shouldProcessService(service *v1.Service, l4 *loadbalan
// processServiceCreateOrUpdate ensures load balancer resources for the given service, as needed.
// Returns an error if processing the service update failed.
func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Service) *loadbalancers.L4ILBSyncResult {
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace))
if !l4c.shouldProcessService(service, l4) {
if !l4c.shouldProcessService(service) {
return nil
}
// Ensure v2 finalizer
Expand All @@ -206,6 +208,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}
// Use the same function for both create and updates. If controller crashes and restarts,
// all existing services will show up as Service Adds.
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace))
syncResult := l4.EnsureInternalLoadBalancer(nodeNames, service)
// syncResult will not be nil
if syncResult.Error != nil {
Expand Down
37 changes: 19 additions & 18 deletions pkg/l4lb/l4netlbcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/forwardingrules"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/l4lb/metrics"
"k8s.io/ingress-gce/pkg/loadbalancers"
Expand All @@ -55,9 +56,10 @@ type L4NetLBController struct {
// syncTracker tracks the latest time an enqueued service was synced
syncTracker utils.TimeTracker

backendPool *backends.Backends
instancePool instances.NodePool
igLinker *backends.RegionalInstanceGroupLinker
backendPool *backends.Backends
instancePool instances.NodePool
igLinker *backends.RegionalInstanceGroupLinker
forwardingRules ForwardingRulesGetter
}

// NewL4NetLBController creates a controller for l4 external loadbalancer.
Expand All @@ -71,15 +73,16 @@ func NewL4NetLBController(

backendPool := backends.NewPool(ctx.Cloud, ctx.L4Namer)
l4netLBc := &L4NetLBController{
ctx: ctx,
serviceLister: ctx.ServiceInformer.GetIndexer(),
nodeLister: listers.NewNodeLister(ctx.NodeInformer.GetIndexer()),
stopCh: stopCh,
translator: ctx.Translator,
backendPool: backendPool,
namer: ctx.L4Namer,
instancePool: ctx.InstancePool,
igLinker: backends.NewRegionalInstanceGroupLinker(ctx.InstancePool, backendPool),
ctx: ctx,
serviceLister: ctx.ServiceInformer.GetIndexer(),
nodeLister: listers.NewNodeLister(ctx.NodeInformer.GetIndexer()),
stopCh: stopCh,
translator: ctx.Translator,
backendPool: backendPool,
namer: ctx.L4Namer,
instancePool: ctx.InstancePool,
igLinker: backends.NewRegionalInstanceGroupLinker(ctx.InstancePool, backendPool),
forwardingRules: forwardingrules.New(ctx.Cloud, meta.VersionGA, meta.Regional),
}
l4netLBc.svcQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers("l4netLB", "services", ctx.NumL4Workers, l4netLBc.sync)

Expand Down Expand Up @@ -270,13 +273,12 @@ func (lc *L4NetLBController) preventLegacyServiceHandling(service *v1.Service, k
}

func (lc *L4NetLBController) hasTargetPoolForwardingRule(service *v1.Service) bool {
l4netlb := loadbalancers.NewL4NetLB(service, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(service.Namespace))
frName := l4netlb.GetFRName()
frName := utils.LegacyForwardingRuleName(service)
if lc.hasForwardingRuleAnnotation(service, frName) {
return false
}

existingFR, err := l4netlb.GetForwardingRule(frName)
existingFR, err := lc.forwardingRules.Get(frName)
if err != nil {
klog.Errorf("Error getting forwarding rule %s", frName)
return false
Expand Down Expand Up @@ -321,13 +323,12 @@ func (lc *L4NetLBController) deleteRBSAnnotation(service *v1.Service) error {

// hasRBSForwardingRule checks if services loadbalancer has forwarding rule pointing to backend service
func (lc *L4NetLBController) hasRBSForwardingRule(svc *v1.Service) bool {
l4netlb := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace))
frName := l4netlb.GetFRName()
frName := utils.LegacyForwardingRuleName(svc)
// to optimize number of api calls, at first, check if forwarding rule exists in annotation
if lc.hasForwardingRuleAnnotation(svc, frName) {
return true
}
existingFR, err := l4netlb.GetForwardingRule(frName)
existingFR, err := lc.forwardingRules.Get(frName)
if err != nil {
klog.Errorf("Error getting forwarding rule %s", frName)
return false
Expand Down
Loading

0 comments on commit e6d94b6

Please sign in to comment.