diff --git a/controllers/gce/README.md b/controllers/gce/README.md index aa9684344c..cf45d037dc 100644 --- a/controllers/gce/README.md +++ b/controllers/gce/README.md @@ -360,15 +360,14 @@ You just instructed the loadbalancer controller to quit, however if it had done #### Health checks -Currently, all service backends must satisfy *either* of the following requirements to pass the HTTP health checks sent to it from the GCE loadbalancer: +Currently, all service backends must satisfy *either* of the following requirements to pass the HTTP(S) health checks sent to it from the GCE loadbalancer: 1. Respond with a 200 on '/'. The content does not matter. 2. Expose an arbitrary url as a `readiness` probe on the pods backing the Service. -The Ingress controller looks for a compatible readiness probe first, if it finds one, it adopts it as the GCE loadbalancer's HTTP health check. If there's no readiness probe, or the readiness probe requires special HTTP headers, or HTTPS, the Ingress controller points the GCE loadbalancer's HTTP health check at '/'. [This is an example](examples/health_checks/README.md) of an Ingress that adopts the readiness probe from the endpoints as its health check. +The Ingress controller looks for a compatible readiness probe first, if it finds one, it adopts it as the GCE loadbalancer's HTTP(S) health check. If there's no readiness probe, or the readiness probe requires special HTTP headers, the Ingress controller points the GCE loadbalancer's HTTP health check at '/'. [This is an example](examples/health_checks/README.md) of an Ingress that adopts the readiness probe from the endpoints as its health check. -## TLS - -You can secure an Ingress by specifying a [secret](http://kubernetes.io/docs/user-guide/secrets) that contains a TLS private key and certificate. Currently the Ingress only supports a single TLS port, 443, and assumes TLS termination. This controller does not support SNI, so it will ignore all but the first cert in the TLS configuration section. The TLS secret must [contain keys](https://github.com/kubernetes/kubernetes/blob/master/pkg/api/types.go#L2696) named `tls.crt` and `tls.key` that contain the certificate and private key to use for TLS, eg: +## Frontend HTTPS +For encrypted communication between the client to the load balancer, you can secure an Ingress by specifying a [secret](http://kubernetes.io/docs/user-guide/secrets) that contains a TLS private key and certificate. Currently the Ingress only supports a single TLS port, 443, and assumes TLS termination. This controller does not support SNI, so it will ignore all but the first cert in the TLS configuration section. The TLS secret must [contain keys](https://github.com/kubernetes/kubernetes/blob/master/pkg/api/types.go#L2696) named `tls.crt` and `tls.key` that contain the certificate and private key to use for TLS, eg: ```yaml apiVersion: v1 @@ -399,6 +398,29 @@ spec: This creates 2 GCE forwarding rules that use a single static ip. Both `:80` and `:443` will direct traffic to your backend, which serves HTTP requests on the target port mentioned in the Service associated with the Ingress. +## Backend HTTPS +For encrypted communication between the load balancer and your Kubernetes service, you need to decorate the the service's port as expecting HTTPS. There's an alpha [Service annotation](examples/backside_https/app.yaml) for specifying the expected protocol per service port. Upon seeing the protocol as HTTPS, the ingress controller will assemble a GCP L7 load balancer with an HTTPS backend-service with a HTTPS health check. + +The annotation value is a stringified JSON map of port-name to "HTTPS" or "HTTP". If you do not specify the port, "HTTP" is assumed. +```yaml +apiVersion: v1 +kind: Service +metadata: + name: my-echo-svc + annotations: + service.alpha.kubernetes.io/app-protocols: '{"my-https-port":"HTTPS"}' + labels: + app: echo +spec: + type: NodePort + ports: + - port: 443 + protocol: TCP + name: my-https-port + selector: + app: echo +``` + #### Redirecting HTTP to HTTPS To redirect traffic from `:80` to `:443` you need to examine the `x-forwarded-proto` header inserted by the GCE L7, since the Ingress does not support redirect rules. In nginx, this is as simple as adding the following lines to your config: diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index 115ad52bcf..f6e700557a 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -27,6 +27,7 @@ import ( compute "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/util/sets" + api_v1 "k8s.io/client-go/pkg/api/v1" "k8s.io/ingress/controllers/gce/healthchecks" "k8s.io/ingress/controllers/gce/instances" @@ -75,6 +76,7 @@ type Backends struct { nodePool instances.NodePool healthChecker healthchecks.HealthChecker snapshotter storage.Snapshotter + prober probeProvider // ignoredPorts are a set of ports excluded from GC, even // after the Ingress has been deleted. Note that invoking // a Delete() on these ports will still delete the backend. @@ -86,6 +88,12 @@ func portKey(port int64) string { return fmt.Sprintf("%d", port) } +// ServicePort for tupling port and protocol +type ServicePort struct { + Port int64 + Protocol utils.AppProtocol +} + // NewBackendPool returns a new backend pool. // - cloud: implements BackendServices and syncs backends with a cloud provider // - healthChecker: is capable of producing health checks for backends. @@ -134,6 +142,11 @@ func NewBackendPool( return backendPool } +// Init sets the probeProvider interface value +func (b *Backends) Init(pp probeProvider) { + b.prober = pp +} + // Get returns a single backend. func (b *Backends) Get(port int64) (*compute.BackendService, error) { be, err := b.cloud.GetBackendService(b.namer.BeName(port)) @@ -144,16 +157,24 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) { return be, nil } -func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) { - // Create a new health check - if err := b.healthChecker.Add(namedPort.Port); err != nil { - return nil, err - } - hc, err := b.healthChecker.Get(namedPort.Port) - if err != nil { - return nil, err +func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) { + hc := b.healthChecker.New(sp.Port, sp.Protocol) + if b.prober != nil { + probe, err := b.prober.GetProbe(sp) + if err != nil { + return "", err + } + if probe != nil { + glog.V(2).Infof("Applying httpGet settings of readinessProbe to health check on port %+v", sp) + applyProbeSettingsToHC(probe, hc) + } } - errs := []string{} + + return b.healthChecker.Sync(hc) +} + +func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, hcLink string, protocol utils.AppProtocol, name string) (*compute.BackendService, error) { + var errs []string // We first try to create the backend with balancingMode=RATE. If this // fails, it's mostly likely because there are existing backends with // balancingMode=UTILIZATION. This failure mode throws a googleapi error @@ -161,27 +182,9 @@ func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.Named // and come around to retry with the right balancing mode. The goal is to // switch everyone to using RATE. for _, bm := range []BalancingMode{Rate, Utilization} { - backends := getBackendsForIGs(igs) - for _, b := range backends { - switch bm { - case Rate: - b.MaxRate = maxRPS - default: - // TODO: Set utilization and connection limits when we accept them - // as valid fields. - } - b.BalancingMode = string(bm) - } // Create a new backend - backend := &compute.BackendService{ - Name: name, - Protocol: "HTTP", - Backends: backends, - HealthChecks: []string{hc.SelfLink}, - Port: namedPort.Port, - PortName: namedPort.Name, - } - if err := b.cloud.CreateBackendService(backend); err != nil { + bs := newBackendService(igs, bm, namedPort, []string{hcLink}, protocol, name) + if err := b.cloud.CreateBackendService(bs); err != nil { // This is probably a failure because we tried to create the backend // with balancingMode=RATE when there are already backends with // balancingMode=UTILIZATION. Just ignore it and retry setting @@ -198,31 +201,83 @@ func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.Named return nil, fmt.Errorf("%v", strings.Join(errs, "\n")) } +func newBackendService(igs []*compute.InstanceGroup, bm BalancingMode, namedPort *compute.NamedPort, healthCheckLinks []string, protocol utils.AppProtocol, name string) *compute.BackendService { + backends := getBackendsForIGs(igs) + for _, b := range backends { + switch bm { + case Rate: + b.MaxRatePerInstance = maxRPS + default: + // TODO: Set utilization and connection limits when we accept them + // as valid fields. + } + b.BalancingMode = string(bm) + } + + return &compute.BackendService{ + Name: name, + Protocol: string(protocol), + Backends: backends, + HealthChecks: healthCheckLinks, + Port: namedPort.Port, + PortName: namedPort.Name, + } +} + // Add will get or create a Backend for the given port. -func (b *Backends) Add(port int64) error { +func (b *Backends) Add(p ServicePort) error { // We must track the port even if creating the backend failed, because // we might've created a health-check for it. be := &compute.BackendService{} - defer func() { b.snapshotter.Add(portKey(port), be) }() + defer func() { b.snapshotter.Add(portKey(p.Port), be) }() + + igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port) + if err != nil { + return err + } - igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port) + // Ensure health check for backend service exists + hcLink, err := b.ensureHealthCheck(p) if err != nil { return err } - be, _ = b.Get(port) + + pName := b.namer.BeName(p.Port) + be, _ = b.Get(p.Port) if be == nil { - glog.Infof("Creating backend for %d instance groups, port %v named port %v", - len(igs), port, namedPort) - be, err = b.create(igs, namedPort, b.namer.BeName(port)) + glog.V(2).Infof("Creating backend for %d instance groups, port %v named port %v", len(igs), p.Port, namedPort) + be, err = b.create(igs, namedPort, hcLink, p.Protocol, pName) if err != nil { return err } } + + existingHCLink := "" + if len(be.HealthChecks) == 1 { + existingHCLink = be.HealthChecks[0] + } + + if be.Protocol != string(p.Protocol) || existingHCLink != hcLink { + glog.V(2).Infof("Updating backend protocol %v (%v) for change in protocol (%v) or health check", pName, be.Protocol, string(p.Protocol)) + be.Protocol = string(p.Protocol) + be.HealthChecks = []string{hcLink} + if err = b.cloud.UpdateBackendService(be); err != nil { + return err + } + } + + // If previous health check was legacy type, we need to delete it. + if existingHCLink != hcLink && strings.Contains(existingHCLink, "/httpHealthChecks/") { + if err = b.healthChecker.DeleteLegacy(p.Port); err != nil { + return err + } + } + // we won't find any igs till the node pool syncs nodes. if len(igs) == 0 { return nil } - if err := b.edgeHop(be, igs); err != nil { + if err = b.edgeHop(be, igs); err != nil { return err } return err @@ -231,7 +286,7 @@ func (b *Backends) Add(port int64) error { // Delete deletes the Backend for the given port. func (b *Backends) Delete(port int64) (err error) { name := b.namer.BeName(port) - glog.Infof("Deleting backend %v", name) + glog.V(2).Infof("Deleting backend service %v", name) defer func() { if utils.IsHTTPErrorCode(err, http.StatusNotFound) { err = nil @@ -241,15 +296,11 @@ func (b *Backends) Delete(port int64) (err error) { } }() // Try deleting health checks even if a backend is not found. - if err = b.cloud.DeleteBackendService(name); err != nil && - !utils.IsHTTPErrorCode(err, http.StatusNotFound) { + if err = b.cloud.DeleteBackendService(name); err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) { return err } - if err = b.healthChecker.Delete(port); err != nil && - !utils.IsHTTPErrorCode(err, http.StatusNotFound) { - return err - } - return nil + + return b.healthChecker.Delete(port) } // List lists all backends. @@ -306,7 +357,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr } // Sync syncs backend services corresponding to ports in the given list. -func (b *Backends) Sync(svcNodePorts []int64) error { +func (b *Backends) Sync(svcNodePorts []ServicePort) error { glog.V(3).Infof("Sync: backends %v", svcNodePorts) // create backends for new ports, perform an edge hop for existing ports @@ -319,14 +370,14 @@ func (b *Backends) Sync(svcNodePorts []int64) error { } // GC garbage collects services corresponding to ports in the given list. -func (b *Backends) GC(svcNodePorts []int64) error { +func (b *Backends) GC(svcNodePorts []ServicePort) error { knownPorts := sets.NewString() - for _, port := range svcNodePorts { - knownPorts.Insert(portKey(port)) + for _, p := range svcNodePorts { + knownPorts.Insert(portKey(p.Port)) } pool := b.snapshotter.Snapshot() for port := range pool { - p, err := strconv.Atoi(port) + p, err := strconv.ParseUint(port, 10, 16) if err != nil { return err } @@ -345,7 +396,7 @@ func (b *Backends) GC(svcNodePorts []int64) error { // Shutdown deletes all backends and the default backend. // This will fail if one of the backends is being used by another resource. func (b *Backends) Shutdown() error { - if err := b.GC([]int64{}); err != nil { + if err := b.GC([]ServicePort{}); err != nil { return err } return nil @@ -365,3 +416,17 @@ func (b *Backends) Status(name string) string { // TODO: State transition are important, not just the latest. return hs.HealthStatus[0].HealthState } + +func applyProbeSettingsToHC(p *api_v1.Probe, hc *healthchecks.HealthCheck) { + healthPath := p.Handler.HTTPGet.Path + // GCE requires a leading "/" for health check urls. + if !strings.HasPrefix(healthPath, "/") { + healthPath = "/" + healthPath + } + + hc.RequestPath = healthPath + hc.Host = p.Handler.HTTPGet.Host + hc.Description = "Kubernetes L7 health check generated with readiness probe settings." + hc.CheckIntervalSec = int64(p.PeriodSeconds) + int64(healthchecks.DefaultHealthCheckInterval.Seconds()) + hc.TimeoutSec = int64(p.TimeoutSeconds) +} diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index 5766f6919c..8985095ed1 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -17,12 +17,15 @@ limitations under the License. package backends import ( + "fmt" "net/http" "testing" compute "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + api_v1 "k8s.io/client-go/pkg/api/v1" "k8s.io/ingress/controllers/gce/healthchecks" "k8s.io/ingress/controllers/gce/instances" @@ -34,14 +37,28 @@ const defaultZone = "zone-a" var noOpErrFunc = func(op int, be *compute.BackendService) error { return nil } -func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool { +var existingProbe = &api_v1.Probe{ + Handler: api_v1.Handler{ + HTTPGet: &api_v1.HTTPGetAction{ + Scheme: api_v1.URISchemeHTTPS, + Path: "/my-special-path", + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 443, + }, + }, + }, +} + +func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) *Backends { namer := &utils.Namer{} nodePool := instances.NewNodePool(fakeIGs) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) - healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer) - healthChecks.Init(&healthchecks.FakeHealthCheckGetter{DefaultHealthCheck: nil}) - return NewBackendPool( - f, healthChecks, nodePool, namer, []int64{}, syncWithCloud) + healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthCheckProvider(), "/", namer) + bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, syncWithCloud) + probes := map[ServicePort]*api_v1.Probe{{Port: 443, Protocol: utils.ProtocolHTTPS}: existingProbe} + bp.Init(NewFakeProbeProvider(probes)) + return bp } func TestBackendPoolAdd(t *testing.T) { @@ -50,39 +67,125 @@ func TestBackendPoolAdd(t *testing.T) { pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} - // Add a backend for a port, then re-add the same port and - // make sure it corrects a broken link from the backend to - // the instance group. - nodePort := int64(8080) - pool.Add(nodePort) - beName := namer.BeName(nodePort) + testCases := []ServicePort{ + {80, utils.ProtocolHTTP}, + {443, utils.ProtocolHTTPS}, + } + + for _, nodePort := range testCases { + // For simplicity, these tests use 80/443 as nodeports + t.Run(fmt.Sprintf("Port:%v Protocol:%v", nodePort.Port, nodePort.Protocol), func(t *testing.T) { + // Add a backend for a port, then re-add the same port and + // make sure it corrects a broken link from the backend to + // the instance group. + err := pool.Add(nodePort) + if err != nil { + t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", nodePort, err) + } + beName := namer.BeName(nodePort.Port) + + // Check that the new backend has the right port + be, err := f.GetBackendService(beName) + if err != nil { + t.Fatalf("Did not find expected backend %v", beName) + } + if be.Port != nodePort.Port { + t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort) + } + + // Check that the instance group has the new port + var found bool + for _, port := range fakeIGs.Ports { + if port == nodePort.Port { + found = true + } + } + if !found { + t.Fatalf("Port %v not added to instance group", nodePort) + } + + // Check the created healthcheck is the correct protocol + // pool.healthChecker. + hc, err := pool.healthChecker.Get(nodePort.Port) + if err != nil { + t.Fatalf("Unexpected err when querying fake healthchecker: %v", err) + } + + if hc.Protocol() != nodePort.Protocol { + t.Fatalf("Healthcheck scheme does not match nodeport scheme: hc:%v np:%v", hc.Protocol(), nodePort.Protocol) + } + + if nodePort.Port == 443 && hc.RequestPath != "/my-special-path" { + t.Fatalf("Healthcheck for 443 should have special request path from probe") + } + }) + } +} + +func TestBackendPoolUpdate(t *testing.T) { + f := NewFakeBackendServices(noOpErrFunc) + fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) + pool := newBackendPool(f, fakeIGs, false) + namer := utils.Namer{} + + p := ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP} + pool.Add(p) + beName := namer.BeName(p.Port) - // Check that the new backend has the right port be, err := f.GetBackendService(beName) if err != nil { - t.Fatalf("Did not find expected backend %v", beName) + t.Fatalf("Unexpected err: %v", err) } - if be.Port != nodePort { - t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort) + + if utils.AppProtocol(be.Protocol) != p.Protocol { + t.Fatalf("Expected scheme %v but got %v", p.Protocol, be.Protocol) } - // Check that the instance group has the new port - var found bool - for _, port := range fakeIGs.Ports { - if port == nodePort { - found = true - } + + // Assert the proper health check was created + hc, _ := pool.healthChecker.Get(p.Port) + if hc == nil || hc.Protocol() != p.Protocol { + t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc) } - if !found { - t.Fatalf("Port %v not added to instance group", nodePort) + + // Update service port to encrypted + p.Protocol = utils.ProtocolHTTPS + pool.Sync([]ServicePort{p}) + + be, err = f.GetBackendService(beName) + if err != nil { + t.Fatalf("Unexpected err retrieving backend service after update: %v", err) + } + + // Assert the backend has the correct protocol + if utils.AppProtocol(be.Protocol) != p.Protocol { + t.Fatalf("Expected scheme %v but got %v", p.Protocol, utils.AppProtocol(be.Protocol)) + } + + // Assert the proper health check was created + hc, _ = pool.healthChecker.Get(p.Port) + if hc == nil || hc.Protocol() != p.Protocol { + t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc) } +} + +func TestBackendPoolChaosMonkey(t *testing.T) { + f := NewFakeBackendServices(noOpErrFunc) + fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) + pool := newBackendPool(f, fakeIGs, false) + namer := utils.Namer{} + + nodePort := ServicePort{Port: 8080, Protocol: utils.ProtocolHTTP} + pool.Add(nodePort) + beName := namer.BeName(nodePort.Port) + + be, _ := f.GetBackendService(beName) // Mess up the link between backend service and instance group. // This simulates a user doing foolish things through the UI. - f.calls = []int{} - be, err = f.GetBackendService(beName) be.Backends = []*compute.Backend{ {Group: "test edge hop"}, } + f.calls = []int{} f.UpdateBackendService(be) pool.Add(nodePort) @@ -114,28 +217,35 @@ func TestBackendPoolAdd(t *testing.T) { func TestBackendPoolSync(t *testing.T) { // Call sync on a backend pool with a list of ports, make sure the pool // creates/deletes required ports. - svcNodePorts := []int64{81, 82, 83} + svcNodePorts := []ServicePort{{Port: 81, Protocol: utils.ProtocolHTTP}, {Port: 82, Protocol: utils.ProtocolHTTPS}, {Port: 83, Protocol: utils.ProtocolHTTP}} f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool := newBackendPool(f, fakeIGs, true) - pool.Add(81) - pool.Add(90) - pool.Sync(svcNodePorts) - pool.GC(svcNodePorts) + pool.Add(ServicePort{Port: 81}) + pool.Add(ServicePort{Port: 90}) + if err := pool.Sync(svcNodePorts); err != nil { + t.Errorf("Expected backend pool to sync, err: %v", err) + } + if err := pool.GC(svcNodePorts); err != nil { + t.Errorf("Expected backend pool to GC, err: %v", err) + } if _, err := pool.Get(90); err == nil { t.Fatalf("Did not expect to find port 90") } for _, port := range svcNodePorts { - if _, err := pool.Get(port); err != nil { + if _, err := pool.Get(port.Port); err != nil { t.Fatalf("Expected to find port %v", port) } } - svcNodePorts = []int64{81} - deletedPorts := []int64{82, 83} - pool.GC(svcNodePorts) + svcNodePorts = []ServicePort{{Port: 81}} + deletedPorts := []ServicePort{{Port: 82}, {Port: 83}} + if err := pool.GC(svcNodePorts); err != nil { + t.Fatalf("Expected backend pool to GC, err: %v", err) + } + for _, port := range deletedPorts { - if _, err := pool.Get(port); err == nil { + if _, err := pool.Get(port.Port); err == nil { t.Fatalf("Pool contains %v after deletion", port) } } @@ -152,13 +262,13 @@ func TestBackendPoolSync(t *testing.T) { namer := &utils.Namer{} // This backend should get deleted again since it is managed by this cluster. - f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0])}) + f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0].Port)}) // TODO: Avoid casting. // Repopulate the pool with a cloud list, which now includes the 82 port // backend. This would happen if, say, an ingress backend is removed // while the controller is restarting. - pool.(*Backends).snapshotter.(*storage.CloudListingPool).ReplenishPool() + pool.snapshotter.(*storage.CloudListingPool).ReplenishPool() pool.GC(svcNodePorts) @@ -178,13 +288,68 @@ func TestBackendPoolSync(t *testing.T) { } } +func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) { + namer := &utils.Namer{} + f := NewFakeBackendServices(noOpErrFunc) + fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) + nodePool := instances.NewNodePool(fakeIGs) + nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) + hcp := healthchecks.NewFakeHealthCheckProvider() + healthChecks := healthchecks.NewHealthChecker(hcp, "/", namer) + bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, false) + probes := map[ServicePort]*api_v1.Probe{} + bp.Init(NewFakeProbeProvider(probes)) + + // Create a legacy HTTP health check + beName := namer.BeName(80) + if err := hcp.CreateHttpHealthCheck(&compute.HttpHealthCheck{ + Name: beName, + Port: 80, + }); err != nil { + t.Fatalf("unexpected error creating http health check %v", err) + } + + // Verify health check exists + hc, err := hcp.GetHttpHealthCheck(beName) + if err != nil { + t.Fatalf("unexpected error getting http health check %v", err) + } + + // Create backend service with expected name and link to legacy health check + f.CreateBackendService(&compute.BackendService{ + Name: beName, + HealthChecks: []string{hc.SelfLink}, + }) + + // Have pool sync the above backend service + bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}) + + // Verify the legacy health check has been deleted + _, err = hcp.GetHttpHealthCheck(beName) + if err == nil { + t.Fatalf("expected error getting http health check %v", err) + } + + // Verify a newer health check exists + hcNew, err := hcp.GetHealthCheck(beName) + if err != nil { + t.Fatalf("unexpected error getting http health check %v", err) + } + + // Verify the newer health check is of type HTTPS + if hcNew.Type != string(utils.ProtocolHTTPS) { + t.Fatalf("expected health check type to be %v, actual %v", string(utils.ProtocolHTTPS), hcNew.Type) + } +} + func TestBackendPoolShutdown(t *testing.T) { f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} - pool.Add(80) + // Add a backend-service and verify that it doesn't exist after Shutdown() + pool.Add(ServicePort{Port: 80}) pool.Shutdown() if _, err := f.GetBackendService(namer.BeName(80)); err == nil { t.Fatalf("%v", err) @@ -198,7 +363,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { namer := utils.Namer{} // This will add the instance group k8s-ig to the instance pool - pool.Add(80) + pool.Add(ServicePort{Port: 80}) be, err := f.GetBackendService(namer.BeName(80)) if err != nil { @@ -211,12 +376,12 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { {Group: "k8s-ig-foo"}, } be.Backends = append(be.Backends, newGroups...) - if err := f.UpdateBackendService(be); err != nil { + if err = f.UpdateBackendService(be); err != nil { t.Fatalf("Failed to update backend service %v", be.Name) } // Make sure repeated adds don't clobber the inserted instance group - pool.Add(80) + pool.Add(ServicePort{Port: 80}) be, err = f.GetBackendService(namer.BeName(80)) if err != nil { t.Fatalf("%v", err) @@ -242,7 +407,7 @@ func TestBackendCreateBalancingMode(t *testing.T) { fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} - nodePort := int64(8080) + nodePort := ServicePort{Port: 8080} modes := []BalancingMode{Rate, Utilization} // block the creation of Backends with the given balancingMode @@ -259,7 +424,7 @@ func TestBackendCreateBalancingMode(t *testing.T) { } pool.Add(nodePort) - be, err := f.GetBackendService(namer.BeName(nodePort)) + be, err := f.GetBackendService(namer.BeName(nodePort.Port)) if err != nil { t.Fatalf("%v", err) } @@ -269,6 +434,32 @@ func TestBackendCreateBalancingMode(t *testing.T) { t.Fatalf("Wrong balancing mode, expected %v got %v", modes[(i+1)%len(modes)], b.BalancingMode) } } - pool.GC([]int64{}) + pool.GC([]ServicePort{}) + } +} + +func TestApplyProbeSettingsToHC(t *testing.T) { + p := "healthz" + hc := healthchecks.DefaultHealthCheck(8080, utils.ProtocolHTTPS) + probe := &api_v1.Probe{ + Handler: api_v1.Handler{ + HTTPGet: &api_v1.HTTPGetAction{ + Scheme: api_v1.URISchemeHTTP, + Path: p, + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, + } + + applyProbeSettingsToHC(probe, hc) + + if hc.Protocol() != utils.ProtocolHTTPS || hc.Port != 8080 { + t.Errorf("Basic HC settings changed") + } + if hc.RequestPath != "/"+p { + t.Errorf("Failed to apply probe's requestpath") } } diff --git a/controllers/gce/backends/fakes.go b/controllers/gce/backends/fakes.go index 2fe73cf864..d1f13d5136 100644 --- a/controllers/gce/backends/fakes.go +++ b/controllers/gce/backends/fakes.go @@ -20,6 +20,7 @@ import ( "fmt" compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress/controllers/gce/utils" @@ -118,3 +119,21 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput return &compute.BackendServiceGroupHealth{ HealthStatus: states}, nil } + +// FakeProbeProvider implements the probeProvider interface for tests. +type FakeProbeProvider struct { + probes map[ServicePort]*api_v1.Probe +} + +// NewFakeProbeProvider returns a struct which satifies probeProvider interface +func NewFakeProbeProvider(probes map[ServicePort]*api_v1.Probe) *FakeProbeProvider { + return &FakeProbeProvider{probes} +} + +// GetProbe returns the probe for a given nodePort +func (pp *FakeProbeProvider) GetProbe(port ServicePort) (*api_v1.Probe, error) { + if probe, exists := pp.probes[port]; exists && probe.HTTPGet != nil { + return probe, nil + } + return nil, nil +} diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 1e53368fc9..c24fc88a37 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -18,16 +18,23 @@ package backends import ( compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/client-go/pkg/api/v1" ) +// ProbeProvider retrieves a probe struct given a nodePort +type probeProvider interface { + GetProbe(sp ServicePort) (*api_v1.Probe, error) +} + // BackendPool is an interface to manage a pool of kubernetes nodePort services // as gce backendServices, and sync them through the BackendServices interface. type BackendPool interface { - Add(port int64) error + Init(p probeProvider) + Add(port ServicePort) error Get(port int64) (*compute.BackendService, error) Delete(port int64) error - Sync(ports []int64) error - GC(ports []int64) error + Sync(ports []ServicePort) error + GC(ports []ServicePort) error Shutdown() error Status(name string) string List() ([]interface{}, error) diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 79d21bc17f..2f2a0836fc 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -65,7 +65,7 @@ const ( // ClusterManager manages cluster resource pools. type ClusterManager struct { ClusterNamer *utils.Namer - defaultBackendNodePort int64 + defaultBackendNodePort backends.ServicePort instancePool instances.NodePool backendPool backends.BackendPool l7Pool loadbalancers.LoadBalancerPool @@ -83,9 +83,7 @@ type ClusterManager struct { // Init initializes the cluster manager. func (c *ClusterManager) Init(tr *GCETranslator) { c.instancePool.Init(tr) - for _, h := range c.healthCheckers { - h.Init(tr) - } + c.backendPool.Init(tr) // TODO: Initialize other members as needed. } @@ -126,17 +124,17 @@ func (c *ClusterManager) shutdown() error { // these ports must also be opened on the corresponding Instance Group. // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. -func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []int64) error { +func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error { // Multiple ingress paths can point to the same service (and hence nodePort) // but each nodePort can only have one set of cloud resources behind it. So // don't waste time double validating GCE BackendServices. - portMap := map[int64]struct{}{} + portMap := map[int64]backends.ServicePort{} for _, p := range nodePorts { - portMap[p] = struct{}{} + portMap[p.Port] = p } - nodePorts = []int64{} - for p := range portMap { - nodePorts = append(nodePorts, p) + nodePorts = []backends.ServicePort{} + for _, sp := range portMap { + nodePorts = append(nodePorts, sp) } if err := c.backendPool.Sync(nodePorts); err != nil { return err @@ -158,7 +156,12 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName // we shouldn't leak the firewall rule. fwNodePorts = append(fwNodePorts, c.defaultBackendNodePort) } - if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil { + + var np []int64 + for _, p := range fwNodePorts { + np = append(np, p.Port) + } + if err := c.firewallPool.Sync(np, nodeNames); err != nil { return err } @@ -171,7 +174,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName // - nodePorts are the ports for which we want BackendServies. BackendServices // for ports not in this list are deleted. // This method ignores googleapi 404 errors (StatusNotFound). -func (c *ClusterManager) GC(lbNames []string, nodePorts []int64) error { +func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort) error { // On GC: // * Loadbalancers need to get deleted before backends. @@ -240,7 +243,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud { func NewClusterManager( configFilePath string, namer *utils.Namer, - defaultBackendNodePort int64, + defaultBackendNodePort backends.ServicePort, defaultHealthCheckPath string) (*ClusterManager, error) { // TODO: Make this more resilient. Currently we create the cloud client @@ -279,15 +282,12 @@ func NewClusterManager( cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker} // TODO: This needs to change to a consolidated management of the default backend. - cluster.backendPool = backends.NewBackendPool( - cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true) - defaultBackendPool := backends.NewBackendPool( - cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) + cluster.backendPool = backends.NewBackendPool(cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort.Port}, true) + defaultBackendPool := backends.NewBackendPool(cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) cluster.defaultBackendNodePort = defaultBackendNodePort // L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs. - cluster.l7Pool = loadbalancers.NewLoadBalancerPool( - cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) + cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer) return &cluster, nil } diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index 11866b3685..1ad6881bf2 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -29,12 +29,11 @@ import ( "k8s.io/ingress/controllers/gce/utils" ) -const ( - testDefaultBeNodePort = int64(3000) +var ( + testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP} + testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80} ) -var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80} - // ClusterManager fake type fakeClusterManager struct { *ClusterManager @@ -48,14 +47,13 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName) fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil }) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) - fakeHCs := healthchecks.NewFakeHealthChecks() + fakeHCP := healthchecks.NewFakeHealthCheckProvider() namer := utils.NewNamer(clusterName, firewallName) nodePool := instances.NewNodePool(fakeIGs) nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}}) - healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) - healthChecker.Init(&healthchecks.FakeHealthCheckGetter{}) + healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer) backendPool := backends.NewBackendPool( fakeBackends, diff --git a/controllers/gce/controller/util_test.go b/controllers/gce/controller/util_test.go index 774750f55b..c521b649dd 100644 --- a/controllers/gce/controller/util_test.go +++ b/controllers/gce/controller/util_test.go @@ -25,6 +25,8 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" api_v1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/ingress/controllers/gce/backends" + "k8s.io/ingress/controllers/gce/utils" ) // Pods created in loops start from this time, for routines that @@ -94,17 +96,18 @@ func TestInstancesAddedToZones(t *testing.T) { func TestProbeGetter(t *testing.T) { cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) - nodePortToHealthCheck := map[int64]string{ - 3001: "/healthz", - 3002: "/foo", + + nodePortToHealthCheck := map[backends.ServicePort]string{ + {Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz", + {Port: 3002, Protocol: utils.ProtocolHTTPS}: "/foo", } addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.HealthCheck(p) - if err != nil { - t.Errorf("Failed to get health check for node port %v: %v", p, err) - } else if got.RequestPath != exp { - t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp) + got, err := lbc.tr.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) } } } @@ -112,8 +115,8 @@ func TestProbeGetter(t *testing.T) { func TestProbeGetterNamedPort(t *testing.T) { cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) - nodePortToHealthCheck := map[int64]string{ - 3001: "/healthz", + nodePortToHealthCheck := map[backends.ServicePort]string{ + {Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz", } addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) for _, p := range lbc.podLister.Indexer.List() { @@ -122,11 +125,11 @@ func TestProbeGetterNamedPort(t *testing.T) { pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"} } for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.HealthCheck(p) - if err != nil { - t.Errorf("Failed to get health check for node port %v: %v", p, err) - } else if got.RequestPath != exp { - t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp) + got, err := lbc.tr.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) } } @@ -167,31 +170,31 @@ func TestProbeGetterCrossNamespace(t *testing.T) { }, } lbc.podLister.Indexer.Add(firstPod) - nodePortToHealthCheck := map[int64]string{ - 3001: "/healthz", + nodePortToHealthCheck := map[backends.ServicePort]string{ + {Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz", } addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.HealthCheck(p) - if err != nil { - t.Errorf("Failed to get health check for node port %v: %v", p, err) - } else if got.RequestPath != exp { - t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp) + got, err := lbc.tr.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) } } } -func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string, ns string) { +func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[backends.ServicePort]string, ns string) { delay := time.Minute for np, u := range nodePortToHealthCheck { - l := map[string]string{fmt.Sprintf("app-%d", np): "test"} + l := map[string]string{fmt.Sprintf("app-%d", np.Port): "test"} svc := &api_v1.Service{ Spec: api_v1.ServiceSpec{ Selector: l, Ports: []api_v1.ServicePort{ { - NodePort: int32(np), + NodePort: int32(np.Port), TargetPort: intstr.IntOrString{ Type: intstr.Int, IntVal: 80, @@ -200,14 +203,14 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string }, }, } - svc.Name = fmt.Sprintf("%d", np) + svc.Name = fmt.Sprintf("%d", np.Port) svc.Namespace = ns lbc.svcLister.Indexer.Add(svc) pod := &api_v1.Pod{ ObjectMeta: meta_v1.ObjectMeta{ Labels: l, - Name: fmt.Sprintf("%d", np), + Name: fmt.Sprintf("%d", np.Port), Namespace: ns, CreationTimestamp: meta_v1.NewTime(firstPodCreationTime.Add(delay)), }, @@ -218,7 +221,7 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string ReadinessProbe: &api_v1.Probe{ Handler: api_v1.Handler{ HTTPGet: &api_v1.HTTPGetAction{ - Scheme: api_v1.URISchemeHTTP, + Scheme: api_v1.URIScheme(string(np.Protocol)), Path: u, Port: intstr.IntOrString{ Type: intstr.Int, @@ -257,3 +260,7 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { } lbc.CloudClusterManager.instancePool.Init(lbc.tr) } + +func getProbePath(p *api_v1.Probe) string { + return p.Handler.HTTPGet.Path +} diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 34b7eb10c0..697ce58e56 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "encoding/json" "fmt" "sort" "strconv" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/ingress/controllers/gce/backends" "k8s.io/ingress/controllers/gce/loadbalancers" "k8s.io/ingress/controllers/gce/utils" ) @@ -63,6 +65,12 @@ const ( // to the target proxies of the Ingress. preSharedCertKey = "ingress.gcp.kubernetes.io/pre-shared-cert" + // serviceApplicationProtocolKey is a stringified JSON map of port names to + // protocol strings. Possible values are HTTP, HTTPS + // Example: + // '{"my-https-port":"HTTPS","my-http-port":"HTTP"}' + serviceApplicationProtocolKey = "service.alpha.kubernetes.io/app-protocols" + // ingressClassKey picks a specific "class" for the Ingress. The controller // only processes Ingresses with this annotation either unset, or set // to either gceIngessClass or the empty string. @@ -116,6 +124,30 @@ func (ing ingAnnotations) ingressClass() string { return val } +// svcAnnotations represents Service annotations. +type svcAnnotations map[string]string + +func (svc svcAnnotations) ApplicationProtocols() (map[string]utils.AppProtocol, error) { + val, ok := svc[serviceApplicationProtocolKey] + if !ok { + return map[string]utils.AppProtocol{}, nil + } + + var portToProtos map[string]utils.AppProtocol + err := json.Unmarshal([]byte(val), &portToProtos) + + // Verify protocol is an accepted value + for _, proto := range portToProtos { + switch proto { + case utils.ProtocolHTTP, utils.ProtocolHTTPS: + default: + return nil, fmt.Errorf("invalid port application protocol: %v", proto) + } + } + + return portToProtos, err +} + // isGCEIngress returns true if the given Ingress either doesn't specify the // ingress.class annotation, or it's set to "gce". func isGCEIngress(ing *extensions.Ingress) bool { @@ -134,6 +166,15 @@ func (e errorNodePortNotFound) Error() string { e.backend, e.origErr) } +type errorSvcAppProtosParsing struct { + svc *api_v1.Service + origErr error +} + +func (e errorSvcAppProtosParsing) Error() string { + return fmt.Sprintf("could not parse %v annotation on Service %v/%v, err: %v", serviceApplicationProtocolKey, e.svc.Namespace, e.svc.Name, e.origErr) +} + // taskQueue manages a work queue through an independent worker that // invokes the given sync function for every work item inserted. type taskQueue struct { @@ -221,6 +262,7 @@ type StoreToPodLister struct { cache.Indexer } +// List returns a list of all pods based on selector func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, err error) { err = ListAll(s.Indexer, selector, func(m interface{}) { ret = append(ret, m.(*api_v1.Pod)) @@ -228,6 +270,7 @@ func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, er return ret, err } +// ListAll iterates a store and passes selected item to a func func ListAll(store cache.Store, selector labels.Selector, appendFn cache.AppendFunc) error { for _, m := range store.List() { metadata, err := meta.Accessor(m) @@ -362,17 +405,16 @@ func (t *GCETranslator) toGCEBackend(be *extensions.IngressBackend, ns string) ( if err != nil { return nil, err } - backend, err := t.CloudClusterManager.backendPool.Get(int64(port)) + backend, err := t.CloudClusterManager.backendPool.Get(port.Port) if err != nil { - return nil, fmt.Errorf( - "no GCE backend exists for port %v, kube backend %+v", port, be) + return nil, fmt.Errorf("no GCE backend exists for port %v, kube backend %+v", port, be) } return backend, nil } // getServiceNodePort looks in the svc store for a matching service:port, // and returns the nodeport. -func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (int, error) { +func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (backends.ServicePort, error) { obj, exists, err := t.svcLister.Indexer.Get( &api_v1.Service{ ObjectMeta: meta_v1.ObjectMeta{ @@ -381,37 +423,51 @@ func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespa }, }) if !exists { - return invalidPort, errorNodePortNotFound{be, fmt.Errorf( - "service %v/%v not found in store", namespace, be.ServiceName)} + return backends.ServicePort{}, errorNodePortNotFound{be, fmt.Errorf("service %v/%v not found in store", namespace, be.ServiceName)} + } + if err != nil { + return backends.ServicePort{}, errorNodePortNotFound{be, err} } + svc := obj.(*api_v1.Service) + appProtocols, err := svcAnnotations(svc.GetAnnotations()).ApplicationProtocols() if err != nil { - return invalidPort, errorNodePortNotFound{be, err} + return backends.ServicePort{}, errorSvcAppProtosParsing{svc, err} } - var nodePort int - for _, p := range obj.(*api_v1.Service).Spec.Ports { + + var port *api_v1.ServicePort +PortLoop: + for _, p := range svc.Spec.Ports { + np := p switch be.ServicePort.Type { case intstr.Int: if p.Port == be.ServicePort.IntVal { - nodePort = int(p.NodePort) - break + port = &np + break PortLoop } default: if p.Name == be.ServicePort.StrVal { - nodePort = int(p.NodePort) - break + port = &np + break PortLoop } } } - if nodePort != invalidPort { - return nodePort, nil + + if port == nil { + return backends.ServicePort{}, errorNodePortNotFound{be, fmt.Errorf("could not find matching nodeport from service")} } - return invalidPort, errorNodePortNotFound{be, fmt.Errorf( - "could not find matching nodeport from service")} + + proto := utils.ProtocolHTTP + if protoStr, exists := appProtocols[port.Name]; exists { + proto = utils.AppProtocol(protoStr) + } + + p := backends.ServicePort{Port: int64(port.NodePort), Protocol: proto} + return p, nil } // toNodePorts converts a pathlist to a flat list of nodeports. -func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 { - knownPorts := []int64{} +func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort { + var knownPorts []backends.ServicePort for _, ing := range ings.Items { defaultBackend := ing.Spec.Backend if defaultBackend != nil { @@ -419,7 +475,7 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 { if err != nil { glog.Infof("%v", err) } else { - knownPorts = append(knownPorts, int64(port)) + knownPorts = append(knownPorts, port) } } for _, rule := range ing.Spec.Rules { @@ -433,7 +489,7 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 { glog.Infof("%v", err) continue } - knownPorts = append(knownPorts, int64(port)) + knownPorts = append(knownPorts, port) } } } @@ -479,7 +535,7 @@ func (t *GCETranslator) ListZones() ([]string, error) { // geHTTPProbe returns the http readiness probe from the first container // that matches targetPort, from the set of pods matching the given labels. -func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOrString) (*api_v1.Probe, error) { +func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOrString, protocol utils.AppProtocol) (*api_v1.Probe, error) { l := svc.Spec.Selector // Lookup any container with a matching targetPort from the set of pods @@ -498,12 +554,13 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr } logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort) for _, c := range pod.Spec.Containers { - if !isSimpleHTTPProbe(c.ReadinessProbe) { + if !isSimpleHTTPProbe(c.ReadinessProbe) || string(protocol) != string(c.ReadinessProbe.HTTPGet.Scheme) { continue } + for _, p := range c.Ports { - if targetPort.Type == intstr.Int && targetPort.IntVal == p.ContainerPort || - targetPort.Type == intstr.String && targetPort.StrVal == p.Name { + if (targetPort.Type == intstr.Int && targetPort.IntVal == p.ContainerPort) || + (targetPort.Type == intstr.String && targetPort.StrVal == p.Name) { readinessProbePort := c.ReadinessProbe.Handler.HTTPGet.Port switch readinessProbePort.Type { @@ -529,80 +586,39 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr // isSimpleHTTPProbe returns true if the given Probe is: // - an HTTPGet probe, as opposed to a tcp or exec probe -// - has a scheme of HTTP, as opposed to HTTPS // - has no special host or headers fields func isSimpleHTTPProbe(probe *api_v1.Probe) bool { return (probe != nil && probe.Handler.HTTPGet != nil && probe.Handler.HTTPGet.Host == "" && - probe.Handler.HTTPGet.Scheme == api_v1.URISchemeHTTP && len(probe.Handler.HTTPGet.HTTPHeaders) == 0) + len(probe.Handler.HTTPGet.HTTPHeaders) == 0) } -// HealthCheck returns the http readiness probe for the endpoint backing the -// given nodePort. If no probe is found it returns a health check with "" as -// the request path, callers are responsible for swapping this out for the -// appropriate default. -func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) { +// GetProbe returns a probe that's used for the given nodeport +func (t *GCETranslator) GetProbe(port backends.ServicePort) (*api_v1.Probe, error) { sl := t.svcLister.List() - var ingresses []extensions.Ingress - var healthCheck *compute.HttpHealthCheck + // Find the label and target port of the one service with the given nodePort + var service api_v1.Service + var svcPort api_v1.ServicePort + var found bool +OuterLoop: for _, as := range sl { - s := as.(*api_v1.Service) - for _, p := range s.Spec.Ports { - + service = *as.(*api_v1.Service) + for _, sp := range service.Spec.Ports { + svcPort = sp // only one Service can match this nodePort, try and look up // the readiness probe of the pods behind it - if int32(port) != p.NodePort { - continue - } - rp, err := t.getHTTPProbe(*s, p.TargetPort) - if err != nil { - return nil, err - } - if rp == nil { - glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port) - break + if int32(port.Port) == sp.NodePort { + found = true + break OuterLoop } - - healthPath := rp.Handler.HTTPGet.Path - // GCE requires a leading "/" for health check urls. - if string(healthPath[0]) != "/" { - healthPath = fmt.Sprintf("/%v", healthPath) - } - - host := rp.Handler.HTTPGet.Host - glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath) - // remember the ingresses that use this Service so we can send - // the right events - ingresses, err = t.ingLister.GetServiceIngress(s) - if err != nil { - glog.Warningf("Failed to list ingresses for service %v", s.Name) - } - - healthCheck = &compute.HttpHealthCheck{ - Port: port, - RequestPath: healthPath, - Host: host, - Description: "kubernetes L7 health check from readiness probe.", - // set a low health threshold and a high failure threshold. - // We're just trying to detect if the node networking is - // borked, service level outages will get detected sooner - // by kube-proxy. - CheckIntervalSec: int64(rp.PeriodSeconds + utils.DefaultHealthCheckInterval), - TimeoutSec: int64(rp.TimeoutSeconds), - HealthyThreshold: utils.DefaultHealthyThreshold, - UnhealthyThreshold: utils.DefaultUnhealthyThreshold, - // TODO: include headers after updating compute godep. - } - break } } - if healthCheck == nil { - healthCheck = utils.DefaultHealthCheckTemplate(port) - } - for _, ing := range ingresses { - t.recorder.Eventf(&ing, api_v1.EventTypeNormal, "GCE", fmt.Sprintf("health check using %v:%v%v", healthCheck.Host, healthCheck.Port, healthCheck.RequestPath)) + + if !found { + return nil, fmt.Errorf("unable to find nodeport %v in any service", port) } - return healthCheck, nil + + return t.getHTTPProbe(service, svcPort.TargetPort, port.Protocol) } // PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker. diff --git a/controllers/gce/examples/backside_https/app.yaml b/controllers/gce/examples/backside_https/app.yaml new file mode 100644 index 0000000000..6a01803a7d --- /dev/null +++ b/controllers/gce/examples/backside_https/app.yaml @@ -0,0 +1,50 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: my-echo-deploy +spec: + replicas: 2 + template: + metadata: + labels: + app: echo + spec: + containers: + - name: echoserver + image: nicksardo/echoserver:latest + imagePullPolicy: Always + ports: + - name: echo-443 + containerPort: 443 + # readinessProbe: # Health check settings can be retrieved from an HTTPS readinessProbe as well + # httpGet: + # path: /healthcheck # Custom health check path for testing + # scheme: HTTPS + # port: echo-443 +--- +apiVersion: v1 +kind: Service +metadata: + name: my-echo-svc + annotations: + service.alpha.kubernetes.io/app-protocols: '{"my-https-port":"HTTPS"}' # Must map port-name to HTTPS for the GCP ingress controller + labels: + app: echo +spec: + type: NodePort + ports: + - port: 12345 # Port doesn't matter as nodeport is used for Ingress + targetPort: echo-443 + protocol: TCP + name: my-https-port + selector: + app: echo +--- +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: my-echo-ingress +spec: + backend: + serviceName: my-echo-svc + servicePort: my-https-port diff --git a/controllers/gce/examples/health_checks/README.md b/controllers/gce/examples/health_checks/README.md index 25d2049dc9..e7a4b35bfd 100644 --- a/controllers/gce/examples/health_checks/README.md +++ b/controllers/gce/examples/health_checks/README.md @@ -1,6 +1,6 @@ # Simple HTTP health check example -The GCE Ingress controller adopts the readiness probe from the matching endpoints, provided the readiness probe doesn't require HTTPS or special headers. +The GCE Ingress controller adopts the readiness probe from the matching endpoints, provided the readiness probe doesn't require special headers. Create the following app: ```console @@ -67,8 +67,6 @@ You can confirm the health check endpoint point it's using one of 2 ways: A few points to note: * The readiness probe must be exposed on the port matching the `servicePort` specified in the Ingress -* The readiness probe cannot have special requirements, like headers or HTTPS +* The readiness probe cannot have special requirements like headers * The probe timeouts are translated to GCE health check timeouts * You must create the pods backing the endpoints with the given readiness probe. This *will not* work if you update the replication controller with a different readiness probe. - - diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go index 07240ccefc..013e3bf26c 100644 --- a/controllers/gce/healthchecks/fakes.go +++ b/controllers/gce/healthchecks/fakes.go @@ -17,86 +17,98 @@ limitations under the License. package healthchecks import ( - "fmt" - compute "google.golang.org/api/compute/v1" - - "k8s.io/ingress/controllers/gce/utils" + "google.golang.org/api/googleapi" ) -// NewFakeHealthChecks returns a new FakeHealthChecks. -func NewFakeHealthChecks() *FakeHealthChecks { - return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}} -} - -// FakeHealthCheckGetter implements the healthCheckGetter interface for tests. -type FakeHealthCheckGetter struct { - DefaultHealthCheck *compute.HttpHealthCheck +func fakeNotFoundErr() *googleapi.Error { + return &googleapi.Error{Code: 404} } -// HealthCheck returns the health check for the given port. If a health check -// isn't stored under the DefaultHealthCheck member, it constructs one. -func (h *FakeHealthCheckGetter) HealthCheck(port int64) (*compute.HttpHealthCheck, error) { - if h.DefaultHealthCheck == nil { - return utils.DefaultHealthCheckTemplate(port), nil +// NewFakeHealthCheckProvider returns a new FakeHealthChecks. +func NewFakeHealthCheckProvider() *FakeHealthCheckProvider { + return &FakeHealthCheckProvider{ + http: make(map[string]compute.HttpHealthCheck), + generic: make(map[string]compute.HealthCheck), } - return h.DefaultHealthCheck, nil } -// FakeHealthChecks fakes out health checks. -type FakeHealthChecks struct { - hc []*compute.HttpHealthCheck +// FakeHealthCheckProvider fakes out health checks. +type FakeHealthCheckProvider struct { + http map[string]compute.HttpHealthCheck + generic map[string]compute.HealthCheck } // CreateHttpHealthCheck fakes out http health check creation. -func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error { - f.hc = append(f.hc, hc) +func (f *FakeHealthCheckProvider) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error { + v := *hc + v.SelfLink = "https://fake.google.com/compute/httpHealthChecks/" + hc.Name + f.http[hc.Name] = v return nil } // GetHttpHealthCheck fakes out getting a http health check from the cloud. -func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) { - for _, h := range f.hc { - if h.Name == name { - return h, nil - } +func (f *FakeHealthCheckProvider) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) { + if hc, found := f.http[name]; found { + return &hc, nil } - return nil, fmt.Errorf("health check %v not found", name) + + return nil, fakeNotFoundErr() } // DeleteHttpHealthCheck fakes out deleting a http health check. -func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error { - healthChecks := []*compute.HttpHealthCheck{} - exists := false - for _, h := range f.hc { - if h.Name == name { - exists = true - continue - } - healthChecks = append(healthChecks, h) - } - if !exists { - return fmt.Errorf("failed to find health check %v", name) +func (f *FakeHealthCheckProvider) DeleteHttpHealthCheck(name string) error { + if _, exists := f.http[name]; !exists { + return fakeNotFoundErr() } - f.hc = healthChecks + + delete(f.http, name) return nil } // UpdateHttpHealthCheck sends the given health check as an update. -func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { - healthChecks := []*compute.HttpHealthCheck{} - found := false - for _, h := range f.hc { - if h.Name == hc.Name { - healthChecks = append(healthChecks, hc) - found = true - } else { - healthChecks = append(healthChecks, h) - } +func (f *FakeHealthCheckProvider) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { + if _, exists := f.http[hc.Name]; !exists { + return fakeNotFoundErr() + } + + f.http[hc.Name] = *hc + return nil +} + +// CreateHealthCheck fakes out http health check creation. +func (f *FakeHealthCheckProvider) CreateHealthCheck(hc *compute.HealthCheck) error { + v := *hc + v.SelfLink = "https://fake.google.com/compute/healthChecks/" + hc.Name + f.generic[hc.Name] = v + return nil +} + +// GetHealthCheck fakes out getting a http health check from the cloud. +func (f *FakeHealthCheckProvider) GetHealthCheck(name string) (*compute.HealthCheck, error) { + if hc, found := f.generic[name]; found { + return &hc, nil + } + + return nil, fakeNotFoundErr() +} + +// DeleteHealthCheck fakes out deleting a http health check. +func (f *FakeHealthCheckProvider) DeleteHealthCheck(name string) error { + if _, exists := f.generic[name]; !exists { + return fakeNotFoundErr() } - if !found { - return fmt.Errorf("cannot update a non-existent health check %v", hc.Name) + + delete(f.generic, name) + return nil +} + +// UpdateHealthCheck sends the given health check as an update. +func (f *FakeHealthCheckProvider) UpdateHealthCheck(hc *compute.HealthCheck) error { + if _, exists := f.generic[hc.Name]; !exists { + return fakeNotFoundErr() } - f.hc = healthChecks + + f.generic[hc.Name] = *hc return nil } diff --git a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go index 6b37392ab9..98f0a5f430 100644 --- a/controllers/gce/healthchecks/healthchecks.go +++ b/controllers/gce/healthchecks/healthchecks.go @@ -18,77 +18,200 @@ package healthchecks import ( "net/http" + "time" - "github.com/golang/glog" compute "google.golang.org/api/compute/v1" + "github.com/golang/glog" + "k8s.io/ingress/controllers/gce/utils" ) +const ( + // These values set a low health threshold and a high failure threshold. + // We're just trying to detect if the node networking is + // borked, service level outages will get detected sooner + // by kube-proxy. + // DefaultHealthCheckInterval defines how frequently a probe runs + DefaultHealthCheckInterval = 60 * time.Second + // DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy" + DefaultHealthyThreshold = 1 + // DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy" + DefaultUnhealthyThreshold = 10 + // DefaultTimeout defines the timeout of each probe + DefaultTimeout = 60 * time.Second +) + // HealthChecks manages health checks. type HealthChecks struct { - cloud SingleHealthCheck + cloud HealthCheckProvider defaultPath string namer *utils.Namer - healthCheckGetter } // NewHealthChecker creates a new health checker. // cloud: the cloud object implementing SingleHealthCheck. // defaultHealthCheckPath: is the HTTP path to use for health checks. -func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker { - return &HealthChecks{cloud, defaultHealthCheckPath, namer, nil} +func NewHealthChecker(cloud HealthCheckProvider, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker { + return &HealthChecks{cloud, defaultHealthCheckPath, namer} } -// Init initializes the health checker. -func (h *HealthChecks) Init(r healthCheckGetter) { - h.healthCheckGetter = r +// New returns a *HealthCheck with default settings and specified port/protocol +func (h *HealthChecks) New(port int64, protocol utils.AppProtocol) *HealthCheck { + hc := DefaultHealthCheck(port, protocol) + hc.Name = h.namer.BeName(port) + return hc } -// Add adds a healthcheck if one for the same port doesn't already exist. -func (h *HealthChecks) Add(port int64) error { - wantHC, err := h.healthCheckGetter.HealthCheck(port) +// Sync retrieves a health check based on port, checks type and settings and updates/creates if necessary. +// Sync is only called by the backends.Add func - it's not a pool like other resources. +func (h *HealthChecks) Sync(hc *HealthCheck) (string, error) { + // Verify default path + if hc.RequestPath == "" { + hc.RequestPath = h.defaultPath + } + + existingHC, err := h.Get(hc.Port) if err != nil { - return err + if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { + return "", err + } + + glog.V(2).Infof("Creating health check for port %v with protocol %v", hc.Port, hc.Type) + if err = h.cloud.CreateHealthCheck(hc.ToComputeHealthCheck()); err != nil { + return "", err + } + + return h.getHealthCheckLink(hc.Port) } - if wantHC.RequestPath == "" { - wantHC.RequestPath = h.defaultPath + + if existingHC.Protocol() != hc.Protocol() { + glog.V(2).Infof("Updating health check %v because it has protocol %v but need %v", existingHC.Name, existingHC.Type, hc.Type) + err = h.cloud.UpdateHealthCheck(hc.ToComputeHealthCheck()) + return existingHC.SelfLink, err } - name := h.namer.BeName(port) - wantHC.Name = name - hc, _ := h.Get(port) - if hc == nil { - // TODO: check if the readiness probe has changed and update the - // health check. - glog.Infof("Creating health check %v", name) - if err := h.cloud.CreateHttpHealthCheck(wantHC); err != nil { - return err - } - } else if wantHC.RequestPath != hc.RequestPath { + + if existingHC.RequestPath != hc.RequestPath { // TODO: reconcile health checks, and compare headers interval etc. // Currently Ingress doesn't expose all the health check params // natively, so some users prefer to hand modify the check. - glog.Infof("Unexpected request path on health check %v, has %v want %v, NOT reconciling", - name, hc.RequestPath, wantHC.RequestPath) + glog.V(2).Infof("Unexpected request path on health check %v, has %v want %v, NOT reconciling", hc.Name, existingHC.RequestPath, hc.RequestPath) } else { - glog.Infof("Health check %v already exists and has the expected path %v", hc.Name, hc.RequestPath) + glog.V(2).Infof("Health check %v already exists and has the expected path %v", hc.Name, hc.RequestPath) } - return nil + + return existingHC.SelfLink, nil +} + +func (h *HealthChecks) getHealthCheckLink(port int64) (string, error) { + hc, err := h.Get(port) + if err != nil { + return "", err + } + return hc.SelfLink, nil } // Delete deletes the health check by port. func (h *HealthChecks) Delete(port int64) error { name := h.namer.BeName(port) - glog.Infof("Deleting health check %v", name) - if err := h.cloud.DeleteHttpHealthCheck(h.namer.BeName(port)); err != nil { - if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { - return err - } + glog.V(2).Infof("Deleting health check %v", name) + return h.cloud.DeleteHealthCheck(name) +} + +// Get returns the health check by port +func (h *HealthChecks) Get(port int64) (*HealthCheck, error) { + name := h.namer.BeName(port) + hc, err := h.cloud.GetHealthCheck(name) + return NewHealthCheck(hc), err +} + +// DeleteLegacy deletes legacy HTTP health checks +func (h *HealthChecks) DeleteLegacy(port int64) error { + name := h.namer.BeName(port) + glog.V(2).Infof("Deleting legacy HTTP health check %v", name) + return h.cloud.DeleteHttpHealthCheck(name) +} + +// DefaultHealthCheck simply returns the default health check. +func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck { + httpSettings := compute.HTTPHealthCheck{ + Port: port, + // Empty string is used as a signal to the caller to use the appropriate + // default. + RequestPath: "", + } + + hcSettings := compute.HealthCheck{ + // How often to health check. + CheckIntervalSec: int64(DefaultHealthCheckInterval.Seconds()), + // How long to wait before claiming failure of a health check. + TimeoutSec: int64(DefaultTimeout.Seconds()), + // Number of healthchecks to pass for a vm to be deemed healthy. + HealthyThreshold: DefaultHealthyThreshold, + // Number of healthchecks to fail before the vm is deemed unhealthy. + UnhealthyThreshold: DefaultUnhealthyThreshold, + Description: "Default kubernetes L7 Loadbalancing health check.", + Type: string(protocol), + } + + return &HealthCheck{ + HTTPHealthCheck: httpSettings, + HealthCheck: hcSettings, } - return nil } -// Get returns the given health check. -func (h *HealthChecks) Get(port int64) (*compute.HttpHealthCheck, error) { - return h.cloud.GetHttpHealthCheck(h.namer.BeName(port)) +// HealthCheck embeds two types - the generic healthcheck compute.HealthCheck +// and the HTTP settings compute.HTTPHealthCheck. By embedding both, consumers can modify +// all relevant settings (HTTP specific and HealthCheck generic) regardless of Type +// Consumers should call .Out() func to generate a compute.HealthCheck +// with the proper child struct (.HttpHealthCheck, .HttpshealthCheck, etc). +type HealthCheck struct { + compute.HTTPHealthCheck + compute.HealthCheck +} + +// NewHealthCheck creates a HealthCheck which abstracts nested structs away +func NewHealthCheck(hc *compute.HealthCheck) *HealthCheck { + if hc == nil { + return nil + } + + v := &HealthCheck{HealthCheck: *hc} + switch utils.AppProtocol(hc.Type) { + case utils.ProtocolHTTP: + v.HTTPHealthCheck = *hc.HttpHealthCheck + case utils.ProtocolHTTPS: + // HTTPHealthCheck and HTTPSHealthChecks have identical fields + v.HTTPHealthCheck = compute.HTTPHealthCheck(*hc.HttpsHealthCheck) + } + + // Users should be modifying HTTP(S) specific settings on the embedded + // HTTPHealthCheck. Setting these to nil for preventing confusion. + v.HealthCheck.HttpHealthCheck = nil + v.HealthCheck.HttpsHealthCheck = nil + + return v +} + +// Protocol returns the type cased to AppProtocol +func (hc *HealthCheck) Protocol() utils.AppProtocol { + return utils.AppProtocol(hc.Type) +} + +// ToComputeHealthCheck returns a valid compute.HealthCheck object +func (hc *HealthCheck) ToComputeHealthCheck() *compute.HealthCheck { + // Zeroing out child settings as a precaution. GoogleAPI throws an error + // if the wrong child struct is set. + hc.HealthCheck.HttpsHealthCheck = nil + hc.HealthCheck.HttpHealthCheck = nil + + switch hc.Protocol() { + case utils.ProtocolHTTP: + hc.HealthCheck.HttpHealthCheck = &hc.HTTPHealthCheck + case utils.ProtocolHTTPS: + https := compute.HTTPSHealthCheck(hc.HTTPHealthCheck) + hc.HealthCheck.HttpsHealthCheck = &https + } + + return &hc.HealthCheck } diff --git a/controllers/gce/healthchecks/healthchecks_test.go b/controllers/gce/healthchecks/healthchecks_test.go index 9db1edd496..0f6abc3585 100644 --- a/controllers/gce/healthchecks/healthchecks_test.go +++ b/controllers/gce/healthchecks/healthchecks_test.go @@ -17,47 +17,170 @@ limitations under the License. package healthchecks import ( + "net/http" "testing" + compute "google.golang.org/api/compute/v1" + "k8s.io/ingress/controllers/gce/utils" ) -func TestFakeHealthCheckActions(t *testing.T) { +func TestHealthCheckAdd(t *testing.T) { + namer := utils.NewNamer("ABC", "XYZ") + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + hc := healthChecks.New(80, utils.ProtocolHTTP) + _, err := healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHealthCheck(namer.BeName(80)) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } + + hc = healthChecks.New(443, utils.ProtocolHTTPS) + _, err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHealthCheck(namer.BeName(443)) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } +} + +func TestHealthCheckAddExisting(t *testing.T) { namer := &utils.Namer{} - healthChecks := NewHealthChecker(NewFakeHealthChecks(), "/", namer) - healthChecks.Init(&FakeHealthCheckGetter{DefaultHealthCheck: nil}) + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // HTTP + // Manually insert a health check + httpHC := DefaultHealthCheck(3000, utils.ProtocolHTTP) + httpHC.Name = namer.BeName(3000) + httpHC.RequestPath = "/my-probes-health" + hcp.CreateHealthCheck(httpHC.ToComputeHealthCheck()) + + // Should not fail adding the same type of health check + hc := healthChecks.New(3000, utils.ProtocolHTTP) + _, err := healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHealthCheck(httpHC.Name) + if err != nil { + t.Fatalf("expected the health check to continue existing, err: %v", err) + } + + // HTTPS + // Manually insert a health check + httpsHC := DefaultHealthCheck(4000, utils.ProtocolHTTPS) + httpsHC.Name = namer.BeName(4000) + httpsHC.RequestPath = "/my-probes-health" + hcp.CreateHealthCheck(httpsHC.ToComputeHealthCheck()) - err := healthChecks.Add(80) + hc = healthChecks.New(4000, utils.ProtocolHTTPS) + _, err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHealthCheck(httpsHC.Name) if err != nil { - t.Fatalf("unexpected error") + t.Fatalf("expected the health check to continue existing, err: %v", err) } +} + +func TestHealthCheckDelete(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // Create HTTP HC for 1234 + hc := DefaultHealthCheck(1234, utils.ProtocolHTTP) + hc.Name = namer.BeName(1234) + hcp.CreateHealthCheck(hc.ToComputeHealthCheck()) + + // Create HTTPS HC for 1234) + hc.Type = string(utils.ProtocolHTTPS) + hcp.CreateHealthCheck(hc.ToComputeHealthCheck()) - _, err1 := healthChecks.Get(8080) - if err1 == nil { - t.Errorf("expected error") + // Delete only HTTP 1234 + err := healthChecks.Delete(1234) + if err != nil { + t.Errorf("unexpected error when deleting health check, err: %v", err) } - hc, err2 := healthChecks.Get(80) - if err2 != nil { - t.Errorf("unexpected error") - } else { - if hc == nil { - t.Errorf("expected a *compute.HttpHealthCheck") - } + // Validate port is deleted + _, err = hcp.GetHealthCheck(hc.Name) + if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { + t.Errorf("expected not-found error, actual: %v", err) } - err = healthChecks.Delete(8080) + // Delete only HTTP 1234 + err = healthChecks.Delete(1234) if err == nil { - t.Errorf("expected error") + t.Errorf("expected not-found error when deleting health check, err: %v", err) + } +} + +func TestHealthCheckUpdate(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // HTTP + // Manually insert a health check + hc := DefaultHealthCheck(3000, utils.ProtocolHTTP) + hc.Name = namer.BeName(3000) + hc.RequestPath = "/my-probes-health" + hcp.CreateHealthCheck(hc.ToComputeHealthCheck()) + + // Verify the health check exists + _, err := healthChecks.Get(3000) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } + + // Change to HTTPS + hc.Type = string(utils.ProtocolHTTPS) + _, err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected err while syncing healthcheck, err %v", err) + } + + // Verify the health check exists + _, err = healthChecks.Get(3000) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } + + // Verify the check is now HTTPS + if hc.Protocol() != utils.ProtocolHTTPS { + t.Fatalf("expected check to be of type HTTPS") } +} + +func TestHealthCheckDeleteLegacy(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) - err = healthChecks.Delete(80) + err := hcp.CreateHttpHealthCheck(&compute.HttpHealthCheck{ + Name: namer.BeName(80), + }) if err != nil { - t.Errorf("unexpected error") + t.Fatalf("expected health check to be created, err: %v", err) } - _, err3 := healthChecks.Get(80) - if err3 == nil { - t.Errorf("expected error") + err = healthChecks.DeleteLegacy(80) + if err != nil { + t.Fatalf("expected health check to be deleted, err: %v", err) } + } diff --git a/controllers/gce/healthchecks/interfaces.go b/controllers/gce/healthchecks/interfaces.go index 59d2596964..cdf8c635d6 100644 --- a/controllers/gce/healthchecks/interfaces.go +++ b/controllers/gce/healthchecks/interfaces.go @@ -18,27 +18,28 @@ package healthchecks import ( compute "google.golang.org/api/compute/v1" -) -// healthCheckGetter retrieves health checks. -type healthCheckGetter interface { - // HealthCheck returns the HTTP readiness check for a node port. - HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error) -} + "k8s.io/ingress/controllers/gce/utils" +) -// SingleHealthCheck is an interface to manage a single GCE health check. -type SingleHealthCheck interface { +// HealthCheckProvider is an interface to manage a single GCE health check. +type HealthCheckProvider interface { CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error DeleteHttpHealthCheck(name string) error GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) + + CreateHealthCheck(hc *compute.HealthCheck) error + UpdateHealthCheck(hc *compute.HealthCheck) error + DeleteHealthCheck(name string) error + GetHealthCheck(name string) (*compute.HealthCheck, error) } // HealthChecker is an interface to manage cloud HTTPHealthChecks. type HealthChecker interface { - Init(h healthCheckGetter) - - Add(port int64) error + New(port int64, protocol utils.AppProtocol) *HealthCheck + Sync(hc *HealthCheck) (string, error) Delete(port int64) error - Get(port int64) (*compute.HttpHealthCheck, error) + Get(port int64) (*HealthCheck, error) + DeleteLegacy(port int64) error } diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index dd5fd1574b..9972ccc013 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -70,7 +70,7 @@ type L7s struct { // TODO: Remove this field and always ask the BackendPool using the NodePort. glbcDefaultBackend *compute.BackendService defaultBackendPool backends.BackendPool - defaultBackendNodePort int64 + defaultBackendNodePort backends.ServicePort namer *utils.Namer } @@ -84,7 +84,7 @@ type L7s struct { func NewLoadBalancerPool( cloud LoadBalancers, defaultBackendPool backends.BackendPool, - defaultBackendNodePort int64, namer *utils.Namer) LoadBalancerPool { + defaultBackendNodePort backends.ServicePort, namer *utils.Namer) LoadBalancerPool { return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer} } @@ -172,7 +172,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil { return err } - defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort) + defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port) if err != nil { return err } @@ -209,7 +209,7 @@ func (l *L7s) GC(names []string) error { // This needs to happen after we've deleted all url-maps that might be // using it. if len(names) == 0 { - if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort); err != nil { + if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort.Port); err != nil { return err } l.glbcDefaultBackend = nil diff --git a/controllers/gce/loadbalancers/loadbalancers_test.go b/controllers/gce/loadbalancers/loadbalancers_test.go index a2f54d9a17..7d7c6925d2 100644 --- a/controllers/gce/loadbalancers/loadbalancers_test.go +++ b/controllers/gce/loadbalancers/loadbalancers_test.go @@ -30,17 +30,19 @@ import ( ) const ( - testDefaultBeNodePort = int64(3000) - defaultZone = "zone-a" + defaultZone = "zone-a" +) + +var ( + testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP} ) func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool { fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil }) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) - fakeHCs := healthchecks.NewFakeHealthChecks() + fakeHCP := healthchecks.NewFakeHealthCheckProvider() namer := &utils.Namer{} - healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) - healthChecker.Init(&healthchecks.FakeHealthCheckGetter{}) + healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer) nodePool := instances.NewNodePool(fakeIGs) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) backendPool := backends.NewBackendPool( diff --git a/controllers/gce/main.go b/controllers/gce/main.go index 4910983959..f34c529d40 100644 --- a/controllers/gce/main.go +++ b/controllers/gce/main.go @@ -40,6 +40,7 @@ import ( "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/ingress/controllers/gce/backends" "k8s.io/ingress/controllers/gce/controller" "k8s.io/ingress/controllers/gce/loadbalancers" "k8s.io/ingress/controllers/gce/storage" @@ -226,11 +227,13 @@ func main() { glog.Fatalf("Default backend should take the form namespace/name: %v", *defaultSvc) } - defaultBackendNodePort, err := getNodePort(kubeClient, parts[0], parts[1]) + nodePort, err := getNodePort(kubeClient, parts[0], parts[1]) if err != nil { glog.Fatalf("Could not configure default backend %v: %v", *defaultSvc, err) } + // The default backend is known to be HTTP + defaultBackendNodePort := backends.ServicePort{Port: nodePort, Protocol: utils.ProtocolHTTP} if *inCluster || *useRealCloud { // Create cluster manager diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index f59d7b259d..1db1541e8e 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -80,16 +80,14 @@ const ( // debug information in the Ingress annotations. K8sAnnotationPrefix = "ingress.kubernetes.io" - // DefaultHealthCheckInterval defines how frequently a probe runs - DefaultHealthCheckInterval = 60 - // DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy" - DefaultHealthyThreshold = 1 - // DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy" - DefaultUnhealthyThreshold = 10 - // DefaultTimeoutSeconds defines the timeout of each probe - DefaultTimeoutSeconds = 60 + // ProtocolHTTP protocol for a service + ProtocolHTTP AppProtocol = "HTTP" + // ProtocolHTTPS protocol for a service + ProtocolHTTPS AppProtocol = "HTTPS" ) +type AppProtocol string + // Namer handles centralized naming for the cluster. type Namer struct { clusterName string @@ -333,22 +331,3 @@ func CompareLinks(l1, l2 string) bool { // FakeIngressRuleValueMap is a convenience type used by multiple submodules // that share the same testing methods. type FakeIngressRuleValueMap map[string]string - -// DefaultHealthCheckTemplate simply returns the default health check template. -func DefaultHealthCheckTemplate(port int64) *compute.HttpHealthCheck { - return &compute.HttpHealthCheck{ - Port: port, - // Empty string is used as a signal to the caller to use the appropriate - // default. - RequestPath: "", - Description: "Default kubernetes L7 Loadbalancing health check.", - // How often to health check. - CheckIntervalSec: DefaultHealthCheckInterval, - // How long to wait before claiming failure of a health check. - TimeoutSec: DefaultTimeoutSeconds, - // Number of healthchecks to pass for a vm to be deemed healthy. - HealthyThreshold: DefaultHealthyThreshold, - // Number of healthchecks to fail before the vm is deemed unhealthy. - UnhealthyThreshold: DefaultUnhealthyThreshold, - } -} diff --git a/docs/faq/gce.md b/docs/faq/gce.md index 392943c705..cf585ae464 100644 --- a/docs/faq/gce.md +++ b/docs/faq/gce.md @@ -42,7 +42,7 @@ Please check the following: 1. Output of `kubectl describe`, as shown [here](README.md#i-created-an-ingress-and-nothing-happens-what-now) 2. Do your Services all have a `NodePort`? -3. Do your Services either serve a http 200 on `/`, or have a readiness probe +3. Do your Services either serve an HTTP status code 200 on `/`, or have a readiness probe as described in [this section](#can-i-configure-gce-health-checks-through-the-ingress)? 4. Do you have enough GCP quota? @@ -68,8 +68,7 @@ Global Forwarding Rule -> TargetHTTPSProxy ``` In addition to this pipeline: -* Each Backend Service requires a HTTP health check to the NodePort of the - Service +* Each Backend Service requires a HTTP or HTTPS health check to the NodePort of the Service * Each port on the Backend Service has a matching port on the Instance Group * Each port on the Backend Service is exposed through a firewall-rule open to the GCE LB IP ranges (`130.211.0.0/22` and `35.191.0.0/16`) @@ -126,12 +125,12 @@ Please check the following: Currently health checks are not exposed through the Ingress resource, they're handled at the node level by Kubernetes daemons (kube-proxy and the kubelet). -However the GCE HTTP lb still requires a HTTP health check to measure node +However the GCE L7 lb still requires a HTTP(S) health check to measure node health. By default, this health check points at `/` on the nodePort associated with a given backend. Note that the purpose of this health check is NOT to determine when endpoint pods are overloaded, but rather, to detect when a given node is incapable of proxying requests for the Service:nodePort -alltogether. Overloaded endpoints are removed from the working set of a +altogether. Overloaded endpoints are removed from the working set of a Service via readiness probes conducted by the kubelet. If `/` doesn't work for your application, you can have the Ingress controller @@ -311,12 +310,12 @@ pointing to that Service's NodePort. Instance Group, these must be shared. There is 1 Ingress Instance Group per zone containing Kubernetes nodes. -* HTTP Health Checks: currently the http health checks point at the NodePort +* Health Checks: currently the health checks point at the NodePort of a BackendService. They don't *need* to be shared, but they are since BackendServices are shared. * Firewall rule: In a non-federated cluster there is a single firewall rule -that covers HTTP health check traffic from the range of [GCE loadbalancer IPs](https://cloud.google.com/compute/docs/load-balancing/http/#troubleshooting) +that covers health check traffic from the range of [GCE loadbalancer IPs](https://cloud.google.com/compute/docs/load-balancing/http/#troubleshooting) to Service nodePorts. Unique: diff --git a/examples/health-checks/gce/README.md b/examples/health-checks/gce/README.md index 25d2049dc9..a2d6e710a5 100644 --- a/examples/health-checks/gce/README.md +++ b/examples/health-checks/gce/README.md @@ -67,8 +67,6 @@ You can confirm the health check endpoint point it's using one of 2 ways: A few points to note: * The readiness probe must be exposed on the port matching the `servicePort` specified in the Ingress -* The readiness probe cannot have special requirements, like headers or HTTPS +* The readiness probe cannot have special requirements like headers * The probe timeouts are translated to GCE health check timeouts * You must create the pods backing the endpoints with the given readiness probe. This *will not* work if you update the replication controller with a different readiness probe. - -