diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go
index 115ad52bcf6..7950c235799 100644
--- a/controllers/gce/backends/backends.go
+++ b/controllers/gce/backends/backends.go
@@ -27,6 +27,7 @@ import (
 	compute "google.golang.org/api/compute/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
+	api_v1 "k8s.io/client-go/pkg/api/v1"
 
 	"k8s.io/ingress/controllers/gce/healthchecks"
 	"k8s.io/ingress/controllers/gce/instances"
@@ -75,6 +76,7 @@ type Backends struct {
 	nodePool      instances.NodePool
 	healthChecker healthchecks.HealthChecker
 	snapshotter   storage.Snapshotter
+	prober        probeProvider
 	// ignoredPorts are a set of ports excluded from GC, even
 	// after the Ingress has been deleted. Note that invoking
 	// a Delete() on these ports will still delete the backend.
@@ -86,6 +88,12 @@ func portKey(port int64) string {
 	return fmt.Sprintf("%d", port)
 }
 
+// ServicePort for tupling port and encryption bool
+type ServicePort struct {
+	Port      int64
+	Encrypted bool
+}
+
 // NewBackendPool returns a new backend pool.
 // - cloud: implements BackendServices and syncs backends with a cloud provider
 // - healthChecker: is capable of producing health checks for backends.
@@ -134,6 +142,10 @@ func NewBackendPool(
 	return backendPool
 }
 
+// Init sets the probeProvider used to seed health checks from readiness probes.
+func (b *Backends) Init(pp probeProvider) {
+	b.prober = pp
+}
+
 // Get returns a single backend.
 func (b *Backends) Get(port int64) (*compute.BackendService, error) {
 	be, err := b.cloud.GetBackendService(b.namer.BeName(port))
@@ -144,44 +156,46 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
 	return be, nil
 }
 
-func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
-	// Create a new health check
-	if err := b.healthChecker.Add(namedPort.Port); err != nil {
-		return nil, err
+func (b *Backends) ensureHealthCheck(port int64, encrypted bool) (string, error) {
+	hc := b.healthChecker.New(port, encrypted)
+	if b.prober != nil {
+		probe, err := b.prober.GetProbe(port)
+		if err != nil {
+			return "", err
+		}
+		if probe != nil {
+			applyProbeSettingsToHC(probe, hc)
+		}
+	}
+
+	if err := b.healthChecker.Sync(hc); err != nil {
+		return "", err
 	}
-	hc, err := b.healthChecker.Get(namedPort.Port)
+	// TODO: Avoid this second call
+	hc, err := b.healthChecker.Get(port, encrypted)
+	if err != nil {
+		return "", err
+	}
+	return hc.SelfLink, nil
+}
+
+func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, encrypted bool, name string) (*compute.BackendService, error) {
+	hclink, err := b.ensureHealthCheck(namedPort.Port, encrypted)
 	if err != nil {
 		return nil, err
 	}
-	errs := []string{}
+
+	var errs []string
 	// We first try to create the backend with balancingMode=RATE. If this
 	// fails, it's most likely because there are existing backends with
 	// balancingMode=UTILIZATION. This failure mode throws a googleapi error
 	// which wraps a HTTP 400 status code. We handle it in the loop below
 	// and come around to retry with the right balancing mode. The goal is to
 	// switch everyone to using RATE.
 	for _, bm := range []BalancingMode{Rate, Utilization} {
-		backends := getBackendsForIGs(igs)
-		for _, b := range backends {
-			switch bm {
-			case Rate:
-				b.MaxRate = maxRPS
-			default:
-				// TODO: Set utilization and connection limits when we accept them
-				// as valid fields.
-			}
-			b.BalancingMode = string(bm)
-		}
 		// Create a new backend
-		backend := &compute.BackendService{
-			Name:         name,
-			Protocol:     "HTTP",
-			Backends:     backends,
-			HealthChecks: []string{hc.SelfLink},
-			Port:         namedPort.Port,
-			PortName:     namedPort.Name,
-		}
-		if err := b.cloud.CreateBackendService(backend); err != nil {
+		bs := newBackendService(igs, bm, namedPort, []string{hclink}, encrypted, name)
+		if err := b.cloud.CreateBackendService(bs); err != nil {
 			// This is probably a failure because we tried to create the backend
 			// with balancingMode=RATE when there are already backends with
 			// balancingMode=UTILIZATION. Just ignore it and retry setting
@@ -198,31 +212,84 @@ func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.Named
 	return nil, fmt.Errorf("%v", strings.Join(errs, "\n"))
 }
 
+func newBackendService(igs []*compute.InstanceGroup, bm BalancingMode, namedPort *compute.NamedPort, healthCheckLinks []string, encrypted bool, name string) *compute.BackendService {
+	backends := getBackendsForIGs(igs)
+	for _, b := range backends {
+		switch bm {
+		case Rate:
+			b.MaxRatePerInstance = maxRPS
+		default:
+			// TODO: Set utilization and connection limits when we accept them
+			// as valid fields.
+		}
+		b.BalancingMode = string(bm)
+	}
+
+	return &compute.BackendService{
+		Name:         name,
+		Protocol:     utils.GetHTTPScheme(encrypted),
+		Backends:     backends,
+		HealthChecks: healthCheckLinks,
+		Port:         namedPort.Port,
+		PortName:     namedPort.Name,
+	}
+}
+
+func (b *Backends) updateProtocol(bs *compute.BackendService, encrypted bool) (*compute.BackendService, error) {
+	// Create healthcheck with proper protocol
+	hclink, err := b.ensureHealthCheck(bs.Port, encrypted)
+	if err != nil {
+		return nil, err
+	}
+
+	bs.Protocol = utils.GetHTTPScheme(encrypted)
+	bs.HealthChecks = []string{hclink}
+
+	if err = b.cloud.UpdateBackendService(bs); err != nil {
+		return bs, err
+	}
+
+	// Attempt delete of previous healthcheck; warn that err occurred
+	if err = b.healthChecker.Delete(bs.Port, !encrypted); err != nil {
+		glog.Warningf("Failed to delete %v healthcheck for port %v, err: %v", utils.GetHTTPScheme(!encrypted), bs.Port, err)
+	}
+
+	return bs, nil
+}
+
 // Add will get or create a Backend for the given port.
-func (b *Backends) Add(port int64) error {
+func (b *Backends) Add(p ServicePort) error {
 	// We must track the port even if creating the backend failed, because
 	// we might've created a health-check for it.
 	be := &compute.BackendService{}
-	defer func() { b.snapshotter.Add(portKey(port), be) }()
+	defer func() { b.snapshotter.Add(portKey(p.Port), be) }()
 
-	igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
+	igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port)
 	if err != nil {
 		return err
 	}
-	be, _ = b.Get(port)
+
+	pName := b.namer.BeName(p.Port)
+	be, _ = b.Get(p.Port)
 	if be == nil {
-		glog.Infof("Creating backend for %d instance groups, port %v named port %v",
-			len(igs), port, namedPort)
-		be, err = b.create(igs, namedPort, b.namer.BeName(port))
+		glog.Infof("Creating backend for %d instance groups, port %v named port %v", len(igs), p.Port, namedPort)
+		be, err = b.create(igs, namedPort, p.Encrypted, pName)
 		if err != nil {
 			return err
 		}
 	}
+
+	pProto := utils.GetHTTPScheme(p.Encrypted)
+	if be.Protocol != pProto {
+		glog.Infof("Updating backend protocol %v from %v to %v", pName, be.Protocol, pProto)
+		if be, err = b.updateProtocol(be, p.Encrypted); err != nil {
+			return err
+		}
+	}
+
 	// we won't find any igs till the node pool syncs nodes.
 	if len(igs) == 0 {
 		return nil
 	}
-	if err := b.edgeHop(be, igs); err != nil {
+	if err = b.edgeHop(be, igs); err != nil {
 		return err
 	}
 	return err
@@ -231,7 +298,7 @@ func (b *Backends) Add(port int64) error {
 // Delete deletes the Backend for the given port.
 func (b *Backends) Delete(port int64) (err error) {
 	name := b.namer.BeName(port)
-	glog.Infof("Deleting backend %v", name)
+	glog.Infof("Deleting backend service %v", name)
 	defer func() {
 		if utils.IsHTTPErrorCode(err, http.StatusNotFound) {
 			err = nil
@@ -245,10 +312,17 @@ func (b *Backends) Delete(port int64) (err error) {
 		!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
 		return err
 	}
-	if err = b.healthChecker.Delete(port); err != nil &&
-		!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
-		return err
+
+	// Delete HTTP and HTTPS health checks in case both exist
+	// TODO: Update GLBC to create the newer compute.HealthCheck so we aren't straddling
+	// two types of health checks (legacy http & legacy https)
+	encryption := []bool{false, true}
+	for _, e := range encryption {
+		if err = b.healthChecker.Delete(port, e); err != nil {
+			return err
+		}
 	}
+	return nil
 }
 
@@ -306,7 +380,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
 }
 
 // Sync syncs backend services corresponding to ports in the given list.
-func (b *Backends) Sync(svcNodePorts []int64) error {
+func (b *Backends) Sync(svcNodePorts []ServicePort) error {
 	glog.V(3).Infof("Sync: backends %v", svcNodePorts)
 
 	// create backends for new ports, perform an edge hop for existing ports
@@ -319,14 +393,14 @@ func (b *Backends) Sync(svcNodePorts []int64) error {
 }
 
 // GC garbage collects services corresponding to ports in the given list.
-func (b *Backends) GC(svcNodePorts []int64) error {
+func (b *Backends) GC(svcNodePorts []ServicePort) error {
 	knownPorts := sets.NewString()
-	for _, port := range svcNodePorts {
-		knownPorts.Insert(portKey(port))
+	for _, p := range svcNodePorts {
+		knownPorts.Insert(portKey(p.Port))
 	}
 	pool := b.snapshotter.Snapshot()
 	for port := range pool {
-		p, err := strconv.Atoi(port)
+		p, err := strconv.ParseInt(port, 10, 64)
 		if err != nil {
 			return err
 		}
@@ -345,7 +419,7 @@ func (b *Backends) GC(svcNodePorts []ServicePort) error {
 // Shutdown deletes all backends and the default backend.
 // This will fail if one of the backends is being used by another resource.
 func (b *Backends) Shutdown() error {
-	if err := b.GC([]int64{}); err != nil {
+	if err := b.GC([]ServicePort{}); err != nil {
 		return err
 	}
 	return nil
@@ -365,3 +439,25 @@ func (b *Backends) Status(name string) string {
 	// TODO: State transition are important, not just the latest.
 	return hs.HealthStatus[0].HealthState
 }
+
+func applyProbeSettingsToHC(p *api_v1.Probe, hc *healthchecks.HealthCheck) {
+	healthPath := p.Handler.HTTPGet.Path
+	// GCE requires a leading "/" for health check urls.
+	if !strings.HasPrefix(healthPath, "/") {
+		healthPath = fmt.Sprintf("/%v", healthPath)
+	}
+
+	host := p.Handler.HTTPGet.Host
+	hc.RequestPath = healthPath
+	hc.Host = host
+	hc.Description = "kubernetes L7 health check from readiness probe."
+	// set a low health threshold and a high failure threshold.
+	// We're just trying to detect if the node networking is
+	// borked, service level outages will get detected sooner
+	// by kube-proxy.
+	hc.CheckIntervalSec = int64(p.PeriodSeconds + healthchecks.DefaultHealthCheckInterval)
+	hc.TimeoutSec = int64(p.TimeoutSeconds)
+}
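
With the change above, a backend is keyed by a ServicePort rather than a bare int64 node port, and readiness-probe settings flow into health checks through Init(probeProvider). A minimal wiring sketch under this PR's own fakes (it mirrors newBackendPool in backends_test.go below and is an illustration, not controller code; the function name examplePool is hypothetical):

package backends_test

import (
	compute "google.golang.org/api/compute/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	api_v1 "k8s.io/client-go/pkg/api/v1"

	"k8s.io/ingress/controllers/gce/backends"
	"k8s.io/ingress/controllers/gce/healthchecks"
	"k8s.io/ingress/controllers/gce/instances"
	"k8s.io/ingress/controllers/gce/utils"
)

// examplePool wires a Backends pool against fakes and reconciles one
// encrypted NodePort: Add provisions an HTTPS health check and creates the
// backend service with Protocol "HTTPS".
func examplePool(probes map[int64]*api_v1.Probe) error {
	namer := &utils.Namer{}
	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
	nodePool := instances.NewNodePool(fakeIGs)
	nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})

	f := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil })
	healthChecker := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthCheckProvider(), "/", namer)

	pool := backends.NewBackendPool(f, healthChecker, nodePool, namer, []int64{}, false)
	pool.Init(backends.NewFakeProbeProvider(probes)) // probes feed ensureHealthCheck

	return pool.Add(backends.ServicePort{Port: 30443, Encrypted: true})
}
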
diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go
index 5766f6919cd..68f7ed00eeb 100644
--- a/controllers/gce/backends/backends_test.go
+++ b/controllers/gce/backends/backends_test.go
@@ -17,12 +17,15 @@ limitations under the License.
 package backends
 
 import (
+	"fmt"
 	"net/http"
 	"testing"
 
 	compute "google.golang.org/api/compute/v1"
 	"google.golang.org/api/googleapi"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
+	api_v1 "k8s.io/client-go/pkg/api/v1"
 
 	"k8s.io/ingress/controllers/gce/healthchecks"
 	"k8s.io/ingress/controllers/gce/instances"
@@ -34,14 +37,28 @@ const defaultZone = "zone-a"
 
 var noOpErrFunc = func(op int, be *compute.BackendService) error { return nil }
 
-func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
+var existingProbe = &api_v1.Probe{
+	Handler: api_v1.Handler{
+		HTTPGet: &api_v1.HTTPGetAction{
+			Scheme: api_v1.URISchemeHTTP,
+			Path:   "/my-special-path",
+			Port: intstr.IntOrString{
+				Type:   intstr.Int,
+				IntVal: 80,
+			},
+		},
+	},
+}
+
+func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) *Backends {
 	namer := &utils.Namer{}
 	nodePool := instances.NewNodePool(fakeIGs)
 	nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
-	healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer)
-	healthChecks.Init(&healthchecks.FakeHealthCheckGetter{DefaultHealthCheck: nil})
-	return NewBackendPool(
-		f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
+	healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthCheckProvider(), "/", namer)
+	bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
+	probes := map[int64]*api_v1.Probe{80: existingProbe}
+	bp.Init(NewFakeProbeProvider(probes))
+	return bp
 }
 
 func TestBackendPoolAdd(t *testing.T) {
@@ -50,39 +67,126 @@ func TestBackendPoolAdd(t *testing.T) {
 	pool := newBackendPool(f, fakeIGs, false)
 	namer := utils.Namer{}
 
-	// Add a backend for a port, then re-add the same port and
-	// make sure it corrects a broken link from the backend to
-	// the instance group.
-	nodePort := int64(8080)
-	pool.Add(nodePort)
-	beName := namer.BeName(nodePort)
+	testCases := []ServicePort{
+		{80, false},
+		{443, true},
+	}
+
+	for _, nodePort := range testCases {
+		t.Run(fmt.Sprintf("Port:%v Encrypted:%v", nodePort.Port, nodePort.Encrypted), func(t *testing.T) {
+			// Add a backend for a port, then re-add the same port and
+			// make sure it corrects a broken link from the backend to
+			// the instance group.
+			err := pool.Add(nodePort)
+			if err != nil {
+				t.Fatalf("Unexpected error when adding nodeport %v: %v", nodePort, err)
+			}
+			beName := namer.BeName(nodePort.Port)
+
+			// Check that the new backend has the right port
+			be, err := f.GetBackendService(beName)
+			if err != nil {
+				t.Fatalf("Did not find expected backend %v", beName)
+			}
+			if be.Port != nodePort.Port {
+				t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort)
+			}
+
+			// Check that the instance group has the new port
+			var found bool
+			for _, port := range fakeIGs.Ports {
+				if port == nodePort.Port {
+					found = true
+				}
+			}
+			if !found {
+				t.Fatalf("Port %v not added to instance group", nodePort)
+			}
+
+			// Check the created healthcheck is the correct protocol
+			hc, err := pool.healthChecker.Get(nodePort.Port, nodePort.Encrypted)
+			if err != nil {
+				t.Fatalf("Unexpected err when querying fake healthchecker: %v", err)
+			}
+
+			if hc.Encrypted != nodePort.Encrypted {
+				t.Fatalf("Healthcheck scheme does not match nodeport scheme: hc:%v np:%v", utils.GetHTTPScheme(hc.Encrypted), utils.GetHTTPScheme(nodePort.Encrypted))
+			}
+		})
+	}
+}
+
+func TestBackendPoolUpdate(t *testing.T) {
+	f := NewFakeBackendServices(noOpErrFunc)
+	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
+	pool := newBackendPool(f, fakeIGs, false)
+	namer := utils.Namer{}
+
+	p := ServicePort{Port: 3000, Encrypted: false}
+	pool.Add(p)
+	beName := namer.BeName(p.Port)
 
 	be, err := f.GetBackendService(beName)
 	if err != nil {
-		t.Fatalf("Did not find expected backend %v", beName)
+		t.Fatalf("Unexpected err: %v", err)
 	}
-	if be.Port != nodePort {
-		t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort)
+
+	if be.Protocol != utils.GetHTTPScheme(p.Encrypted) {
+		t.Fatalf("Expected scheme %v but got %v", utils.GetHTTPScheme(p.Encrypted), be.Protocol)
 	}
-	// Check that the instance group has the new port
-	var found bool
-	for _, port := range fakeIGs.Ports {
-		if port == nodePort {
-			found = true
-		}
+
+	// Assert the proper health check was created
+	hc, _ := pool.healthChecker.Get(p.Port, p.Encrypted)
+	if hc == nil || hc.Encrypted != p.Encrypted {
+		t.Fatalf("Expected %s health check, received %v: ", utils.GetHTTPScheme(p.Encrypted), hc)
 	}
-	if !found {
-		t.Fatalf("Port %v not added to instance group", nodePort)
+
+	// Update service port to encrypted
+	p.Encrypted = true
+	pool.Sync([]ServicePort{p})
+
+	be, err = f.GetBackendService(beName)
+	if err != nil {
+		t.Fatalf("Unexpected err retrieving backend service after update: %v", err)
 	}
+
+	// Assert the backend has the correct protocol
+	if be.Protocol != utils.GetHTTPScheme(p.Encrypted) {
+		t.Fatalf("Expected scheme %v but got %v", utils.GetHTTPScheme(p.Encrypted), be.Protocol)
+	}
+
+	// Assert the proper health check was created
+	hc, _ = pool.healthChecker.Get(p.Port, p.Encrypted)
+	if hc == nil || hc.Encrypted != p.Encrypted {
+		t.Fatalf("Expected %s health check, received %v: ", utils.GetHTTPScheme(p.Encrypted), hc)
+	}
+
+	// Assert that the wrong health check does not exist
+	hc, _ = pool.healthChecker.Get(p.Port, !p.Encrypted)
+	if hc != nil {
+		t.Fatalf("Expected nil health check, received %v: ", hc)
+	}
+}
+
+func TestBackendPoolChaosMonkey(t *testing.T) {
+	f := NewFakeBackendServices(noOpErrFunc)
+	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
+	pool := newBackendPool(f, fakeIGs, false)
+	namer := utils.Namer{}
+
+	nodePort := ServicePort{Port: 8080, Encrypted: false}
+	pool.Add(nodePort)
+	beName := namer.BeName(nodePort.Port)
+
+	be, _ := f.GetBackendService(beName)
+
 	// Mess up the link between backend service and instance group.
 	// This simulates a user doing foolish things through the UI.
-	f.calls = []int{}
-	be, err = f.GetBackendService(beName)
 	be.Backends = []*compute.Backend{
 		{Group: "test edge hop"},
 	}
+	f.calls = []int{}
 	f.UpdateBackendService(be)
 
 	pool.Add(nodePort)
@@ -114,28 +218,35 @@ func TestBackendPoolAdd(t *testing.T) {
 func TestBackendPoolSync(t *testing.T) {
 	// Call sync on a backend pool with a list of ports, make sure the pool
 	// creates/deletes required ports.
-	svcNodePorts := []int64{81, 82, 83}
+	svcNodePorts := []ServicePort{{Port: 81}, {Port: 82, Encrypted: true}, {Port: 83}}
 	f := NewFakeBackendServices(noOpErrFunc)
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
 	pool := newBackendPool(f, fakeIGs, true)
-	pool.Add(81)
-	pool.Add(90)
-	pool.Sync(svcNodePorts)
-	pool.GC(svcNodePorts)
+	pool.Add(ServicePort{Port: 81})
+	pool.Add(ServicePort{Port: 90})
+	if err := pool.Sync(svcNodePorts); err != nil {
+		t.Errorf("Expected backend pool to sync, err: %v", err)
+	}
+	if err := pool.GC(svcNodePorts); err != nil {
+		t.Errorf("Expected backend pool to GC, err: %v", err)
+	}
 	if _, err := pool.Get(90); err == nil {
 		t.Fatalf("Did not expect to find port 90")
 	}
 	for _, port := range svcNodePorts {
-		if _, err := pool.Get(port); err != nil {
+		if _, err := pool.Get(port.Port); err != nil {
 			t.Fatalf("Expected to find port %v", port)
 		}
 	}
 
-	svcNodePorts = []int64{81}
-	deletedPorts := []int64{82, 83}
-	pool.GC(svcNodePorts)
+	svcNodePorts = []ServicePort{{Port: 81}}
+	deletedPorts := []ServicePort{{Port: 82}, {Port: 83}}
+	if err := pool.GC(svcNodePorts); err != nil {
+		t.Fatalf("Expected backend pool to GC, err: %v", err)
+	}
+
 	for _, port := range deletedPorts {
-		if _, err := pool.Get(port); err == nil {
+		if _, err := pool.Get(port.Port); err == nil {
 			t.Fatalf("Pool contains %v after deletion", port)
 		}
 	}
@@ -152,13 +263,13 @@ func TestBackendPoolSync(t *testing.T) {
 	namer := &utils.Namer{}
 
 	// This backend should get deleted again since it is managed by this cluster.
-	f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0])})
+	f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0].Port)})
 
 	// TODO: Avoid casting.
 	// Repopulate the pool with a cloud list, which now includes the 82 port
 	// backend. This would happen if, say, an ingress backend is removed
 	// while the controller is restarting.
-	pool.(*Backends).snapshotter.(*storage.CloudListingPool).ReplenishPool()
+	pool.snapshotter.(*storage.CloudListingPool).ReplenishPool()
 
 	pool.GC(svcNodePorts)
 
@@ -184,7 +295,7 @@ func TestBackendPoolShutdown(t *testing.T) {
 	pool := newBackendPool(f, fakeIGs, false)
 	namer := utils.Namer{}
 
-	pool.Add(80)
+	pool.Add(ServicePort{Port: 80})
 	pool.Shutdown()
 	if _, err := f.GetBackendService(namer.BeName(80)); err == nil {
 		t.Fatalf("%v", err)
@@ -198,7 +309,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
 	namer := utils.Namer{}
 
 	// This will add the instance group k8s-ig to the instance pool
-	pool.Add(80)
+	pool.Add(ServicePort{Port: 80})
 
 	be, err := f.GetBackendService(namer.BeName(80))
 	if err != nil {
@@ -211,12 +322,12 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
 		{Group: "k8s-ig-foo"},
 	}
 	be.Backends = append(be.Backends, newGroups...)
-	if err := f.UpdateBackendService(be); err != nil {
+	if err = f.UpdateBackendService(be); err != nil {
 		t.Fatalf("Failed to update backend service %v", be.Name)
 	}
 
 	// Make sure repeated adds don't clobber the inserted instance group
-	pool.Add(80)
+	pool.Add(ServicePort{Port: 80})
 	be, err = f.GetBackendService(namer.BeName(80))
 	if err != nil {
 		t.Fatalf("%v", err)
@@ -242,7 +353,7 @@ func TestBackendCreateBalancingMode(t *testing.T) {
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
 	pool := newBackendPool(f, fakeIGs, false)
 	namer := utils.Namer{}
-	nodePort := int64(8080)
+	nodePort := ServicePort{Port: 8080}
 	modes := []BalancingMode{Rate, Utilization}
 
 	// block the creation of Backends with the given balancingMode
@@ -259,7 +370,7 @@ func TestBackendCreateBalancingMode(t *testing.T) {
 		}
 
 		pool.Add(nodePort)
-		be, err := f.GetBackendService(namer.BeName(nodePort))
+		be, err := f.GetBackendService(namer.BeName(nodePort.Port))
 		if err != nil {
 			t.Fatalf("%v", err)
 		}
@@ -269,6 +380,32 @@ func TestBackendCreateBalancingMode(t *testing.T) {
 			t.Fatalf("Wrong balancing mode, expected %v got %v", modes[(i+1)%len(modes)], b.BalancingMode)
 		}
 	}
-	pool.GC([]int64{})
+	pool.GC([]ServicePort{})
+}
+
+func TestApplyProbeSettingsToHC(t *testing.T) {
+	p := "healthz"
+	hc := healthchecks.DefaultHealthCheckTemplate(8080, true)
+	probe := &api_v1.Probe{
+		Handler: api_v1.Handler{
+			HTTPGet: &api_v1.HTTPGetAction{
+				Scheme: api_v1.URISchemeHTTP,
+				Path:   p,
+				Port: intstr.IntOrString{
+					Type:   intstr.Int,
+					IntVal: 80,
+				},
+			},
+		},
+	}
+
+	applyProbeSettingsToHC(probe, hc)
+
+	if hc.Encrypted != true || hc.Port != 8080 {
+		t.Errorf("Basic HC settings changed")
+	}
+	if hc.RequestPath != "/"+p {
+		t.Errorf("Failed to apply probe's requestpath")
+	}
 }
diff --git a/controllers/gce/backends/fakes.go b/controllers/gce/backends/fakes.go
index 2fe73cf864c..13f02bbecfd 100644
--- a/controllers/gce/backends/fakes.go
+++ b/controllers/gce/backends/fakes.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 
 	compute "google.golang.org/api/compute/v1"
+	api_v1 "k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/tools/cache"
 
 	"k8s.io/ingress/controllers/gce/utils"
@@ -118,3 +119,20 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput
 	return &compute.BackendServiceGroupHealth{
 		HealthStatus: states}, nil
 }
+
+// FakeProbeProvider implements the probeProvider interface for tests.
+type FakeProbeProvider struct {
+	probes map[int64]*api_v1.Probe
+}
+
+// NewFakeProbeProvider returns a FakeProbeProvider backed by the given probes.
+func NewFakeProbeProvider(probes map[int64]*api_v1.Probe) *FakeProbeProvider {
+	return &FakeProbeProvider{probes}
+}
+
+// GetProbe returns the probe for a given nodePort
+func (pp *FakeProbeProvider) GetProbe(port int64) (*api_v1.Probe, error) {
+	if probe, exists := pp.probes[port]; exists {
+		return probe, nil
+	}
+	return nil, nil
+}
diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go
index 1e53368fc9a..5a5bed4de9e 100644
--- a/controllers/gce/backends/interfaces.go
+++ b/controllers/gce/backends/interfaces.go
@@ -18,16 +18,23 @@ package backends
 
 import (
 	compute "google.golang.org/api/compute/v1"
+	api_v1 "k8s.io/client-go/pkg/api/v1"
 )
 
+// probeProvider retrieves a probe struct given a nodePort
+type probeProvider interface {
+	GetProbe(nodePort int64) (*api_v1.Probe, error)
+}
+
 // BackendPool is an interface to manage a pool of kubernetes nodePort services
 // as gce backendServices, and sync them through the BackendServices interface.
 type BackendPool interface {
-	Add(port int64) error
+	Init(p probeProvider)
+	Add(port ServicePort) error
 	Get(port int64) (*compute.BackendService, error)
 	Delete(port int64) error
-	Sync(ports []int64) error
-	GC(ports []int64) error
+	Sync(ports []ServicePort) error
+	GC(ports []ServicePort) error
 	Shutdown() error
 	Status(name string) string
 	List() ([]interface{}, error)
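
probeProvider decouples the backend pool from the controller: any value with a matching GetProbe method can seed health checks, and GCETranslator (controller/utils.go, further down) is the real implementation. A hypothetical in-memory implementation, shown only to spell out the contract (it would have to live in package backends since the interface is unexported; returning nil, nil means "no probe; keep the default health-check template"):

// staticProber is a hypothetical probeProvider, for illustration only.
type staticProber struct {
	probes map[int64]*api_v1.Probe
}

// GetProbe satisfies probeProvider. A nil probe with a nil error signals
// "no probe found", which leaves the default health-check template untouched.
func (s staticProber) GetProbe(nodePort int64) (*api_v1.Probe, error) {
	return s.probes[nodePort], nil
}
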
diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go
index 79d21bc17f9..2f2a0836fc0 100644
--- a/controllers/gce/controller/cluster_manager.go
+++ b/controllers/gce/controller/cluster_manager.go
@@ -65,7 +65,7 @@ const (
 // ClusterManager manages cluster resource pools.
 type ClusterManager struct {
 	ClusterNamer           *utils.Namer
-	defaultBackendNodePort int64
+	defaultBackendNodePort backends.ServicePort
 	instancePool           instances.NodePool
 	backendPool            backends.BackendPool
 	l7Pool                 loadbalancers.LoadBalancerPool
@@ -83,9 +83,7 @@ type ClusterManager struct {
 // Init initializes the cluster manager.
 func (c *ClusterManager) Init(tr *GCETranslator) {
 	c.instancePool.Init(tr)
-	for _, h := range c.healthCheckers {
-		h.Init(tr)
-	}
+	c.backendPool.Init(tr)
 	// TODO: Initialize other members as needed.
 }
 
@@ -126,17 +124,17 @@ func (c *ClusterManager) shutdown() error {
 // these ports must also be opened on the corresponding Instance Group.
 // If in performing the checkpoint the cluster manager runs out of quota, a
 // googleapi 403 is returned.
-func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []int64) error {
+func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error {
 	// Multiple ingress paths can point to the same service (and hence nodePort)
 	// but each nodePort can only have one set of cloud resources behind it. So
 	// don't waste time double validating GCE BackendServices.
-	portMap := map[int64]struct{}{}
+	portMap := map[int64]backends.ServicePort{}
 	for _, p := range nodePorts {
-		portMap[p] = struct{}{}
+		portMap[p.Port] = p
 	}
-	nodePorts = []int64{}
-	for p := range portMap {
-		nodePorts = append(nodePorts, p)
+	nodePorts = []backends.ServicePort{}
+	for _, sp := range portMap {
+		nodePorts = append(nodePorts, sp)
 	}
 	if err := c.backendPool.Sync(nodePorts); err != nil {
 		return err
@@ -158,7 +156,12 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
 		// we shouldn't leak the firewall rule.
 		fwNodePorts = append(fwNodePorts, c.defaultBackendNodePort)
 	}
-	if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil {
+
+	var np []int64
+	for _, p := range fwNodePorts {
+		np = append(np, p.Port)
+	}
+	if err := c.firewallPool.Sync(np, nodeNames); err != nil {
 		return err
 	}
 
@@ -171,7 +174,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
 // - nodePorts are the ports for which we want BackendServices. BackendServices
 //   for ports not in this list are deleted.
 // This method ignores googleapi 404 errors (StatusNotFound).
-func (c *ClusterManager) GC(lbNames []string, nodePorts []int64) error {
+func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort) error {
 	// On GC:
 	// * Loadbalancers need to get deleted before backends.
@@ -240,7 +243,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud {
 func NewClusterManager(
 	configFilePath string,
 	namer *utils.Namer,
-	defaultBackendNodePort int64,
+	defaultBackendNodePort backends.ServicePort,
 	defaultHealthCheckPath string) (*ClusterManager, error) {
 
 	// TODO: Make this more resilient. Currently we create the cloud client
@@ -279,15 +282,12 @@ func NewClusterManager(
 	cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}
 
 	// TODO: This needs to change to a consolidated management of the default backend.
-	cluster.backendPool = backends.NewBackendPool(
-		cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
-	defaultBackendPool := backends.NewBackendPool(
-		cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
+	cluster.backendPool = backends.NewBackendPool(cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort.Port}, true)
+	defaultBackendPool := backends.NewBackendPool(cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
 	cluster.defaultBackendNodePort = defaultBackendNodePort
 
 	// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
-	cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
-		cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
+	cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
 	cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer)
 	return &cluster, nil
 }
diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go
index dc8a80f586d..63de62754df 100644
--- a/controllers/gce/controller/fakes.go
+++ b/controllers/gce/controller/fakes.go
@@ -29,8 +29,8 @@ import (
 	"k8s.io/ingress/controllers/gce/utils"
 )
 
-const (
-	testDefaultBeNodePort = int64(3000)
+var (
+	testDefaultBeNodePort = backends.ServicePort{Port: 3000, Encrypted: false}
 )
 
 var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
@@ -48,14 +48,13 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
 	fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName)
 	fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil })
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
-	fakeHCs := healthchecks.NewFakeHealthChecks()
+	fakeHCP := healthchecks.NewFakeHealthCheckProvider()
 	namer := utils.NewNamer(clusterName, firewallName)
 
 	nodePool := instances.NewNodePool(fakeIGs)
 	nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})
 
-	healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
-	healthChecker.Init(&healthchecks.FakeHealthCheckGetter{})
+	healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer)
 
 	backendPool := backends.NewBackendPool(
 		fakeBackends,
diff --git a/controllers/gce/controller/util_test.go b/controllers/gce/controller/util_test.go
index 774750f55b3..a1775c08f43 100644
--- a/controllers/gce/controller/util_test.go
+++ b/controllers/gce/controller/util_test.go
@@ -100,11 +100,11 @@ func TestProbeGetter(t *testing.T) {
 	}
 	addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
 	for p, exp := range nodePortToHealthCheck {
-		got, err := lbc.tr.HealthCheck(p)
-		if err != nil {
-			t.Errorf("Failed to get health check for node port %v: %v", p, err)
-		} else if got.RequestPath != exp {
-			t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
+		got, err := lbc.tr.GetProbe(p)
+		if err != nil || got == nil {
+			t.Errorf("Failed to get probe for node port %v: %v", p, err)
+		} else if getPathInProbe(got) != exp {
+			t.Errorf("Wrong path for node port %v, got %v expected %v", p, getPathInProbe(got), exp)
 		}
 	}
 }
@@ -122,11 +122,11 @@ func TestProbeGetterNamedPort(t *testing.T) {
 		pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"}
 	}
 	for p, exp := range nodePortToHealthCheck {
-		got, err := lbc.tr.HealthCheck(p)
-		if err != nil {
-			t.Errorf("Failed to get health check for node port %v: %v", p, err)
-		} else if got.RequestPath != exp {
-			t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
+		got, err := lbc.tr.GetProbe(p)
+		if err != nil || got == nil {
+			t.Errorf("Failed to get probe for node port %v: %v", p, err)
+		} else if getPathInProbe(got) != exp {
+			t.Errorf("Wrong path for node port %v, got %v expected %v", p, getPathInProbe(got), exp)
 		}
 	}
 
@@ -173,11 +173,11 @@ func TestProbeGetterCrossNamespace(t *testing.T) {
 	addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
 
 	for p, exp := range nodePortToHealthCheck {
-		got, err := lbc.tr.HealthCheck(p)
-		if err != nil {
-			t.Errorf("Failed to get health check for node port %v: %v", p, err)
-		} else if got.RequestPath != exp {
-			t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
+		got, err := lbc.tr.GetProbe(p)
+		if err != nil || got == nil {
+			t.Errorf("Failed to get probe for node port %v: %v", p, err)
+		} else if getPathInProbe(got) != exp {
+			t.Errorf("Wrong path for node port %v, got %v expected %v", p, getPathInProbe(got), exp)
 		}
 	}
 }
@@ -257,3 +257,7 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) {
 	}
 	lbc.CloudClusterManager.instancePool.Init(lbc.tr)
 }
+
+func getPathInProbe(p *api_v1.Probe) string {
+	return p.Handler.HTTPGet.Path
+}
diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go
index 34b7eb10c03..119627f71cf 100644
--- a/controllers/gce/controller/utils.go
+++ b/controllers/gce/controller/utils.go
@@ -17,6 +17,7 @@ limitations under the License.
 package controller
 
 import (
+	"encoding/json"
 	"fmt"
 	"sort"
 	"strconv"
@@ -37,6 +38,7 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 
+	"k8s.io/ingress/controllers/gce/backends"
 	"k8s.io/ingress/controllers/gce/loadbalancers"
 	"k8s.io/ingress/controllers/gce/utils"
 )
@@ -63,6 +65,9 @@ const (
 	// to the target proxies of the Ingress.
 	preSharedCertKey = "ingress.gcp.kubernetes.io/pre-shared-cert"
 
	// serviceReencryptKey is a Service annotation listing, as a JSON array,
	// the named ports that should be reached over TLS (HTTPS).
 	serviceReencryptKey = "service.alpha.kubernetes.io/tls-ports"
 
 	// ingressClassKey picks a specific "class" for the Ingress. The controller
 	// only processes Ingresses with this annotation either unset, or set
 	// to either gceIngessClass or the empty string.
@@ -116,6 +121,29 @@ func (ing ingAnnotations) ingressClass() string {
 	return val
 }
 
+// svcAnnotations represents Service annotations.
+type svcAnnotations map[string]string
+
+// TLSPorts returns the set of Service port names flagged for TLS.
+func (svc svcAnnotations) TLSPorts() (map[string]struct{}, error) {
+	val, ok := svc[serviceReencryptKey]
+	if !ok {
+		return map[string]struct{}{}, nil
+	}
+
+	var ports []string
+	err := json.Unmarshal([]byte(val), &ports)
+	if err != nil {
+		return nil, err
+	}
+
+	portMap := map[string]struct{}{}
+	for _, p := range ports {
+		portMap[p] = struct{}{}
+	}
+
+	return portMap, err
+}
+
 // isGCEIngress returns true if the given Ingress either doesn't specify the
 // ingress.class annotation, or it's set to "gce".
 func isGCEIngress(ing *extensions.Ingress) bool {
@@ -134,6 +162,15 @@ func (e errorNodePortNotFound) Error() string {
 		e.backend, e.origErr)
 }
 
+type errorSvcTLSPortsParsing struct {
+	svc     *api_v1.Service
+	origErr error
+}
+
+func (e errorSvcTLSPortsParsing) Error() string {
+	return fmt.Sprintf("could not parse %v annotation on Service %v/%v, err: %v", serviceReencryptKey, e.svc.Namespace, e.svc.Name, e.origErr)
+}
+
 // taskQueue manages a work queue through an independent worker that
 // invokes the given sync function for every work item inserted.
 type taskQueue struct {
@@ -362,17 +399,17 @@ func (t *GCETranslator) toGCEBackend(be *extensions.IngressBackend, ns string) (
 	if err != nil {
 		return nil, err
 	}
-	backend, err := t.CloudClusterManager.backendPool.Get(int64(port))
+	backend, err := t.CloudClusterManager.backendPool.Get(port.Port)
 	if err != nil {
-		return nil, fmt.Errorf(
-			"no GCE backend exists for port %v, kube backend %+v", port, be)
+		return nil, fmt.Errorf("no GCE backend exists for port %v, kube backend %+v", port, be)
 	}
 	return backend, nil
 }
 
 // getServiceNodePort looks in the svc store for a matching service:port,
 // and returns the nodeport.
-func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (int, error) {
+func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (backends.ServicePort, error) {
+	noPort := backends.ServicePort{}
 	obj, exists, err := t.svcLister.Indexer.Get(
 		&api_v1.Service{
 			ObjectMeta: meta_v1.ObjectMeta{
@@ -381,37 +418,47 @@ func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespa
 			},
 		})
 	if !exists {
-		return invalidPort, errorNodePortNotFound{be, fmt.Errorf(
-			"service %v/%v not found in store", namespace, be.ServiceName)}
+		return noPort, errorNodePortNotFound{be, fmt.Errorf("service %v/%v not found in store", namespace, be.ServiceName)}
+	}
+	if err != nil {
+		return noPort, errorNodePortNotFound{be, err}
 	}
+
+	svc := obj.(*api_v1.Service)
+	tlsPorts, err := svcAnnotations(svc.GetAnnotations()).TLSPorts()
 	if err != nil {
-		return invalidPort, errorNodePortNotFound{be, err}
+		return noPort, errorSvcTLSPortsParsing{svc, err}
 	}
-	var nodePort int
-	for _, p := range obj.(*api_v1.Service).Spec.Ports {
+
+	var port *api_v1.ServicePort
+PortLoop:
+	for _, p := range svc.Spec.Ports {
+		np := p
 		switch be.ServicePort.Type {
 		case intstr.Int:
 			if p.Port == be.ServicePort.IntVal {
-				nodePort = int(p.NodePort)
-				break
+				port = &np
+				break PortLoop
 			}
 		default:
 			if p.Name == be.ServicePort.StrVal {
-				nodePort = int(p.NodePort)
-				break
+				port = &np
+				break PortLoop
 			}
 		}
 	}
-	if nodePort != invalidPort {
-		return nodePort, nil
+
+	if port == nil {
+		return noPort, errorNodePortNotFound{be, fmt.Errorf("could not find matching nodeport from service")}
 	}
-	return invalidPort, errorNodePortNotFound{be, fmt.Errorf(
-		"could not find matching nodeport from service")}
+
+	_, isEncrypted := tlsPorts[port.Name]
+	p := backends.ServicePort{Port: int64(port.NodePort), Encrypted: isEncrypted}
+	return p, nil
 }
 
 // toNodePorts converts a pathlist to a flat list of nodeports.
-func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
-	knownPorts := []int64{}
+func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort {
+	var knownPorts []backends.ServicePort
 	for _, ing := range ings.Items {
 		defaultBackend := ing.Spec.Backend
 		if defaultBackend != nil {
@@ -419,7 +466,7 @@
 			if err != nil {
 				glog.Infof("%v", err)
 			} else {
-				knownPorts = append(knownPorts, int64(port))
+				knownPorts = append(knownPorts, port)
 			}
 		}
 		for _, rule := range ing.Spec.Rules {
@@ -433,7 +480,7 @@
 					glog.Infof("%v", err)
 					continue
 				}
-				knownPorts = append(knownPorts, int64(port))
+				knownPorts = append(knownPorts, port)
 			}
 		}
 	}
@@ -536,73 +583,33 @@ func isSimpleHTTPProbe(probe *api_v1.Probe) bool {
 		probe.Handler.HTTPGet.Scheme == api_v1.URISchemeHTTP &&
 		len(probe.Handler.HTTPGet.HTTPHeaders) == 0)
 }
 
-// HealthCheck returns the http readiness probe for the endpoint backing the
-// given nodePort. If no probe is found it returns a health check with "" as
-// the request path, callers are responsible for swapping this out for the
-// appropriate default.
-func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
+// GetProbe returns the readiness probe used for the given nodeport
+func (t *GCETranslator) GetProbe(port int64) (*api_v1.Probe, error) {
 	sl := t.svcLister.List()
-	var ingresses []extensions.Ingress
-	var healthCheck *compute.HttpHealthCheck
+
+	// Find the service and target port of the one service with the given nodePort
+	var service api_v1.Service
+	var svcPort api_v1.ServicePort
+	var found bool
+OuterLoop:
 	for _, as := range sl {
-		s := as.(*api_v1.Service)
-		for _, p := range s.Spec.Ports {
-
+		service = *as.(*api_v1.Service)
+		for _, sp := range service.Spec.Ports {
+			svcPort = sp
 			// only one Service can match this nodePort, try and look up
 			// the readiness probe of the pods behind it
-			if int32(port) != p.NodePort {
-				continue
-			}
-			rp, err := t.getHTTPProbe(*s, p.TargetPort)
-			if err != nil {
-				return nil, err
+			if int32(port) == sp.NodePort {
+				found = true
+				break OuterLoop
 			}
-			if rp == nil {
-				glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port)
-				break
-			}
-
-			healthPath := rp.Handler.HTTPGet.Path
-			// GCE requires a leading "/" for health check urls.
-			if string(healthPath[0]) != "/" {
-				healthPath = fmt.Sprintf("/%v", healthPath)
-			}
-
-			host := rp.Handler.HTTPGet.Host
-			glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath)
-			// remember the ingresses that use this Service so we can send
-			// the right events
-			ingresses, err = t.ingLister.GetServiceIngress(s)
-			if err != nil {
-				glog.Warningf("Failed to list ingresses for service %v", s.Name)
-			}
-
-			healthCheck = &compute.HttpHealthCheck{
-				Port:        port,
-				RequestPath: healthPath,
-				Host:        host,
-				Description: "kubernetes L7 health check from readiness probe.",
-				// set a low health threshold and a high failure threshold.
-				// We're just trying to detect if the node networking is
-				// borked, service level outages will get detected sooner
-				// by kube-proxy.
-				CheckIntervalSec:   int64(rp.PeriodSeconds + utils.DefaultHealthCheckInterval),
-				TimeoutSec:         int64(rp.TimeoutSeconds),
-				HealthyThreshold:   utils.DefaultHealthyThreshold,
-				UnhealthyThreshold: utils.DefaultUnhealthyThreshold,
-				// TODO: include headers after updating compute godep.
-			}
-			break
 		}
 	}
-	if healthCheck == nil {
-		healthCheck = utils.DefaultHealthCheckTemplate(port)
-	}
-	for _, ing := range ingresses {
-		t.recorder.Eventf(&ing, api_v1.EventTypeNormal, "GCE", fmt.Sprintf("health check using %v:%v%v", healthCheck.Host, healthCheck.Port, healthCheck.RequestPath))
+
+	if !found {
+		return nil, fmt.Errorf("unable to find nodeport %v in any service", port)
 	}
-	return healthCheck, nil
+
+	return t.getHTTPProbe(service, svcPort.TargetPort)
 }
 
 // PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
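
The tls-ports annotation parsed above is a JSON array of Service port names; getServiceNodePort marks the resulting ServicePort as Encrypted when the matched port's name appears in that set. A small sketch of the contract, written as a hypothetical test that would live in package controller (svcAnnotations is unexported):

func TestTLSPortsAnnotation(t *testing.T) { // hypothetical example test
	ann := svcAnnotations(map[string]string{
		"service.alpha.kubernetes.io/tls-ports": `["https", "admin"]`,
	})
	tlsPorts, err := ann.TLSPorts()
	if err != nil {
		t.Fatalf("unexpected parse error: %v", err) // malformed JSON surfaces as errorSvcTLSPortsParsing upstream
	}
	if _, ok := tlsPorts["https"]; !ok {
		t.Errorf("expected port name %q to be flagged for TLS", "https")
	}
}
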
diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go
index 07240ccefc0..757851c57ff 100644
--- a/controllers/gce/healthchecks/fakes.go
+++ b/controllers/gce/healthchecks/fakes.go
@@ -17,86 +17,94 @@ limitations under the License.
 package healthchecks
 
 import (
-	"fmt"
-
 	compute "google.golang.org/api/compute/v1"
-
-	"k8s.io/ingress/controllers/gce/utils"
+	"google.golang.org/api/googleapi"
 )
 
-// NewFakeHealthChecks returns a new FakeHealthChecks.
-func NewFakeHealthChecks() *FakeHealthChecks {
-	return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}}
-}
-
-// FakeHealthCheckGetter implements the healthCheckGetter interface for tests.
-type FakeHealthCheckGetter struct {
-	DefaultHealthCheck *compute.HttpHealthCheck
+func fakeNotFoundErr() *googleapi.Error {
+	return &googleapi.Error{Code: 404}
 }
 
-// HealthCheck returns the health check for the given port. If a health check
-// isn't stored under the DefaultHealthCheck member, it constructs one.
-func (h *FakeHealthCheckGetter) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
-	if h.DefaultHealthCheck == nil {
-		return utils.DefaultHealthCheckTemplate(port), nil
+// NewFakeHealthCheckProvider returns a new FakeHealthCheckProvider.
+func NewFakeHealthCheckProvider() *FakeHealthCheckProvider {
+	return &FakeHealthCheckProvider{
+		http:  make(map[string]compute.HttpHealthCheck),
+		https: make(map[string]compute.HttpsHealthCheck),
 	}
-	return h.DefaultHealthCheck, nil
 }
 
-// FakeHealthChecks fakes out health checks.
-type FakeHealthChecks struct {
-	hc []*compute.HttpHealthCheck
+// FakeHealthCheckProvider fakes out health checks.
+type FakeHealthCheckProvider struct {
+	http  map[string]compute.HttpHealthCheck
+	https map[string]compute.HttpsHealthCheck
 }
 
 // CreateHttpHealthCheck fakes out http health check creation.
-func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
-	f.hc = append(f.hc, hc)
+func (f *FakeHealthCheckProvider) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
+	f.http[hc.Name] = *hc
 	return nil
 }
 
 // GetHttpHealthCheck fakes out getting a http health check from the cloud.
-func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
-	for _, h := range f.hc {
-		if h.Name == name {
-			return h, nil
-		}
+func (f *FakeHealthCheckProvider) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
+	if hc, found := f.http[name]; found {
+		return &hc, nil
 	}
-	return nil, fmt.Errorf("health check %v not found", name)
+
+	return nil, fakeNotFoundErr()
 }
 
 // DeleteHttpHealthCheck fakes out deleting a http health check.
-func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error {
-	healthChecks := []*compute.HttpHealthCheck{}
-	exists := false
-	for _, h := range f.hc {
-		if h.Name == name {
-			exists = true
-			continue
-		}
-		healthChecks = append(healthChecks, h)
-	}
-	if !exists {
-		return fmt.Errorf("failed to find health check %v", name)
+func (f *FakeHealthCheckProvider) DeleteHttpHealthCheck(name string) error {
+	if _, exists := f.http[name]; !exists {
+		return fakeNotFoundErr()
 	}
-	f.hc = healthChecks
+
+	delete(f.http, name)
 	return nil
 }
 
 // UpdateHttpHealthCheck sends the given health check as an update.
-func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
-	healthChecks := []*compute.HttpHealthCheck{}
-	found := false
-	for _, h := range f.hc {
-		if h.Name == hc.Name {
-			healthChecks = append(healthChecks, hc)
-			found = true
-		} else {
-			healthChecks = append(healthChecks, h)
-		}
+func (f *FakeHealthCheckProvider) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
+	if _, exists := f.http[hc.Name]; !exists {
+		return fakeNotFoundErr()
+	}
+
+	f.http[hc.Name] = *hc
+	return nil
+}
+
+// CreateHttpsHealthCheck fakes out https health check creation.
+func (f *FakeHealthCheckProvider) CreateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
+	f.https[hc.Name] = *hc
+	return nil
+}
+
+// GetHttpsHealthCheck fakes out getting a https health check from the cloud.
+func (f *FakeHealthCheckProvider) GetHttpsHealthCheck(name string) (*compute.HttpsHealthCheck, error) {
+	if hc, found := f.https[name]; found {
+		return &hc, nil
+	}
+
+	return nil, fakeNotFoundErr()
+}
+
+// DeleteHttpsHealthCheck fakes out deleting a https health check.
+func (f *FakeHealthCheckProvider) DeleteHttpsHealthCheck(name string) error {
+	if _, exists := f.https[name]; !exists {
+		return fakeNotFoundErr()
 	}
-	if !found {
-		return fmt.Errorf("cannot update a non-existent health check %v", hc.Name)
+
+	delete(f.https, name)
+	return nil
+}
+
+// UpdateHttpsHealthCheck sends the given health check as an update.
+func (f *FakeHealthCheckProvider) UpdateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
+	if _, exists := f.https[hc.Name]; !exists {
+		return fakeNotFoundErr()
 	}
-	f.hc = healthChecks
+
+	f.https[hc.Name] = *hc
 	return nil
 }
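
Worth noting about the fake above: it now fails with a real *googleapi.Error carrying code 404 rather than a bare fmt.Errorf, so production paths that branch on utils.IsHTTPErrorCode(err, http.StatusNotFound), such as the Sync and Delete flows in healthchecks.go below, behave identically against the fake. A short test-style sketch of that contract (imports of net/http and utils assumed):

hcp := healthchecks.NewFakeHealthCheckProvider()
_, err := hcp.GetHttpHealthCheck("no-such-check")
if !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
	// would indicate the fake diverged from GCE's not-found behavior
	t.Errorf("want a 404 googleapi error, got %v", err)
}
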
diff --git a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go
index 6b37392ab94..2565a0f3ae7 100644
--- a/controllers/gce/healthchecks/healthchecks.go
+++ b/controllers/gce/healthchecks/healthchecks.go
@@ -19,76 +19,162 @@ package healthchecks
 import (
 	"net/http"
 
-	"github.com/golang/glog"
 	compute "google.golang.org/api/compute/v1"
 
+	"github.com/golang/glog"
+
 	"k8s.io/ingress/controllers/gce/utils"
 )
 
+const (
+	// DefaultHealthCheckInterval defines how frequently a probe runs
+	DefaultHealthCheckInterval = 60
+	// DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy"
+	DefaultHealthyThreshold = 1
+	// DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy"
+	DefaultUnhealthyThreshold = 10
+	// DefaultTimeoutSeconds defines the timeout of each probe
+	DefaultTimeoutSeconds = 60
+)
+
 // HealthChecks manages health checks.
 type HealthChecks struct {
-	cloud       SingleHealthCheck
+	cloud       HealthCheckProvider
 	defaultPath string
 	namer       *utils.Namer
-	healthCheckGetter
 }
 
 // NewHealthChecker creates a new health checker.
-// cloud: the cloud object implementing SingleHealthCheck.
+// cloud: the cloud object implementing HealthCheckProvider.
 // defaultHealthCheckPath: is the HTTP path to use for health checks.
-func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker {
-	return &HealthChecks{cloud, defaultHealthCheckPath, namer, nil}
+func NewHealthChecker(cloud HealthCheckProvider, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker {
+	return &HealthChecks{cloud, defaultHealthCheckPath, namer}
 }
 
-// Init initializes the health checker.
-func (h *HealthChecks) Init(r healthCheckGetter) {
-	h.healthCheckGetter = r
+// New returns a default health check template for the port/scheme, named via the namer.
+func (h *HealthChecks) New(port int64, encrypted bool) *HealthCheck {
+	hc := DefaultHealthCheckTemplate(port, encrypted)
+	hc.Name = h.namer.BeName(port)
+	return hc
 }
 
-// Add adds a healthcheck if one for the same port doesn't already exist.
-func (h *HealthChecks) Add(port int64) error {
-	wantHC, err := h.healthCheckGetter.HealthCheck(port)
-	if err != nil {
-		return err
+// Sync retrieves a health check based on port/encryption, checks for properties
+// that should be identical, and updates/creates if necessary.
+// Sync is only called by the backends.Add func - it's not a pool like other resources.
+func (h *HealthChecks) Sync(hc *HealthCheck) error {
+	// Verify default path
+	if hc.RequestPath == "" {
+		hc.RequestPath = h.defaultPath
 	}
-	if wantHC.RequestPath == "" {
-		wantHC.RequestPath = h.defaultPath
+
+	existingHC, err := h.Get(hc.Port, hc.Encrypted)
+	if err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
+		return err
 	}
-	name := h.namer.BeName(port)
-	wantHC.Name = name
-	hc, _ := h.Get(port)
-	if hc == nil {
-		// TODO: check if the readiness probe has changed and update the
-		// health check.
-		glog.Infof("Creating health check %v", name)
-		if err := h.cloud.CreateHttpHealthCheck(wantHC); err != nil {
-			return err
+
+	if existingHC == nil {
+		glog.Infof("Creating %v health check %v", utils.GetHTTPScheme(hc.Encrypted), hc.Name)
+		if hc.Encrypted {
+			return h.cloud.CreateHttpsHealthCheck(hc.ToHttpsHealthCheck())
 		}
-	} else if wantHC.RequestPath != hc.RequestPath {
+		return h.cloud.CreateHttpHealthCheck(hc.ToHttpHealthCheck())
+	}
+
+	if existingHC != nil && existingHC.RequestPath != hc.RequestPath {
 		// TODO: reconcile health checks, and compare headers interval etc.
 		// Currently Ingress doesn't expose all the health check params
 		// natively, so some users prefer to hand modify the check.
 		glog.Infof("Unexpected request path on health check %v, has %v want %v, NOT reconciling",
-			name, hc.RequestPath, wantHC.RequestPath)
+			hc.Name, existingHC.RequestPath, hc.RequestPath)
 	} else {
 		glog.Infof("Health check %v already exists and has the expected path %v", hc.Name, hc.RequestPath)
 	}
+
 	return nil
 }
 
 // Delete deletes the health check by port.
-func (h *HealthChecks) Delete(port int64) error {
+func (h *HealthChecks) Delete(port int64, encrypted bool) error {
+	scheme := utils.GetHTTPScheme(encrypted)
 	name := h.namer.BeName(port)
-	glog.Infof("Deleting health check %v", name)
-	if err := h.cloud.DeleteHttpHealthCheck(h.namer.BeName(port)); err != nil {
-		if !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
-			return err
-		}
+	glog.Infof("Deleting %v health check %v", scheme, name)
+	if encrypted {
+		return h.cloud.DeleteHttpsHealthCheck(h.namer.BeName(port))
+	}
+	return h.cloud.DeleteHttpHealthCheck(h.namer.BeName(port))
+}
+
+// Get returns the health check for the given port and scheme.
+func (h *HealthChecks) Get(port int64, encrypted bool) (*HealthCheck, error) {
+	name := h.namer.BeName(port)
+	if encrypted {
+		hc, err := h.cloud.GetHttpsHealthCheck(name)
+		if err != nil {
+			return nil, err
+		}
+		return NewHealthCheckHttps(hc), nil
+	}
+	hc, err := h.cloud.GetHttpHealthCheck(name)
+	if err != nil {
+		return nil, err
+	}
+	return NewHealthCheckHttp(hc), nil
+}
+
+// DefaultHealthCheckTemplate simply returns the default health check template.
+func DefaultHealthCheckTemplate(port int64, encrypted bool) *HealthCheck {
+	return &HealthCheck{
+		HttpHealthCheck: compute.HttpHealthCheck{
+			Port: port,
+			// Empty string is used as a signal to the caller to use the appropriate
+			// default.
+			RequestPath: "",
+			Description: "Default kubernetes L7 Loadbalancing health check.",
+			// How often to health check.
+			CheckIntervalSec: DefaultHealthCheckInterval,
+			// How long to wait before claiming failure of a health check.
+			TimeoutSec: DefaultTimeoutSeconds,
+			// Number of healthchecks to pass for a vm to be deemed healthy.
+			HealthyThreshold: DefaultHealthyThreshold,
+			// Number of healthchecks to fail before the vm is deemed unhealthy.
+			UnhealthyThreshold: DefaultUnhealthyThreshold,
+		},
+		Encrypted: encrypted,
+	}
+}
+
+// HealthCheck embeds the HTTP health check and tags it with the scheme.
+type HealthCheck struct {
+	compute.HttpHealthCheck
+	Encrypted bool
+}
+
+// NewHealthCheckHttp wraps a compute.HttpHealthCheck as an unencrypted HealthCheck.
+func NewHealthCheckHttp(hc *compute.HttpHealthCheck) *HealthCheck {
+	if hc == nil {
+		return nil
+	}
+
+	return &HealthCheck{
+		HttpHealthCheck: *hc,
+		Encrypted:       false,
+	}
+}
+
+// NewHealthCheckHttps wraps a compute.HttpsHealthCheck as an encrypted HealthCheck.
+func NewHealthCheckHttps(hc *compute.HttpsHealthCheck) *HealthCheck {
+	if hc == nil {
+		return nil
+	}
+	h := *hc
+	return &HealthCheck{
+		HttpHealthCheck: compute.HttpHealthCheck(h),
+		Encrypted:       true,
+	}
+}
+
+// ToHttpHealthCheck should only be called if Encrypted=false
+func (hc *HealthCheck) ToHttpHealthCheck() *compute.HttpHealthCheck {
+	return &hc.HttpHealthCheck
 }
 
-// Get returns the given health check.
-func (h *HealthChecks) Get(port int64) (*compute.HttpHealthCheck, error) {
-	return h.cloud.GetHttpHealthCheck(h.namer.BeName(port))
+// ToHttpsHealthCheck should only be called if Encrypted=true
+func (hc *HealthCheck) ToHttpsHealthCheck() *compute.HttpsHealthCheck {
+	ehc := compute.HttpsHealthCheck(hc.HttpHealthCheck)
+	return &ehc
 }
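
Two details in the file above deserve a note. First, the struct conversions in NewHealthCheckHttps and ToHttpsHealthCheck compile only because the generated compute.HttpHealthCheck and compute.HttpsHealthCheck types share an identical field layout; if the GCE API ever diverged them, these conversions would break at compile time. Second, the New/Sync/Get/Delete lifecycle replaces the old Init/Add pairing. A usage sketch against the fake provider (the function name is hypothetical; this mirrors the tests that follow):

func exampleLifecycle() error {
	namer := &utils.Namer{}
	hcp := healthchecks.NewFakeHealthCheckProvider() // stands in for GCE
	checker := healthchecks.NewHealthChecker(hcp, "/healthz", namer)

	hc := checker.New(30443, true)   // HTTPS template, named via namer.BeName(30443)
	hc.RequestPath = "/my-readiness" // e.g. copied from a pod readiness probe
	if err := checker.Sync(hc); err != nil {
		return err
	}

	// When a port flips schemes, the stale twin is removed; a provider 404
	// simply means there was nothing to delete.
	if err := checker.Delete(30443, false); err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
		return err
	}
	return nil
}
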
package healthchecks import ( + "net/http" "testing" "k8s.io/ingress/controllers/gce/utils" ) -func TestFakeHealthCheckActions(t *testing.T) { +func TestHealthCheckAdd(t *testing.T) { namer := &utils.Namer{} - healthChecks := NewHealthChecker(NewFakeHealthChecks(), "/", namer) - healthChecks.Init(&FakeHealthCheckGetter{DefaultHealthCheck: nil}) + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) - err := healthChecks.Add(80) + hc := healthChecks.New(80, false) + err := healthChecks.Sync(hc) if err != nil { - t.Fatalf("unexpected error") + t.Fatalf("unexpected error: %v", err) } + // Verify the health check exists + _, err = hcp.GetHttpHealthCheck(namer.BeName(80)) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } + + hc = healthChecks.New(443, true) + err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHttpsHealthCheck(namer.BeName(443)) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } +} + +func TestHealthCheckAddExisting(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // HTTP + // Manually insert a health check + httpHC := DefaultHealthCheckTemplate(3000, false) + httpHC.Name = namer.BeName(3000) + httpHC.RequestPath = "/my-probes-health" + hcp.CreateHttpHealthCheck(httpHC.ToHttpHealthCheck()) + + // Should not fail adding the same type of health check + hc := healthChecks.New(3000, false) + err := healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHttpHealthCheck(httpHC.Name) + if err != nil { + t.Fatalf("expected the health check to continue existing, err: %v", err) + } + + // HTTPS + // Manually insert a health check + httpsHC := DefaultHealthCheckTemplate(4000, true) + httpsHC.Name = namer.BeName(4000) + httpsHC.RequestPath = "/my-probes-health" + hcp.CreateHttpsHealthCheck(httpsHC.ToHttpsHealthCheck()) + + hc = healthChecks.New(4000, true) + err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHttpsHealthCheck(httpsHC.Name) + if err != nil { + t.Fatalf("expected the health check to continue existing, err: %v", err) + } +} + +func TestHealthCheckDelete(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // Create HTTP HC for 1234 + hc := DefaultHealthCheckTemplate(1234, false) + hc.Name = namer.BeName(1234) + hcp.CreateHttpHealthCheck(hc.ToHttpHealthCheck()) + + // Create HTTPS HC for 1234) + hcp.CreateHttpsHealthCheck(hc.ToHttpsHealthCheck()) - _, err1 := healthChecks.Get(8080) - if err1 == nil { - t.Errorf("expected error") + // Delete only HTTP 1234 + err := healthChecks.Delete(1234, false) + if err != nil { + t.Errorf("unexpected error when deleting HTTP health check, err: %v", err) } - hc, err2 := healthChecks.Get(80) - if err2 != nil { - t.Errorf("unexpected error") - } else { - if hc == nil { - t.Errorf("expected a *compute.HttpHealthCheck") - } + // Validate port is deleted + _, err = hcp.GetHttpHealthCheck(hc.Name) + if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { + t.Errorf("expected not-found error, actual: %v", err) } - err = healthChecks.Delete(8080) - if err == nil { - t.Errorf("expected error") + // Validate the wrong port 
+	_, err = hcp.GetHttpsHealthCheck(hc.Name)
+	if err != nil {
+		t.Errorf("unexpected error retrieving known healthcheck: %v", err)
 	}
-	err = healthChecks.Delete(80)
+	// Delete only HTTPS 1234
+	err = healthChecks.Delete(1234, true)
 	if err != nil {
-		t.Errorf("unexpected error")
+		t.Errorf("unexpected error when deleting HTTPS health check, err: %v", err)
 	}
+}
+
+func TestHealthCheckGet(t *testing.T) {
+	namer := &utils.Namer{}
+	hcp := NewFakeHealthCheckProvider()
+	healthChecks := NewHealthChecker(hcp, "/", namer)
 
-	_, err3 := healthChecks.Get(80)
-	if err3 == nil {
-		t.Errorf("expected error")
+	// HTTP
+	// Manually insert a health check
+	httpHC := DefaultHealthCheckTemplate(3000, false)
+	httpHC.Name = namer.BeName(3000)
+	httpHC.RequestPath = "/my-probes-health"
+	hcp.CreateHttpHealthCheck(httpHC.ToHttpHealthCheck())
+
+	// Verify the health check exists
+	_, err := healthChecks.Get(3000, false)
+	if err != nil {
+		t.Fatalf("expected the health check to exist, err: %v", err)
+	}
+
+	// HTTPS
+	// Manually insert a health check
+	httpsHC := DefaultHealthCheckTemplate(4000, true)
+	httpsHC.Name = namer.BeName(4000)
+	httpsHC.RequestPath = "/my-probes-health"
+	hcp.CreateHttpsHealthCheck(httpsHC.ToHttpsHealthCheck())
+
+	// Verify the health check exists
+	_, err = healthChecks.Get(4000, true)
+	if err != nil {
+		t.Fatalf("expected the health check to exist, err: %v", err)
 	}
 }
diff --git a/controllers/gce/healthchecks/interfaces.go b/controllers/gce/healthchecks/interfaces.go
index 59d25969648..1aed56a623c 100644
--- a/controllers/gce/healthchecks/interfaces.go
+++ b/controllers/gce/healthchecks/interfaces.go
@@ -16,29 +16,25 @@ limitations under the License.
 
 package healthchecks
 
-import (
-	compute "google.golang.org/api/compute/v1"
-)
-
-// healthCheckGetter retrieves health checks.
-type healthCheckGetter interface {
-	// HealthCheck returns the HTTP readiness check for a node port.
-	HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error)
-}
+import compute "google.golang.org/api/compute/v1"
 
-// SingleHealthCheck is an interface to manage a single GCE health check.
-type SingleHealthCheck interface {
+// HealthCheckProvider is an interface to manage a single GCE health check.
+type HealthCheckProvider interface {
 	CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error
 	UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error
 	DeleteHttpHealthCheck(name string) error
 	GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
+
+	CreateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error
+	UpdateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error
+	DeleteHttpsHealthCheck(name string) error
+	GetHttpsHealthCheck(name string) (*compute.HttpsHealthCheck, error)
 }
 
 // HealthChecker is an interface to manage cloud HTTPHealthChecks.
 type HealthChecker interface {
-	Init(h healthCheckGetter)
-
-	Add(port int64) error
-	Delete(port int64) error
-	Get(port int64) (*compute.HttpHealthCheck, error)
+	New(port int64, encrypted bool) *HealthCheck
+	Sync(hc *HealthCheck) error
+	Delete(port int64, encrypted bool) error
+	Get(port int64, encrypted bool) (*HealthCheck, error)
 }
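Editor's note: the reworked HealthChecker separates building desired state (New) from reconciling it against the cloud (Sync), and keys Get/Delete on the (port, encrypted) pair so HTTP and HTTPS checks for the same port are tracked independently. A minimal sketch of the lifecycle, driven the same way the tests above drive the fake provider (error handling elided):

	namer := &utils.Namer{}
	hcp := healthchecks.NewFakeHealthCheckProvider()
	checker := healthchecks.NewHealthChecker(hcp, "/", namer)

	// Build the desired HTTPS health check for port 443, then reconcile it.
	hc := checker.New(443, true)
	if err := checker.Sync(hc); err != nil {
		// handle err
	}

	// Lookups and deletes name the scheme explicitly, so the HTTP check
	// for the same port (if any) is untouched.
	if _, err := checker.Get(443, true); err != nil {
		// handle err
	}
	if err := checker.Delete(443, true); err != nil {
		// handle err
	}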
diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go
index dd5fd1574bb..9972ccc013a 100644
--- a/controllers/gce/loadbalancers/loadbalancers.go
+++ b/controllers/gce/loadbalancers/loadbalancers.go
@@ -70,7 +70,7 @@ type L7s struct {
 	// TODO: Remove this field and always ask the BackendPool using the NodePort.
 	glbcDefaultBackend     *compute.BackendService
 	defaultBackendPool     backends.BackendPool
-	defaultBackendNodePort int64
+	defaultBackendNodePort backends.ServicePort
 	namer                  *utils.Namer
 }
 
@@ -84,7 +84,7 @@ type L7s struct {
 func NewLoadBalancerPool(
 	cloud LoadBalancers,
 	defaultBackendPool backends.BackendPool,
-	defaultBackendNodePort int64, namer *utils.Namer) LoadBalancerPool {
+	defaultBackendNodePort backends.ServicePort, namer *utils.Namer) LoadBalancerPool {
 	return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer}
 }
 
@@ -172,7 +172,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error {
 		if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil {
 			return err
 		}
-		defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort)
+		defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port)
 		if err != nil {
 			return err
 		}
@@ -209,7 +209,7 @@ func (l *L7s) GC(names []string) error {
 	// This needs to happen after we've deleted all url-maps that might be
 	// using it.
 	if len(names) == 0 {
-		if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort); err != nil {
+		if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort.Port); err != nil {
 			return err
 		}
 		l.glbcDefaultBackend = nil
diff --git a/controllers/gce/loadbalancers/loadbalancers_test.go b/controllers/gce/loadbalancers/loadbalancers_test.go
index a2f54d9a177..7e23d4342c6 100644
--- a/controllers/gce/loadbalancers/loadbalancers_test.go
+++ b/controllers/gce/loadbalancers/loadbalancers_test.go
@@ -30,17 +30,19 @@ import (
 )
 
 const (
-	testDefaultBeNodePort = int64(3000)
-	defaultZone           = "zone-a"
+	defaultZone = "zone-a"
+)
+
+var (
+	testDefaultBeNodePort = backends.ServicePort{Port: 3000, Encrypted: false}
 )
 
 func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
 	fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil })
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
-	fakeHCs := healthchecks.NewFakeHealthChecks()
+	fakeHCP := healthchecks.NewFakeHealthCheckProvider()
 	namer := &utils.Namer{}
-	healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
-	healthChecker.Init(&healthchecks.FakeHealthCheckGetter{})
+	healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer)
 	nodePool := instances.NewNodePool(fakeIGs)
 	nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
 	backendPool := backends.NewBackendPool(
diff --git a/controllers/gce/main.go b/controllers/gce/main.go
index 49109839598..48bc1760894 100644
--- a/controllers/gce/main.go
+++ b/controllers/gce/main.go
@@ -40,6 +40,7 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 
+	"k8s.io/ingress/controllers/gce/backends"
 	"k8s.io/ingress/controllers/gce/controller"
 	"k8s.io/ingress/controllers/gce/loadbalancers"
 	"k8s.io/ingress/controllers/gce/storage"
@@ -226,11 +227,13 @@ func main() {
 		glog.Fatalf("Default backend should take the form namespace/name: %v", *defaultSvc)
 	}
-	defaultBackendNodePort, err := getNodePort(kubeClient, parts[0], parts[1])
+	nodePort, err := getNodePort(kubeClient, parts[0], parts[1])
 	if err != nil {
 		glog.Fatalf("Could not configure default backend %v: %v", *defaultSvc, err)
 	}
+	// The default backend is known to be HTTP
+	defaultBackendNodePort := backends.ServicePort{Port: nodePort, Encrypted: false}
 
 	if *inCluster || *useRealCloud {
 		// Create cluster manager
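Editor's note: because ServicePort carries the scheme alongside the port, a backend that should be reached over TLS would be wired the same way with Encrypted set. A hedged sketch only; httpsNodePort and backendPool are hypothetical names standing in for another getNodePort lookup and the pool built by the cluster manager:

	httpsBackend := backends.ServicePort{Port: httpsNodePort, Encrypted: true}
	if err := backendPool.Add(httpsBackend); err != nil {
		glog.Fatalf("Could not ensure backend for port %d: %v", httpsBackend.Port, err)
	}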
diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go
index 288258d9c90..da51bd070da 100644
--- a/controllers/gce/utils/utils.go
+++ b/controllers/gce/utils/utils.go
@@ -79,15 +79,6 @@ const (
 	// K8sAnnotationPrefix is the prefix used in annotations used to record
 	// debug information in the Ingress annotations.
 	K8sAnnotationPrefix = "ingress.kubernetes.io"
-
-	// DefaultHealthCheckInterval defines how frequently a probe runs
-	DefaultHealthCheckInterval = 60
-	// DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy"
-	DefaultHealthyThreshold = 1
-	// DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy"
-	DefaultUnhealthyThreshold = 10
-	// DefaultTimeoutSeconds defines the timeout of each probe
-	DefaultTimeoutSeconds = 60
 )
 
 // Namer handles centralized naming for the cluster.
@@ -325,25 +316,14 @@ func CompareLinks(l1, l2 string) bool {
 	return l1 == l2 && l1 != ""
 }
 
+// GetHTTPScheme returns HTTP or HTTPS depending on the bool
+func GetHTTPScheme(encrypted bool) string {
+	if encrypted {
+		return "HTTPS"
+	}
+	return "HTTP"
+}
+
 // FakeIngressRuleValueMap is a convenience type used by multiple submodules
 // that share the same testing methods.
 type FakeIngressRuleValueMap map[string]string
-
-// DefaultHealthCheckTemplate simply returns the default health check template.
-func DefaultHealthCheckTemplate(port int64) *compute.HttpHealthCheck {
-	return &compute.HttpHealthCheck{
-		Port: port,
-		// Empty string is used as a signal to the caller to use the appropriate
-		// default.
-		RequestPath: "",
-		Description: "Default kubernetes L7 Loadbalancing health check.",
-		// How often to health check.
-		CheckIntervalSec: DefaultHealthCheckInterval,
-		// How long to wait before claiming failure of a health check.
-		TimeoutSec: DefaultTimeoutSeconds,
-		// Number of healthchecks to pass for a vm to be deemed healthy.
-		HealthyThreshold: DefaultHealthyThreshold,
-		// Number of healthchecks to fail before the vm is deemed unhealthy.
-		UnhealthyThreshold: DefaultUnhealthyThreshold,
-	}
-}
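Editor's note: GetHTTPScheme now replaces scattered protocol strings, so a small table-driven test would pin its behavior down. This is a sketch under the new utils API, not part of the diff:

	func TestGetHTTPScheme(t *testing.T) {
		// Each case maps the encrypted flag to the expected GCE protocol string.
		for _, tc := range []struct {
			encrypted bool
			want      string
		}{
			{false, "HTTP"},
			{true, "HTTPS"},
		} {
			if got := utils.GetHTTPScheme(tc.encrypted); got != tc.want {
				t.Errorf("GetHTTPScheme(%v) = %q, want %q", tc.encrypted, got, tc.want)
			}
		}
	}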