diff --git a/ingress/controllers/gce/Makefile b/ingress/controllers/gce/Makefile
index 979b3779e8..f01f583029 100644
--- a/ingress/controllers/gce/Makefile
+++ b/ingress/controllers/gce/Makefile
@@ -1,7 +1,7 @@
 all: push

 # 0.0 shouldn't clobber any released builds
-TAG = 0.6.0
+TAG = 0.6.1
 PREFIX = gcr.io/google_containers/glbc

 server:
diff --git a/ingress/controllers/gce/backends/backends.go b/ingress/controllers/gce/backends/backends.go
index 7c5076a16e..a47190cc51 100644
--- a/ingress/controllers/gce/backends/backends.go
+++ b/ingress/controllers/gce/backends/backends.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
+	"time"

 	"k8s.io/kubernetes/pkg/util/sets"

@@ -38,6 +39,10 @@ type Backends struct {
 	healthChecker healthchecks.HealthChecker
 	snapshotter   storage.Snapshotter
 	namer         utils.Namer
+	// ignoredPorts is a set of ports excluded from GC, even
+	// after the Ingress has been deleted. Note that invoking
+	// a Delete() on these ports will still delete the backend.
+	ignoredPorts sets.String
 }

 func portKey(port int64) string {
@@ -46,18 +51,50 @@ func portKey(port int64) string {

 // NewBackendPool returns a new backend pool.
 // - cloud: implements BackendServices and syncs backends with a cloud provider
+// - healthChecker: is capable of producing health checks for backends.
 // - nodePool: implements NodePool, used to create/delete new instance groups.
+// - namer: produces names for backends.
+// - ignorePorts: is a set of ports to avoid syncing/GCing.
+// - resyncWithCloud: if true, periodically syncs with cloud resources.
 func NewBackendPool(
 	cloud BackendServices,
 	healthChecker healthchecks.HealthChecker,
-	nodePool instances.NodePool, namer utils.Namer) *Backends {
-	return &Backends{
+	nodePool instances.NodePool,
+	namer utils.Namer,
+	ignorePorts []int64,
+	resyncWithCloud bool) *Backends {
+
+	ignored := []string{}
+	for _, p := range ignorePorts {
+		ignored = append(ignored, portKey(p))
+	}
+	backendPool := &Backends{
 		cloud: cloud,
 		nodePool: nodePool,
-		snapshotter: storage.NewInMemoryPool(),
 		healthChecker: healthChecker,
 		namer: namer,
+		ignoredPorts: sets.NewString(ignored...),
 	}
+	if !resyncWithCloud {
+		backendPool.snapshotter = storage.NewInMemoryPool()
+		return backendPool
+	}
+	backendPool.snapshotter = storage.NewCloudListingPool(
+		func(i interface{}) (string, error) {
+			bs := i.(*compute.BackendService)
+			if !namer.NameBelongsToCluster(bs.Name) {
+				return "", fmt.Errorf("Unrecognized name %v", bs.Name)
+			}
+			port, err := namer.BePort(bs.Name)
+			if err != nil {
+				return "", err
+			}
+			return port, nil
+		},
+		backendPool,
+		30*time.Second,
+	)
+	return backendPool
 }

 // Get returns a single backend.
@@ -150,10 +187,18 @@ func (b *Backends) Delete(port int64) (err error) {
 }

 // List lists all backends.
-func (b *Backends) List() (*compute.BackendServiceList, error) {
+func (b *Backends) List() ([]interface{}, error) {
 	// TODO: for consistency with the rest of this sub-package this method
 	// should return a list of backend ports.
-	return b.cloud.ListBackendServices()
+	interList := []interface{}{}
+	be, err := b.cloud.ListBackendServices()
+	if err != nil {
+		return interList, err
+	}
+	for i := range be.Items {
+		interList = append(interList, be.Items[i])
+	}
+	return interList, nil
 }

 // edgeHop checks the links of the given backend by executing an edge hop.
@@ -200,7 +245,7 @@ func (b *Backends) GC(svcNodePorts []int64) error {
 			return err
 		}
 		nodePort := int64(p)
-		if knownPorts.Has(portKey(nodePort)) {
+		if knownPorts.Has(portKey(nodePort)) || b.ignoredPorts.Has(portKey(nodePort)) {
 			continue
 		}
 		glog.V(3).Infof("GCing backend for port %v", p)
diff --git a/ingress/controllers/gce/backends/backends_test.go b/ingress/controllers/gce/backends/backends_test.go
index 6461ec68f9..f8f74e2d26 100644
--- a/ingress/controllers/gce/backends/backends_test.go
+++ b/ingress/controllers/gce/backends/backends_test.go
@@ -19,24 +19,26 @@ package backends
 import (
 	"testing"

+	compute "google.golang.org/api/compute/v1"
 	"k8s.io/contrib/ingress/controllers/gce/healthchecks"
 	"k8s.io/contrib/ingress/controllers/gce/instances"
+	"k8s.io/contrib/ingress/controllers/gce/storage"
 	"k8s.io/contrib/ingress/controllers/gce/utils"
 	"k8s.io/kubernetes/pkg/util/sets"
 )

-func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups) BackendPool {
+func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
 	namer := utils.Namer{}
 	return NewBackendPool(
 		f,
 		healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
-		instances.NewNodePool(fakeIGs, "default-zone"), namer)
+		instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud)
 }

 func TestBackendPoolAdd(t *testing.T) {
 	f := NewFakeBackendServices()
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
-	pool := newBackendPool(f, fakeIGs)
+	pool := newBackendPool(f, fakeIGs, false)
 	namer := utils.Namer{}

 	// Add a backend for a port, then re-add the same port and
@@ -89,13 +91,12 @@ func TestBackendPoolAdd(t *testing.T) {
 }

 func TestBackendPoolSync(t *testing.T) {
-
 	// Call sync on a backend pool with a list of ports, make sure the pool
 	// creates/deletes required ports.
 	svcNodePorts := []int64{81, 82, 83}
 	f := NewFakeBackendServices()
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
-	pool := newBackendPool(f, fakeIGs)
+	pool := newBackendPool(f, fakeIGs, true)
 	pool.Add(81)
 	pool.Add(90)
 	pool.Sync(svcNodePorts)
@@ -109,12 +110,57 @@ func TestBackendPoolSync(t *testing.T) {
 		}
 	}

+	svcNodePorts = []int64{81}
+	deletedPorts := []int64{82, 83}
+	pool.GC(svcNodePorts)
+	for _, port := range deletedPorts {
+		if _, err := pool.Get(port); err == nil {
+			t.Fatalf("Pool contains %v after deletion", port)
+		}
+	}
+
+	// All these backends should be ignored because they don't belong to the cluster.
+	// foo - non k8s managed backend
+	// k8s-be-foo - foo is not a nodeport
+	// k8s--bar--foo - too many cluster delimiters
+	// k8s-be-30001--uid - another cluster tagged with uid
+	unrelatedBackends := sets.NewString([]string{"foo", "k8s-be-foo", "k8s--bar--foo", "k8s-be-30001--uid"}...)
+	for _, name := range unrelatedBackends.List() {
+		f.CreateBackendService(&compute.BackendService{Name: name})
+	}
+
+	namer := &utils.Namer{}
+	// This backend should get deleted again since it is managed by this cluster.
+	f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0])})
+
+	// TODO: Avoid casting.
+	// Repopulate the pool with a cloud list, which now includes the 82 port
+	// backend. This would happen if, say, an ingress backend is removed
+	// while the controller is restarting.
+	pool.(*Backends).snapshotter.(*storage.CloudListingPool).ReplenishPool()
+
+	pool.GC(svcNodePorts)
+
+	currBackends, _ := f.ListBackendServices()
+	currSet := sets.NewString()
+	for _, b := range currBackends.Items {
+		currSet.Insert(b.Name)
+	}
+	// Port 81 still exists because it's an in-use service NodePort.
+	knownBe := namer.BeName(81)
+	if !currSet.Has(knownBe) {
+		t.Fatalf("Expected %v to exist in backend pool", knownBe)
+	}
+	currSet.Delete(knownBe)
+	if !currSet.Equal(unrelatedBackends) {
+		t.Fatalf("Some unrelated backends were deleted. Expected %+v, got %+v", unrelatedBackends, currSet)
+	}
 }

 func TestBackendPoolShutdown(t *testing.T) {
 	f := NewFakeBackendServices()
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
-	pool := newBackendPool(f, fakeIGs)
+	pool := newBackendPool(f, fakeIGs, false)
 	namer := utils.Namer{}

 	pool.Add(80)
diff --git a/ingress/controllers/gce/backends/interfaces.go b/ingress/controllers/gce/backends/interfaces.go
index 0030092ee0..3e199f9e55 100644
--- a/ingress/controllers/gce/backends/interfaces.go
+++ b/ingress/controllers/gce/backends/interfaces.go
@@ -30,7 +30,7 @@ type BackendPool interface {
 	GC(ports []int64) error
 	Shutdown() error
 	Status(name string) string
-	List() (*compute.BackendServiceList, error)
+	List() ([]interface{}, error)
 }

 // BackendServices is an interface for managing gce backend services.
diff --git a/ingress/controllers/gce/controller/cluster_manager.go b/ingress/controllers/gce/controller/cluster_manager.go
index 076f74cff4..642c3fd226 100644
--- a/ingress/controllers/gce/controller/cluster_manager.go
+++ b/ingress/controllers/gce/controller/cluster_manager.go
@@ -18,6 +18,8 @@ package controller

 import (
 	"fmt"
+	"net/http"
+	"time"

 	"k8s.io/contrib/ingress/controllers/gce/backends"
 	"k8s.io/contrib/ingress/controllers/gce/healthchecks"
@@ -26,6 +28,8 @@ import (
 	"k8s.io/contrib/ingress/controllers/gce/utils"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
+
+	"github.com/golang/glog"
 )

 const (
@@ -54,6 +58,9 @@ const (

 	// Names longer than this are truncated, because of GCE restrictions.
 	nameLenLimit = 62
+
+	// Sleep interval to retry cloud client creation.
+	cloudClientRetryInterval = 10 * time.Second
 )

 // ClusterManager manages cluster resource pools.
@@ -70,6 +77,14 @@ func (c *ClusterManager) IsHealthy() (err error) {
 	// TODO: Expand on this, for now we just want to detect when the GCE client
 	// is broken.
 	_, err = c.backendPool.List()
+
+	// If this container is scheduled on a node without compute/rw it is
+	// effectively useless, but it is healthy. Reporting it as unhealthy
+	// will lead to container crashlooping.
+	if utils.IsHTTPErrorCode(err, http.StatusForbidden) {
+		glog.Infof("Reporting cluster as healthy, but unable to list backends: %v", err)
+		return nil
+	}
 	return
 }

@@ -138,6 +153,32 @@ func defaultInstanceGroupName(clusterName string) string {
 	return fmt.Sprintf("%v-%v", instanceGroupPrefix, clusterName)
 }

+func getGCEClient() *gce.GCECloud {
+	// Creating the cloud interface involves resolving the metadata server to get
+	// an oauth token. If this fails, the token provider assumes it's not on GCE.
+	// No errors are thrown. So we need to keep retrying till it works because
+	// we know we're on GCE.
+	for {
+		cloudInterface, err := cloudprovider.GetCloudProvider("gce", nil)
+		if err == nil {
+			cloud := cloudInterface.(*gce.GCECloud)
+
+			// If this controller is scheduled on a node without compute/rw
+			// it won't be allowed to list backends. We can assume that the
+			// user has no need for Ingress in this case. If they grant
+			// permissions to the node they will have to restart the controller
+			// manually to re-create the client.
+			if _, err = cloud.ListBackendServices(); err == nil || utils.IsHTTPErrorCode(err, http.StatusForbidden) {
+				return cloud
+			}
+			glog.Warningf("Failed to list backend services, retrying: %v", err)
+		} else {
+			glog.Warningf("Failed to retrieve cloud interface, retrying: %v", err)
+		}
+		time.Sleep(cloudClientRetryInterval)
+	}
+}
+
 // NewClusterManager creates a cluster manager for shared resources.
 // - name: is the name used to tag cluster wide shared resources. This is the
 //   string passed to glbc via --gce-cluster-name.
@@ -149,11 +190,12 @@ func NewClusterManager(
 	defaultBackendNodePort int64,
 	defaultHealthCheckPath string) (*ClusterManager, error) {

-	cloudInterface, err := cloudprovider.GetCloudProvider("gce", nil)
-	if err != nil {
-		return nil, err
-	}
-	cloud := cloudInterface.(*gce.GCECloud)
+	// TODO: Make this more resilient. Currently we create the cloud client
+	// and pass it through to all the pools. This makes unittesting easier.
+	// However if the cloud client suddenly fails, we should try to re-create it
+	// and continue.
+	cloud := getGCEClient()
+
+	cluster := ClusterManager{ClusterNamer: utils.Namer{name}}
 	zone, err := cloud.GetZone()
 	if err != nil {
@@ -161,13 +203,16 @@ func NewClusterManager(
 	}
 	cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
 	healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
+
+	// TODO: This needs to change to a consolidated management of the default backend.
 	cluster.backendPool = backends.NewBackendPool(
-		cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer)
+		cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
 	defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
 	defaultBackendPool := backends.NewBackendPool(
-		cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer)
+		cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
 	cluster.defaultBackendNodePort = defaultBackendNodePort
 	cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
 		cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
+
 	return &cluster, nil
 }
diff --git a/ingress/controllers/gce/controller/controller.go b/ingress/controllers/gce/controller/controller.go
index 34319f9ab5..9fadf9a331 100644
--- a/ingress/controllers/gce/controller/controller.go
+++ b/ingress/controllers/gce/controller/controller.go
@@ -265,6 +265,9 @@ func (lbc *LoadBalancerController) sync(key string) {
 		glog.V(3).Infof("Finished syncing %v", key)
 	}()

+	// Record any errors during sync and throw a single error at the end. This
+	// allows us to free up associated cloud resources ASAP.
+	var syncError error
 	if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
 		// TODO: Implement proper backoff for the queue.
 		eventMsg := "GCE"
@@ -276,29 +279,34 @@ func (lbc *LoadBalancerController) sync(key string) {
 		} else {
 			err = fmt.Errorf("%v Error: %v", eventMsg, err)
 		}
-		lbc.ingQueue.requeue(key, err)
-		return
+		syncError = err
 	}

 	if !ingExists {
+		if syncError != nil {
+			lbc.ingQueue.requeue(key, err)
+		}
 		return
 	}
 	// Update the UrlMap of the single loadbalancer that came through the watch.
 	l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
 	if err != nil {
-		lbc.ingQueue.requeue(key, err)
+		lbc.ingQueue.requeue(key, fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err))
 		return
 	}
 	ing := *obj.(*extensions.Ingress)
 	if urlMap, err := lbc.tr.toUrlMap(&ing); err != nil {
-		lbc.ingQueue.requeue(key, err)
+		syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
 	} else if err := l7.UpdateUrlMap(urlMap); err != nil {
 		lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error())
-		lbc.ingQueue.requeue(key, err)
+		syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
 	} else if lbc.updateIngressStatus(l7, ing); err != nil {
 		lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
-		lbc.ingQueue.requeue(key, err)
+		syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
+	}
+	if syncError != nil {
+		lbc.ingQueue.requeue(key, syncError)
 	}
 	return
 }
diff --git a/ingress/controllers/gce/controller/fakes.go b/ingress/controllers/gce/controller/fakes.go
index ec3adee49f..9b036164b6 100644
--- a/ingress/controllers/gce/controller/fakes.go
+++ b/ingress/controllers/gce/controller/fakes.go
@@ -52,7 +52,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
 	healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
 	backendPool := backends.NewBackendPool(
 		fakeBackends,
-		healthChecker, nodePool, namer)
+		healthChecker, nodePool, namer, []int64{}, false)
 	l7Pool := loadbalancers.NewLoadBalancerPool(
 		fakeLbs,
 		// TODO: change this
diff --git a/ingress/controllers/gce/loadbalancers/loadbalancers.go b/ingress/controllers/gce/loadbalancers/loadbalancers.go
index 3862a56c10..c3f3bd9477 100644
--- a/ingress/controllers/gce/loadbalancers/loadbalancers.go
+++ b/ingress/controllers/gce/loadbalancers/loadbalancers.go
@@ -174,7 +174,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error {

 	// The default backend is completely managed by the l7 pool.
 	// This includes recreating it if it's deleted, or fixing broken links.
-	if err := l.defaultBackendPool.Sync([]int64{l.defaultBackendNodePort}); err != nil {
+	if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil {
 		return err
 	}
 	// create new loadbalancers, perform an edge hop for existing
@@ -642,48 +642,29 @@ func (l *L7) UpdateUrlMap(ingressRules utils.GCEURLMap) error {
 	}
 	glog.V(3).Infof("Updating url map %+v", ingressRules)

+	// Every update replaces the entire urlmap.
+	// TODO: when we have multiple loadbalancers point to a single gce url map
+	// this needs modification. For now, there is a 1:1 mapping of urlmaps to
+	// Ingresses, so if the given Ingress doesn't have a host rule we should
+	// delete the path to that backend.
+	l.um.HostRules = []*compute.HostRule{}
+	l.um.PathMatchers = []*compute.PathMatcher{}
+
 	for hostname, urlToBackend := range ingressRules {
-		// Find the hostrule
-		// Find the path matcher
+		// Create a host rule
+		// Create a path matcher
 		// Add all given endpoint:backends to pathRules in path matcher
-		var hostRule *compute.HostRule
 		pmName := getNameForPathMatcher(hostname)
-		for _, hr := range l.um.HostRules {
-			// TODO: Hostnames must be exact match?
-			if hr.Hosts[0] == hostname {
-				hostRule = hr
-				break
-			}
-		}
-		if hostRule == nil {
-			// This is a new host
-			hostRule = &compute.HostRule{
-				Hosts: []string{hostname},
-				PathMatcher: pmName,
-			}
-			// Why not just clobber existing host rules?
-			// Because we can have multiple loadbalancers point to a single
-			// gce url map when we have IngressClaims.
-			l.um.HostRules = append(l.um.HostRules, hostRule)
-		}
-		var pathMatcher *compute.PathMatcher
-		for _, pm := range l.um.PathMatchers {
-			if pm.Name == hostRule.PathMatcher {
-				pathMatcher = pm
-				break
-			}
-		}
-		if pathMatcher == nil {
-			// This is a dangling or new host
-			pathMatcher = &compute.PathMatcher{Name: pmName}
-			l.um.PathMatchers = append(l.um.PathMatchers, pathMatcher)
-		}
-		pathMatcher.DefaultService = l.um.DefaultService
+		l.um.HostRules = append(l.um.HostRules, &compute.HostRule{
+			Hosts: []string{hostname},
+			PathMatcher: pmName,
+		})

-		// TODO: Every update replaces the entire path map. This will need to
-		// change when we allow joining. Right now we call a single method
-		// to verify current == desired and add new url mappings.
-		pathMatcher.PathRules = []*compute.PathRule{}
+		pathMatcher := &compute.PathMatcher{
+			Name: pmName,
+			DefaultService: l.um.DefaultService,
+			PathRules: []*compute.PathRule{},
+		}

 		// Longest prefix wins. For equal rules, first hit wins, i.e the second
 		// /foo rule when the first is deleted.
@@ -691,6 +672,7 @@ func (l *L7) UpdateUrlMap(ingressRules utils.GCEURLMap) error {
 			pathMatcher.PathRules = append(
 				pathMatcher.PathRules, &compute.PathRule{Paths: []string{expr}, Service: be.SelfLink})
 		}
+		l.um.PathMatchers = append(l.um.PathMatchers, pathMatcher)
 	}
 	um, err := l.cloud.UpdateUrlMap(l.um)
 	if err != nil {
diff --git a/ingress/controllers/gce/loadbalancers/loadbalancers_test.go b/ingress/controllers/gce/loadbalancers/loadbalancers_test.go
index b5622fadf7..aa07b68b1c 100644
--- a/ingress/controllers/gce/loadbalancers/loadbalancers_test.go
+++ b/ingress/controllers/gce/loadbalancers/loadbalancers_test.go
@@ -39,7 +39,7 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
 	namer := utils.Namer{}
 	healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
 	backendPool := backends.NewBackendPool(
-		fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer)
+		fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false)
 	return NewLoadBalancerPool(f, backendPool, testDefaultBeNodePort, namer)
 }
diff --git a/ingress/controllers/gce/main.go b/ingress/controllers/gce/main.go
index ac84c8f17c..dd414f9ae2 100644
--- a/ingress/controllers/gce/main.go
+++ b/ingress/controllers/gce/main.go
@@ -58,7 +58,7 @@ const (
 	alphaNumericChar = "0"

 	// Current docker image version. Only used in debug logging.
-	imageVersion = "glbc:0.6.0"
+	imageVersion = "glbc:0.6.1"
 )

 var (
diff --git a/ingress/controllers/gce/rc.yaml b/ingress/controllers/gce/rc.yaml
index f777c1e300..0c5b679403 100644
--- a/ingress/controllers/gce/rc.yaml
+++ b/ingress/controllers/gce/rc.yaml
@@ -24,18 +24,18 @@ metadata:
   name: l7-lb-controller
   labels:
     k8s-app: glbc
-    version: v0.6.0
+    version: v0.6.1
 spec:
   # There should never be more than 1 controller alive simultaneously.
   replicas: 1
   selector:
     k8s-app: glbc
-    version: v0.6.0
+    version: v0.6.1
   template:
     metadata:
       labels:
         k8s-app: glbc
-        version: v0.6.0
+        version: v0.6.1
         name: glbc
     spec:
       terminationGracePeriodSeconds: 600
@@ -61,7 +61,7 @@ spec:
           requests:
             cpu: 10m
             memory: 20Mi
-      - image: gcr.io/google_containers/glbc:0.6.0
+      - image: gcr.io/google_containers/glbc:0.6.1
         livenessProbe:
           httpGet:
             path: /healthz
diff --git a/ingress/controllers/gce/storage/pools.go b/ingress/controllers/gce/storage/pools.go
index af8e6e63e2..3d1fd235e5 100644
--- a/ingress/controllers/gce/storage/pools.go
+++ b/ingress/controllers/gce/storage/pools.go
@@ -17,7 +17,12 @@ limitations under the License.
 package storage

 import (
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/util/wait"
 )

 // Snapshotter is an interface capable of providing a consistent snapshot of
@@ -51,3 +56,78 @@ func NewInMemoryPool() *InMemoryPool {
 	return &InMemoryPool{
 		cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{})}
 }
+
+type keyFunc func(interface{}) (string, error)
+
+type cloudLister interface {
+	List() ([]interface{}, error)
+}
+
+// CloudListingPool wraps InMemoryPool but relists from the cloud periodically.
+type CloudListingPool struct {
+	// A lock to protect against concurrent mutation of the pool
+	lock sync.Mutex
+	// The pool that is re-populated via re-list from cloud, and written to
+	// from controller
+	*InMemoryPool
+	// An interface that lists objects from the cloud.
+	lister cloudLister
+	// A function capable of producing a key for a given object.
+	// This key must match the key used to store the same object in the user of
+	// this cache.
+	keyGetter keyFunc
+}
+
+// ReplenishPool lists through the cloudLister and inserts into the pool.
+func (c *CloudListingPool) ReplenishPool() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	glog.V(4).Infof("Replenishing pool")
+	items, err := c.lister.List()
+	if err != nil {
+		glog.Warningf("Failed to list: %v", err)
+		return
+	}
+	for i := range items {
+		key, err := c.keyGetter(items[i])
+		if err != nil {
+			glog.V(4).Infof("CloudListingPool: %v", err)
+			continue
+		}
+		c.InMemoryPool.Add(key, items[i])
+	}
+}
+
+// Snapshot just snapshots the underlying pool.
+func (c *CloudListingPool) Snapshot() map[string]interface{} {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	return c.InMemoryPool.Snapshot()
+}
+
+// Add simply adds to the underlying pool.
+func (c *CloudListingPool) Add(key string, obj interface{}) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.InMemoryPool.Add(key, obj)
+}
+
+// Delete just deletes from underlying pool.
+func (c *CloudListingPool) Delete(key string) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.InMemoryPool.Delete(key)
+}
+
+// NewCloudListingPool replenishes the InMemoryPool through a background
+// goroutine that lists from the given cloudLister.
+func NewCloudListingPool(k keyFunc, lister cloudLister, relistPeriod time.Duration) *CloudListingPool {
+	cl := &CloudListingPool{
+		InMemoryPool: NewInMemoryPool(),
+		lister: lister,
+		keyGetter: k,
+	}
+	glog.V(4).Infof("Starting pool replenish goroutine")
+	go wait.Until(cl.ReplenishPool, relistPeriod, make(chan struct{}))
+	return cl
+}
diff --git a/ingress/controllers/gce/utils/utils.go b/ingress/controllers/gce/utils/utils.go
index fd93694fc3..a7dd117c76 100644
--- a/ingress/controllers/gce/utils/utils.go
+++ b/ingress/controllers/gce/utils/utils.go
@@ -18,10 +18,13 @@ package utils

 import (
 	"fmt"
+	"strconv"
 	"strings"

+	"github.com/golang/glog"
 	compute "google.golang.org/api/compute/v1"
 	"google.golang.org/api/googleapi"
+	"regexp"
 )

 const (
@@ -46,6 +49,7 @@ const (

 	// This allows sharing of backends across loadbalancers.
 	backendPrefix = "k8s-be"
+	backendRegex = "k8s-be-([0-9]+).*"

 	// Prefix used for instance groups involved in L7 balancing.
 	igPrefix = "k8s-ig"
@@ -93,11 +97,48 @@ func (n *Namer) decorateName(name string) string {
 	return n.Truncate(fmt.Sprintf("%v%v%v", name, clusterNameDelimiter, n.ClusterName))
 }

+// NameBelongsToCluster checks if a given name is tagged with this cluster's UID.
+func (n *Namer) NameBelongsToCluster(name string) bool {
+	if !strings.HasPrefix(name, "k8s-") {
+		glog.V(4).Infof("%v not part of cluster", name)
+		return false
+	}
+	parts := strings.Split(name, clusterNameDelimiter)
+	if len(parts) == 1 {
+		if n.ClusterName == "" {
+			return true
+		}
+		return false
+	}
+	if len(parts) > 2 {
+		glog.Warningf("Too many parts to name %v, ignoring", name)
+		return false
+	}
+	return parts[1] == n.ClusterName
+}
+
 // BeName constructs the name for a backend.
 func (n *Namer) BeName(port int64) string {
 	return n.decorateName(fmt.Sprintf("%v-%d", backendPrefix, port))
 }
+
+// BePort retrieves the port from the given backend name.
+func (n *Namer) BePort(beName string) (string, error) {
+	r, err := regexp.Compile(backendRegex)
+	if err != nil {
+		return "", err
+	}
+	match := r.FindStringSubmatch(beName)
+	if len(match) < 2 {
+		return "", fmt.Errorf("Unable to lookup port for %v", beName)
+	}
+	_, err = strconv.Atoi(match[1])
+	if err != nil {
+		return "", fmt.Errorf("Unexpected regex match: %v", beName)
+	}
+	return match[1], nil
+}
+
 // IGName constructs the name for an Instance Group.
 func (n *Namer) IGName() string {
 	// Currently all ports are added to a single instance group.