Support backside re-encryption
Nick Sardo committed Apr 7, 2017
1 parent 7c635a8 commit 5f53f0b
Showing 16 changed files with 822 additions and 377 deletions.
186 changes: 141 additions & 45 deletions controllers/gce/backends/backends.go
@@ -27,6 +27,7 @@ import (

compute "google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
api_v1 "k8s.io/client-go/pkg/api/v1"

"k8s.io/ingress/controllers/gce/healthchecks"
"k8s.io/ingress/controllers/gce/instances"
@@ -75,6 +76,7 @@ type Backends struct {
nodePool instances.NodePool
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
prober probeProvider
// ignoredPorts are a set of ports excluded from GC, even
// after the Ingress has been deleted. Note that invoking
// a Delete() on these ports will still delete the backend.
@@ -86,6 +88,12 @@ func portKey(port int64) string {
return fmt.Sprintf("%d", port)
}

// ServicePort pairs a service node port with whether traffic to its backends should be re-encrypted (HTTPS)
type ServicePort struct {
Port int64
Encrypted bool
}
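For context, a sketch of the call shape this type enables, with hypothetical port values (backendPool is the pool returned by NewBackendPool below):

	ports := []ServicePort{
		{Port: 30001, Encrypted: false}, // plain HTTP to the backends
		{Port: 30002, Encrypted: true},  // backside re-encryption: HTTPS to the backends
	}
	if err := backendPool.Sync(ports); err != nil {
		glog.Errorf("syncing backends: %v", err)
	}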

// NewBackendPool returns a new backend pool.
// - cloud: implements BackendServices and syncs backends with a cloud provider
// - healthChecker: is capable of producing health checks for backends.
@@ -134,6 +142,10 @@ func NewBackendPool(
return backendPool
}

// Init sets the probe provider used to derive health check settings from readiness probes.
func (b *Backends) Init(pp probeProvider) {
b.prober = pp
}
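The probeProvider type is not part of this hunk; presumably it is the small lookup interface the commit introduces alongside this file, inferred here from the GetProbe call in ensureHealthCheck below:

	// probeProvider returns the readiness probe, if any, behind a node port.
	type probeProvider interface {
		GetProbe(port int64) (*api_v1.Probe, error)
	}

Injecting it via Init rather than through NewBackendPool presumably lets the pool be constructed before a probe source exists.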

// Get returns a single backend.
func (b *Backends) Get(port int64) (*compute.BackendService, error) {
be, err := b.cloud.GetBackendService(b.namer.BeName(port))
@@ -144,44 +156,46 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
return be, nil
}

func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
// Create a new health check
if err := b.healthChecker.Add(namedPort.Port); err != nil {
return nil, err
func (b *Backends) ensureHealthCheck(port int64, encrypted bool) (string, error) {
hc := b.healthChecker.New(port, encrypted)
if b.prober != nil {
probe, err := b.prober.GetProbe(port)
if err != nil {
return "", err
}
if probe != nil {
applyProbeSettingsToHC(probe, hc)
}
}

if err := b.healthChecker.Sync(hc); err != nil {
return "", err
}
hc, err := b.healthChecker.Get(namedPort.Port)
//TODO: Avoid this second call
hc, err := b.healthChecker.Get(port, encrypted)
if err != nil {
return "", err
}
return hc.SelfLink, nil
}
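The ensure flow is New, overlay probe settings, Sync, then Get: build the desired health check, reconcile it with GCE, and re-read it for its self link. The HealthChecker surface this leans on would look roughly as follows (signatures inferred from the call sites in this file; the real interface lives in the healthchecks package):

	type HealthChecker interface {
		New(port int64, encrypted bool) *HealthCheck
		Sync(hc *HealthCheck) error
		Get(port int64, encrypted bool) (*HealthCheck, error)
		Delete(port int64, encrypted bool) error
	}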

func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, encrypted bool, name string) (*compute.BackendService, error) {
hclink, err := b.ensureHealthCheck(namedPort.Port, encrypted)
if err != nil {
return nil, err
}
errs := []string{}

var errs []string
// We first try to create the backend with balancingMode=RATE. If this
// fails, it's most likely because there are existing backends with
// balancingMode=UTILIZATION. This failure mode throws a googleapi error
// which wraps an HTTP 400 status code. We handle it in the loop below
// and come around to retry with the right balancing mode. The goal is to
// switch everyone to using RATE.
for _, bm := range []BalancingMode{Rate, Utilization} {
backends := getBackendsForIGs(igs)
for _, b := range backends {
switch bm {
case Rate:
b.MaxRate = maxRPS
default:
// TODO: Set utilization and connection limits when we accept them
// as valid fields.
}
b.BalancingMode = string(bm)
}
// Create a new backend
backend := &compute.BackendService{
Name: name,
Protocol: "HTTP",
Backends: backends,
HealthChecks: []string{hc.SelfLink},
Port: namedPort.Port,
PortName: namedPort.Name,
}
if err := b.cloud.CreateBackendService(backend); err != nil {
bs := newBackendService(igs, bm, namedPort, []string{hclink}, encrypted, name)
if err := b.cloud.CreateBackendService(bs); err != nil {
// This is probably a failure because we tried to create the backend
// with balancingMode=RATE when there are already backends with
// balancingMode=UTILIZATION. Just ignore it and retry setting
@@ -198,31 +212,84 @@ func (b *Backends) Add(port int64) error {
return nil, fmt.Errorf("%v", strings.Join(errs, "\n"))
}

func newBackendService(igs []*compute.InstanceGroup, bm BalancingMode, namedPort *compute.NamedPort, healthCheckLinks []string, encrypted bool, name string) *compute.BackendService {
backends := getBackendsForIGs(igs)
for _, b := range backends {
switch bm {
case Rate:
b.MaxRatePerInstance = maxRPS
default:
// TODO: Set utilization and connection limits when we accept them
// as valid fields.
}
b.BalancingMode = string(bm)
}

return &compute.BackendService{
Name: name,
Protocol: utils.GetHTTPScheme(encrypted),
Backends: backends,
HealthChecks: healthCheckLinks,
Port: namedPort.Port,
PortName: namedPort.Name,
}
}
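utils.GetHTTPScheme maps the encryption flag onto the protocol string GCE expects on a backend service; presumably it is equivalent to:

	// GetHTTPScheme returns the backend service protocol for the encryption flag.
	func GetHTTPScheme(encrypted bool) string {
		if encrypted {
			return "HTTPS"
		}
		return "HTTP"
	}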

func (b *Backends) updateProtocol(bs *compute.BackendService, encrypted bool) (*compute.BackendService, error) {
// Ensure a health check exists with the proper protocol
hclink, err := b.ensureHealthCheck(bs.Port, encrypted)
if err != nil {
return nil, err
}

bs.Protocol = utils.GetHTTPScheme(encrypted)
bs.HealthChecks = []string{hclink}

if err = b.cloud.UpdateBackendService(bs); err != nil {
return bs, err
}

// Attempt to delete the previous health check; log a warning if it fails
if err = b.healthChecker.Delete(bs.Port, !encrypted); err != nil {
glog.Warningf("Failed to delete %v healthcheck for port %v, err: %v", utils.GetHTTPScheme(!encrypted), bs.Port, err)
}

return bs, nil
}
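Note the ordering above: the backend service is repointed at the new health check before the old one is deleted, since GCE refuses to delete a health check that is still referenced. The warn-and-continue on the trailing Delete means a failure leaves at worst an orphaned health check, never a backend service without one.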

// Add will get or create a Backend for the given port.
func (b *Backends) Add(port int64) error {
func (b *Backends) Add(p ServicePort) error {
// We must track the port even if creating the backend failed, because
// we might've created a health-check for it.
be := &compute.BackendService{}
defer func() { b.snapshotter.Add(portKey(port), be) }()
defer func() { b.snapshotter.Add(portKey(p.Port), be) }()

igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port)
if err != nil {
return err
}
be, _ = b.Get(port)

pName := b.namer.BeName(p.Port)
be, _ = b.Get(p.Port)
if be == nil {
glog.Infof("Creating backend for %d instance groups, port %v named port %v",
len(igs), port, namedPort)
be, err = b.create(igs, namedPort, b.namer.BeName(port))
glog.Infof("Creating backend for %d instance groups, port %v named port %v", len(igs), p.Port, namedPort)
be, err = b.create(igs, namedPort, p.Encrypted, pName)
if err != nil {
return err
}
}

pProto := utils.GetHTTPScheme(p.Encrypted)
if be.Protocol != pProto {
glog.Infof("Updating backend protocol %v from %v to %v", pName, be.Protocol, pProto)
if be, err = b.updateProtocol(be, p.Encrypted); err != nil {
return err
}
}

// We won't find any instance groups until the node pool has synced nodes.
if len(igs) == 0 {
return nil
}
if err := b.edgeHop(be, igs); err != nil {
if err = b.edgeHop(be, igs); err != nil {
return err
}
return err
@@ -231,7 +298,7 @@ func (b *Backends) Add(port int64) error {
// Delete deletes the Backend for the given port.
func (b *Backends) Delete(port int64) (err error) {
name := b.namer.BeName(port)
glog.Infof("Deleting backend %v", name)
glog.Infof("Deleting backend service %v", name)
defer func() {
if utils.IsHTTPErrorCode(err, http.StatusNotFound) {
err = nil
@@ -245,10 +312,17 @@ func (b *Backends) Delete(port int64) (err error) {
!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return err
}
if err = b.healthChecker.Delete(port); err != nil &&
!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return err

// Delete HTTP and HTTPS health checks in case both exist
// TODO: Update GLBC to create the newer compute.HealthCheck so we aren't straddling
// two types of health checks (legacy http & legacy https)
encryption := []bool{false, true}
for _, e := range encryption {
if err = b.healthChecker.Delete(port, e); err != nil &&
!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return err
}
}

return nil
}
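Tolerating NotFound inside the loop matters here: both protocol variants of the health check are attempted, and a missing HTTP variant must not short-circuit the delete before the HTTPS variant is tried. The deferred handler above would otherwise convert that early return into an apparent success.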

@@ -306,7 +380,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
}

// Sync syncs backend services corresponding to ports in the given list.
func (b *Backends) Sync(svcNodePorts []int64) error {
func (b *Backends) Sync(svcNodePorts []ServicePort) error {
glog.V(3).Infof("Sync: backends %v", svcNodePorts)

// create backends for new ports, perform an edge hop for existing ports
@@ -319,14 +393,14 @@ func (b *Backends) Sync(svcNodePorts []int64) error {
}

// GC garbage collects services corresponding to ports in the given list.
func (b *Backends) GC(svcNodePorts []int64) error {
func (b *Backends) GC(svcNodePorts []ServicePort) error {
knownPorts := sets.NewString()
for _, port := range svcNodePorts {
knownPorts.Insert(portKey(port))
for _, p := range svcNodePorts {
knownPorts.Insert(portKey(p.Port))
}
pool := b.snapshotter.Snapshot()
for port := range pool {
p, err := strconv.Atoi(port)
p, err := strconv.ParseInt(port, 10, 64)
if err != nil {
return err
}
@@ -345,7 +419,7 @@ func (b *Backends) GC(svcNodePorts []int64) error {
// Shutdown deletes all backends and the default backend.
// This will fail if one of the backends is being used by another resource.
func (b *Backends) Shutdown() error {
if err := b.GC([]int64{}); err != nil {
if err := b.GC([]ServicePort{}); err != nil {
return err
}
return nil
Expand All @@ -365,3 +439,25 @@ func (b *Backends) Status(name string) string {
// TODO: State transitions are important, not just the latest.
return hs.HealthStatus[0].HealthState
}

func applyProbeSettingsToHC(p *api_v1.Probe, hc *healthchecks.HealthCheck) {
healthPath := p.Handler.HTTPGet.Path
// GCE requires a leading "/" for health check URLs.
if !strings.HasPrefix(healthPath, "/") {
healthPath = fmt.Sprintf("/%v", healthPath)
}

host := p.Handler.HTTPGet.Host

hc.RequestPath = healthPath
hc.Host = host
hc.Description = "kubernetes L7 health check from readiness probe."
// set a low health threshold and a high failure threshold.
// We're just trying to detect if the node networking is
// borked, service level outages will get detected sooner
// by kube-proxy.
hc.CheckIntervalSec = int64(p.PeriodSeconds + healthchecks.DefaultHealthCheckInterval)
hc.TimeoutSec = int64(p.TimeoutSeconds)
}
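A worked example of the mapping, with hypothetical values: a readiness probe with path "healthz", a 10s period, and a 5s timeout produces a health check with RequestPath "/healthz", CheckIntervalSec of 10 plus the default interval, and TimeoutSec of 5.

	probe := &api_v1.Probe{
		Handler: api_v1.Handler{
			HTTPGet: &api_v1.HTTPGetAction{Path: "healthz", Host: "svc.example.com"},
		},
		PeriodSeconds:  10,
		TimeoutSeconds: 5,
	}
	hc := &healthchecks.HealthCheck{} // zero value purely for illustration
	applyProbeSettingsToHC(probe, hc)
	// hc.RequestPath == "/healthz", hc.Host == "svc.example.com", hc.TimeoutSec == 5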