This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Resilience fixes for the Ingress controller #652

Merged
merged 5 commits into from
Apr 4, 2016
2 changes: 1 addition & 1 deletion ingress/controllers/gce/Makefile
@@ -1,7 +1,7 @@
all: push

# 0.0 shouldn't clobber any released builds
TAG = 0.6.0
TAG = 0.6.1
PREFIX = gcr.io/google_containers/glbc

server:
57 changes: 51 additions & 6 deletions ingress/controllers/gce/backends/backends.go
@@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"

"k8s.io/kubernetes/pkg/util/sets"

@@ -38,6 +39,10 @@ type Backends struct {
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
namer utils.Namer
// ignoredPorts are a set of ports excluded from GC, even
// after the Ingress has been deleted. Note that invoking
// a Delete() on these ports will still delete the backend.
ignoredPorts sets.String
}

func portKey(port int64) string {
@@ -46,18 +51,50 @@ func portKey(port int64) string {

// NewBackendPool returns a new backend pool.
// - cloud: implements BackendServices and syncs backends with a cloud provider
// - healthChecker: is capable of producing health checks for backends.
// - nodePool: implements NodePool, used to create/delete new instance groups.
// - namer: produces names for backends.
// - ignorePorts: is a set of ports to avoid syncing/GCing.
// - resyncWithCloud: if true, periodically syncs with cloud resources.
func NewBackendPool(
cloud BackendServices,
healthChecker healthchecks.HealthChecker,
nodePool instances.NodePool, namer utils.Namer) *Backends {
return &Backends{
nodePool instances.NodePool,
namer utils.Namer,
ignorePorts []int64,
resyncWithCloud bool) *Backends {

ignored := []string{}
for _, p := range ignorePorts {
ignored = append(ignored, portKey(p))
}
backendPool := &Backends{
cloud: cloud,
nodePool: nodePool,
snapshotter: storage.NewInMemoryPool(),
healthChecker: healthChecker,
namer: namer,
ignoredPorts: sets.NewString(ignored...),
}
if !resyncWithCloud {
backendPool.snapshotter = storage.NewInMemoryPool()
return backendPool
}
backendPool.snapshotter = storage.NewCloudListingPool(
func(i interface{}) (string, error) {
bs := i.(*compute.BackendService)
if !namer.NameBelongsToCluster(bs.Name) {
return "", fmt.Errorf("Unrecognized name %v", bs.Name)
}
port, err := namer.BePort(bs.Name)
if err != nil {
return "", err
}
return port, nil
},
backendPool,
30*time.Second,
)
return backendPool
}

// Get returns a single backend.
@@ -150,10 +187,18 @@ func (b *Backends) Delete(port int64) (err error) {
}

// List lists all backends.
func (b *Backends) List() (*compute.BackendServiceList, error) {
func (b *Backends) List() ([]interface{}, error) {
// TODO: for consistency with the rest of this sub-package this method
// should return a list of backend ports.
return b.cloud.ListBackendServices()
interList := []interface{}{}
be, err := b.cloud.ListBackendServices()
if err != nil {
return interList, err
}
for i := range be.Items {
interList = append(interList, be.Items[i])
}
return interList, nil
}

// edgeHop checks the links of the given backend by executing an edge hop.
@@ -200,7 +245,7 @@ func (b *Backends) GC(svcNodePorts []int64) error {
return err
}
nodePort := int64(p)
if knownPorts.Has(portKey(nodePort)) {
if knownPorts.Has(portKey(nodePort)) || b.ignoredPorts.Has(portKey(nodePort)) {
continue
}
glog.V(3).Infof("GCing backend for port %v", p)
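Taken together, the new ignoredPorts set and the knownPorts check in GC() mean the default backend's port survives Ingress deletion while orphaned backends are still collected. The following is a minimal, self-contained sketch of that skip logic; miniPool and its fields are illustrative stand-ins, not the real Backends or storage types.

    package main

    import (
        "fmt"
        "strconv"
    )

    // portKey mirrors the key format the backend pool uses for its snapshotter.
    func portKey(port int64) string { return strconv.FormatInt(port, 10) }

    // miniPool is an illustrative stand-in for *Backends: it only tracks backend
    // existence per port key and the set of ports exempt from GC.
    type miniPool struct {
        backends map[string]bool
        ignored  map[string]bool
    }

    func newMiniPool(ignorePorts []int64) *miniPool {
        p := &miniPool{backends: map[string]bool{}, ignored: map[string]bool{}}
        for _, port := range ignorePorts {
            p.ignored[portKey(port)] = true
        }
        return p
    }

    func (p *miniPool) Add(port int64) { p.backends[portKey(port)] = true }

    // GC deletes every backend whose port is neither in use nor ignored,
    // mirroring the knownPorts/ignoredPorts check in Backends.GC above.
    func (p *miniPool) GC(svcNodePorts []int64) {
        known := map[string]bool{}
        for _, port := range svcNodePorts {
            known[portKey(port)] = true
        }
        for key := range p.backends {
            if known[key] || p.ignored[key] {
                continue
            }
            fmt.Printf("GCing backend for port %v\n", key)
            delete(p.backends, key)
        }
    }

    func main() {
        // 30000 plays the role of the default backend's NodePort, which the
        // cluster manager passes in as an ignored port.
        pool := newMiniPool([]int64{30000})
        for _, port := range []int64{30000, 81, 82} {
            pool.Add(port)
        }
        // 81 survives because it is still in use, 30000 because it is ignored,
        // and 82 is collected.
        pool.GC([]int64{81})
        fmt.Println(pool.backends) // map[30000:true 81:true]
    }

As in the real pool, an explicit Delete() would still remove an ignored port's backend; only the GC path skips it.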
58 changes: 52 additions & 6 deletions ingress/controllers/gce/backends/backends_test.go
@@ -19,24 +19,26 @@ package backends
import (
"testing"

compute "google.golang.org/api/compute/v1"
"k8s.io/contrib/ingress/controllers/gce/healthchecks"
"k8s.io/contrib/ingress/controllers/gce/instances"
"k8s.io/contrib/ingress/controllers/gce/storage"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/util/sets"
)

func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups) BackendPool {
func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
namer := utils.Namer{}
return NewBackendPool(
f,
healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
instances.NewNodePool(fakeIGs, "default-zone"), namer)
instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud)
}

func TestBackendPoolAdd(t *testing.T) {
f := NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
pool := newBackendPool(f, fakeIGs)
pool := newBackendPool(f, fakeIGs, false)
namer := utils.Namer{}

// Add a backend for a port, then re-add the same port and
@@ -89,13 +91,12 @@ func TestBackendPoolAdd(t *testing.T) {
}

func TestBackendPoolSync(t *testing.T) {

// Call sync on a backend pool with a list of ports, make sure the pool
// creates/deletes required ports.
svcNodePorts := []int64{81, 82, 83}
f := NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
pool := newBackendPool(f, fakeIGs)
pool := newBackendPool(f, fakeIGs, true)
pool.Add(81)
pool.Add(90)
pool.Sync(svcNodePorts)
@@ -109,12 +110,57 @@ func TestBackendPoolSync(t *testing.T) {
}
}

svcNodePorts = []int64{81}
deletedPorts := []int64{82, 83}
pool.GC(svcNodePorts)
for _, port := range deletedPorts {
if _, err := pool.Get(port); err == nil {
t.Fatalf("Pool contains %v after deletion", port)
}
}

// All these backends should be ignored because they don't belong to the cluster.
// foo - non k8s managed backend
// k8s-be-foo - foo is not a nodeport
// k8s--bar--foo - too many cluster delimiters
// k8s-be-30001--uid - another cluster tagged with uid
unrelatedBackends := sets.NewString([]string{"foo", "k8s-be-foo", "k8s--bar--foo", "k8s-be-30001--uid"}...)
for _, name := range unrelatedBackends.List() {
f.CreateBackendService(&compute.BackendService{Name: name})
}

namer := &utils.Namer{}
// This backend should get deleted again since it is managed by this cluster.
f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0])})

// TODO: Avoid casting.
// Repopulate the pool with a cloud list, which now includes the 82 port
// backend. This would happen if, say, an ingress backend is removed
// while the controller is restarting.
pool.(*Backends).snapshotter.(*storage.CloudListingPool).ReplenishPool()

pool.GC(svcNodePorts)

currBackends, _ := f.ListBackendServices()
currSet := sets.NewString()
for _, b := range currBackends.Items {
currSet.Insert(b.Name)
}
// Port 81 still exists because it's an in-use service NodePort.
knownBe := namer.BeName(81)
if !currSet.Has(knownBe) {
t.Fatalf("Expected %v to exist in backend pool", knownBe)
}
currSet.Delete(knownBe)
if !currSet.Equal(unrelatedBackends) {
t.Fatalf("Some unrelated backends were deleted. Expected %+v, got %+v", unrelatedBackends, currSet)
}
}

func TestBackendPoolShutdown(t *testing.T) {
f := NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
pool := newBackendPool(f, fakeIGs)
pool := newBackendPool(f, fakeIGs, false)
namer := utils.Namer{}

pool.Add(80)
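The unrelatedBackends names in this test hint at the naming convention the CloudListingPool key getter relies on: cluster-owned backends look like k8s-be-<nodePort>, optionally suffixed with --<clusterUID>. The parser below is purely illustrative (it is not the real utils.Namer) and only exists to show why each of the test's unrelated names is rejected.

    package main

    import (
        "fmt"
        "strconv"
        "strings"
    )

    // parseBackendName is an illustrative decoder for names shaped like the ones
    // in the test above ("k8s-be-<nodePort>" with an optional "--<clusterUID>"
    // suffix). It is NOT the real utils.Namer.
    func parseBackendName(name, clusterUID string) (int64, error) {
        parts := strings.Split(name, "--")
        switch {
        case len(parts) > 2:
            return 0, fmt.Errorf("too many cluster delimiters in %q", name)
        case len(parts) == 2 && parts[1] != clusterUID:
            return 0, fmt.Errorf("%q belongs to cluster %q", name, parts[1])
        case len(parts) == 1 && clusterUID != "":
            return 0, fmt.Errorf("%q has no cluster suffix", name)
        }
        if !strings.HasPrefix(parts[0], "k8s-be-") {
            return 0, fmt.Errorf("%q is not a k8s backend name", name)
        }
        port, err := strconv.ParseInt(strings.TrimPrefix(parts[0], "k8s-be-"), 10, 64)
        if err != nil {
            return 0, fmt.Errorf("%q does not encode a nodeport: %v", name, err)
        }
        return port, nil
    }

    func main() {
        names := []string{"k8s-be-82", "foo", "k8s-be-foo", "k8s--bar--foo", "k8s-be-30001--uid"}
        for _, name := range names {
            // The test's cluster has no UID, so only plain "k8s-be-<port>" names parse.
            port, err := parseBackendName(name, "")
            fmt.Printf("%-22s port=%d err=%v\n", name, port, err)
        }
    }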
2 changes: 1 addition & 1 deletion ingress/controllers/gce/backends/interfaces.go
@@ -30,7 +30,7 @@ type BackendPool interface {
GC(ports []int64) error
Shutdown() error
Status(name string) string
List() (*compute.BackendServiceList, error)
List() ([]interface{}, error)
}

// BackendServices is an interface for managing gce backend services.
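Because List() now returns []interface{} (so the backend pool can double as the lister behind storage.CloudListingPool), callers that want the concrete backend services have to type-assert each item. A hedged sketch of such a caller follows; lister, backendNames and fakeLister are hypothetical helpers invented for the example, not part of this package.

    package main

    import (
        "fmt"

        compute "google.golang.org/api/compute/v1"
    )

    // lister captures just the piece of the BackendPool interface changed above.
    type lister interface {
        List() ([]interface{}, error)
    }

    // backendNames shows how a caller of the new List() signature recovers the
    // concrete *compute.BackendService items.
    func backendNames(pool lister) ([]string, error) {
        items, err := pool.List()
        if err != nil {
            return nil, err
        }
        names := make([]string, 0, len(items))
        for _, item := range items {
            be, ok := item.(*compute.BackendService)
            if !ok {
                return nil, fmt.Errorf("unexpected %T in backend pool listing", item)
            }
            names = append(names, be.Name)
        }
        return names, nil
    }

    // fakeLister is a stub standing in for the backend pool.
    type fakeLister struct{ items []interface{} }

    func (f fakeLister) List() ([]interface{}, error) { return f.items, nil }

    func main() {
        pool := fakeLister{items: []interface{}{&compute.BackendService{Name: "k8s-be-30001"}}}
        fmt.Println(backendNames(pool)) // [k8s-be-30001] <nil>
    }

The TODO in List() notes it should eventually return backend ports instead, which would make this assertion unnecessary.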
59 changes: 52 additions & 7 deletions ingress/controllers/gce/controller/cluster_manager.go
@@ -18,6 +18,8 @@ package controller

import (
"fmt"
"net/http"
"time"

"k8s.io/contrib/ingress/controllers/gce/backends"
"k8s.io/contrib/ingress/controllers/gce/healthchecks"
@@ -26,6 +28,8 @@ import (
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/cloudprovider"
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"

"github.com/golang/glog"
)

const (
@@ -54,6 +58,9 @@

// Names longer than this are truncated, because of GCE restrictions.
nameLenLimit = 62

// Sleep interval to retry cloud client creation.
cloudClientRetryInterval = 10 * time.Second
)

// ClusterManager manages cluster resource pools.
@@ -70,6 +77,14 @@ func (c *ClusterManager) IsHealthy() (err error) {
// TODO: Expand on this, for now we just want to detect when the GCE client
// is broken.
_, err = c.backendPool.List()

// If this container is scheduled on a node without compute/rw it is
// effectively useless, but it is healthy. Reporting it as unhealthy
// will lead to container crashlooping.
if utils.IsHTTPErrorCode(err, http.StatusForbidden) {
glog.Infof("Reporting cluster as healthy, but unable to list backends: %v", err)
return nil
}
return
}

@@ -138,6 +153,32 @@ func defaultInstanceGroupName(clusterName string) string {
return fmt.Sprintf("%v-%v", instanceGroupPrefix, clusterName)
}

func getGCEClient() *gce.GCECloud {
// Creating the cloud interface involves resolving the metadata server to get
// an oauth token. If this fails, the token provider assumes it's not on GCE.
// No errors are thrown. So we need to keep retrying till it works because
// we know we're on GCE.
for {
cloudInterface, err := cloudprovider.GetCloudProvider("gce", nil)
if err == nil {
cloud := cloudInterface.(*gce.GCECloud)

// If this controller is scheduled on a node without compute/rw
// it won't be allowed to list backends. We can assume that the
// user has no need for Ingress in this case. If they grant
// permissions to the node they will have to restart the controller
// manually to re-create the client.

Contributor commented: FMOK, does the user have to restart the controller? Why?

Author replied: If I loop around here continuously re-creating the client until ListBackendServices succeeds, the liveness probe will fail at some point and cause a crashloop. I want to not crash if scheduled on a read-only node.

Another way to handle this is to keep re-creating the client object in the liveness probe and, if it suddenly starts succeeding ListBackendServices calls, somehow re-notify everyone I've handed the old client to so they refresh their clients. I didn't think that level of effort was worth it, since we can just document that the controller needs permissions and leave it at that.
if _, err = cloud.ListBackendServices(); err == nil || utils.IsHTTPErrorCode(err, http.StatusForbidden) {
return cloud
}
glog.Warningf("Failed to list backend services, retrying: %v", err)
} else {
glog.Warningf("Failed to retrieve cloud interface, retrying: %v", err)
}
time.Sleep(cloudClientRetryInterval)
}
}

// NewClusterManager creates a cluster manager for shared resources.
// - name: is the name used to tag cluster wide shared resources. This is the
// string passed to glbc via --gce-cluster-name.
@@ -149,25 +190,29 @@ func NewClusterManager(
defaultBackendNodePort int64,
defaultHealthCheckPath string) (*ClusterManager, error) {

cloudInterface, err := cloudprovider.GetCloudProvider("gce", nil)
if err != nil {
return nil, err
}
cloud := cloudInterface.(*gce.GCECloud)
// TODO: Make this more resilient. Currently we create the cloud client
// and pass it through to all the pools. This makes unittesting easier.
// However if the cloud client suddenly fails, we should try to re-create it
// and continue.
cloud := getGCEClient()

cluster := ClusterManager{ClusterNamer: utils.Namer{name}}
zone, err := cloud.GetZone()
if err != nil {
return nil, err
}
cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)

// TODO: This needs to change to a consolidated management of the default backend.
cluster.backendPool = backends.NewBackendPool(
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer)
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
defaultBackendPool := backends.NewBackendPool(
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer)
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
cluster.defaultBackendNodePort = defaultBackendNodePort
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)

return &cluster, nil
}
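The IsHealthy() change above only pays off if the controller's liveness endpoint actually consults it. The wiring of that endpoint is not part of this diff, so the sketch below is an assumption-laden illustration of how a /healthz handler could sit on top of ClusterManager.IsHealthy; only the IsHealthy contract comes from the PR.

    package main

    import (
        "fmt"
        "net/http"
    )

    // healthChecker captures the only part of ClusterManager the probe needs.
    type healthChecker interface {
        IsHealthy() error
    }

    // registerHealthz wires a liveness endpoint to IsHealthy. Because IsHealthy
    // now treats a 403 from the cloud as healthy, a controller scheduled on a
    // node without the compute/rw scope reports OK instead of crashlooping.
    func registerHealthz(mux *http.ServeMux, hc healthChecker) {
        mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
            if err := hc.IsHealthy(); err != nil {
                http.Error(w, fmt.Sprintf("cluster manager unhealthy: %v", err), http.StatusInternalServerError)
                return
            }
            w.WriteHeader(http.StatusOK)
            fmt.Fprintln(w, "ok")
        })
    }

    // alwaysHealthy is a trivial stub so the sketch compiles on its own.
    type alwaysHealthy struct{}

    func (alwaysHealthy) IsHealthy() error { return nil }

    func main() {
        mux := http.NewServeMux()
        registerHealthz(mux, alwaysHealthy{})
        // In the real binary, something like http.ListenAndServe(":8081", mux)
        // would serve the kubelet's liveness probe.
    }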
20 changes: 14 additions & 6 deletions ingress/controllers/gce/controller/controller.go
@@ -265,6 +265,9 @@ func (lbc *LoadBalancerController) sync(key string) {
glog.V(3).Infof("Finished syncing %v", key)
}()

// Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP.
var syncError error
if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
// TODO: Implement proper backoff for the queue.
eventMsg := "GCE"
@@ -276,29 +279,34 @@ } else {
} else {
err = fmt.Errorf("%v Error: %v", eventMsg, err)
}
lbc.ingQueue.requeue(key, err)
return
syncError = err
}

if !ingExists {
if syncError != nil {
lbc.ingQueue.requeue(key, err)
}
return
}
// Update the UrlMap of the single loadbalancer that came through the watch.
l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
if err != nil {
lbc.ingQueue.requeue(key, err)
lbc.ingQueue.requeue(key, fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err))
return
}

ing := *obj.(*extensions.Ingress)
if urlMap, err := lbc.tr.toUrlMap(&ing); err != nil {
lbc.ingQueue.requeue(key, err)
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error())
lbc.ingQueue.requeue(key, err)
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
lbc.ingQueue.requeue(key, err)
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
if syncError != nil {
lbc.ingQueue.requeue(key, syncError)
}
return
}
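The sync() rewrite switches from requeue-on-first-error to accumulate-and-requeue-once, so later steps (such as updating the URL map) still run when an earlier step fails and associated cloud resources are freed as early as possible. The sketch below is an illustrative reduction of that pattern, not the controller's literal code; the step type and syncOnce helper are invented for the example.

    package main

    import (
        "errors"
        "fmt"
    )

    // step is a stand-in for one phase of the sync loop (Checkpoint,
    // UpdateUrlMap, status update, ...).
    type step struct {
        name string
        run  func() error
    }

    // syncOnce mirrors the accumulate-then-requeue pattern from the diff: every
    // step still runs, and the combined error is surfaced once at the end so the
    // key can be requeued a single time.
    func syncOnce(steps []step) error {
        var syncError error
        for _, s := range steps {
            if err := s.run(); err != nil {
                if syncError == nil {
                    syncError = fmt.Errorf("%v: %v", s.name, err)
                } else {
                    syncError = fmt.Errorf("%v; %v: %v", syncError, s.name, err)
                }
            }
        }
        return syncError
    }

    func main() {
        err := syncOnce([]step{
            {"checkpoint", func() error { return errors.New("quota exceeded") }},
            {"update url map", func() error { return nil }}, // still runs despite the earlier failure
            {"update ingress status", func() error { return errors.New("conflict") }},
        })
        fmt.Println(err) // checkpoint: quota exceeded; update ingress status: conflict
    }

One quirk of the real code worth noting: formatting with fmt.Errorf("%v, ...", syncError, err) while syncError is still nil produces a message that begins with "<nil>, ".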
2 changes: 1 addition & 1 deletion ingress/controllers/gce/controller/fakes.go
@@ -52,7 +52,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
backendPool := backends.NewBackendPool(
fakeBackends,
healthChecker, nodePool, namer)
healthChecker, nodePool, namer, []int64{}, false)
l7Pool := loadbalancers.NewLoadBalancerPool(
fakeLbs,
// TODO: change this