Skip to content

Commit

Permalink
feat: add improved handling for static gateway addresses
Browse files Browse the repository at this point in the history
Signed-off-by: Shane Utt <[email protected]>
  • Loading branch information
shaneutt committed Oct 10, 2023
1 parent a3e7df5 commit 546f507
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 157 deletions.
78 changes: 43 additions & 35 deletions controllers/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/kong/blixt/pkg/vars"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -84,63 +87,49 @@ func (r *GatewayReconciler) gatewayHasMatchingGatewayClass(obj client.Object) bo
func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

gw := new(gatewayv1beta1.Gateway)
if err := r.Client.Get(ctx, req.NamespacedName, gw); err != nil {
gateway := new(gatewayv1beta1.Gateway)
if err := r.Client.Get(ctx, req.NamespacedName, gateway); err != nil {
if errors.IsNotFound(err) {
log.Info("object enqueued no longer exists, skipping")
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

gwc := new(gatewayv1beta1.GatewayClass)
if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gw.Spec.GatewayClassName)}, gwc); err != nil {
gatewayClass := new(gatewayv1beta1.GatewayClass)
if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

if gwc.Spec.ControllerName != vars.GatewayClassControllerName {
if gatewayClass.Spec.ControllerName != vars.GatewayClassControllerName {
return ctrl.Result{}, nil
}

// determine if the Gateway has been accepted, and if it has not then
// determine whether we will accept it, and if not drop it until it has
// been corrected.
oldGateway := gw.DeepCopy()
isAccepted := isGatewayAccepted(gw)
if !isAccepted {
initGatewayStatus(gw)
setGatewayAcceptance(gw)
factorizeStatus(gw, oldGateway)

oldAccepted := getAcceptedConditionForGateway(oldGateway)
accepted := getAcceptedConditionForGateway(gw)

if !sameConditions(oldAccepted, accepted) {
return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway))
}

log.Info("gateway %s/%s is not accepted and will not be provisioned", gw.Namespace, gw.Name)
return ctrl.Result{}, nil
log.Info("found a supported Gateway, determining whether the gateway has been accepted")
oldGateway := gateway.DeepCopy()
if !isGatewayAccepted(gateway) {
log.Info("gateway not yet accepted")
setGatewayListenerStatus(gateway)
setGatewayStatus(gateway)
updateConditionGeneration(gateway)
return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}

log.Info("checking for Service for Gateway")
svc, err := r.getServiceForGateway(ctx, gw)
svc, err := r.getServiceForGateway(ctx, gateway)
if err != nil {
return ctrl.Result{}, err
}
if svc == nil {
log.Info("creating Service for Gateway")
return ctrl.Result{}, r.createServiceForGateway(ctx, gw) // service creation will requeue gateway
return ctrl.Result{}, r.createServiceForGateway(ctx, gateway) // service creation will requeue gateway
}

log.Info("checking Service configuration")
needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gw)
// in both cases when the service does not exist or an error has been triggered, the Gateway
// must be not ready. This OR condition is redundant, as (needsUpdate == true AND err == nil)
// should never happen, but useful to highlight the purpose.
needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gateway)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -151,9 +140,27 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.Info("checking Service status", "namespace", svc.Namespace, "name", svc.Name)
switch t := svc.Spec.Type; t {
case corev1.ServiceTypeLoadBalancer:
if err := r.svcIsHealthy(ctx, svc); err != nil {
// TODO: only handles metallb right now https://github.com/Kong/blixt/issues/96
if strings.Contains(err.Error(), "Failed to allocate IP") {
r.Log.Info("failed to allocate IP for Gateway", gateway.Namespace, gateway.Name)
setCond(gateway, metav1.Condition{
Type: string(gatewayv1beta1.GatewayConditionProgrammed),
ObservedGeneration: gateway.Generation,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: string(gatewayv1beta1.GatewayReasonAddressNotUsable),
Message: err.Error(),
})
updateConditionGeneration(gateway)
return ctrl.Result{Requeue: true}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}
return ctrl.Result{}, err
}

if svc.Spec.ClusterIP == "" || len(svc.Status.LoadBalancer.Ingress) < 1 {
log.Info("waiting for Service to be ready")
return ctrl.Result{Requeue: true}, nil
return ctrl.Result{RequeueAfter: time.Second}, nil
}
default:
return ctrl.Result{}, fmt.Errorf("found unsupported Service type: %s (only LoadBalancer type is currently supported)", t)
Expand All @@ -170,8 +177,9 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{Requeue: true}, nil
}

log.Info("Service is ready, updating Gateway")
updateGatewayStatus(ctx, gw, svc)
factorizeStatus(gw, oldGateway)
return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway))
log.Info("Service is ready, setting Gateway as programmed")
setGatewayStatusAddresses(gateway, svc)
setGatewayListenerConditionsAndProgrammed(gateway)
updateConditionGeneration(gateway)
return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}
148 changes: 51 additions & 97 deletions controllers/gateway_controller_status.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package controllers

import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// updateGatewayStatus computes the new Gateway status, setting its ready condition and all the
// ready listeners's ready conditions to true, unless a resolvedRefs error is discovered. In
// that case, the proper listener ready condition and the gateway one are set to false.
// The addresses are updated as well.
func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc *corev1.Service) {
// gateway addresses
gwaddrs := make([]gatewayv1beta1.GatewayStatusAddress, 0, len(svc.Status.LoadBalancer.Ingress))
func setGatewayStatusAddresses(gateway *gatewayv1beta1.Gateway, svc *corev1.Service) {
gwaddrs := []gatewayv1beta1.GatewayStatusAddress{}
for _, addr := range svc.Status.LoadBalancer.Ingress {
if addr.IP != "" {
gwaddrs = append(gwaddrs, gatewayv1beta1.GatewayStatusAddress{
Expand All @@ -30,17 +23,10 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc
}
}
gateway.Status.Addresses = gwaddrs
}

// gateway conditions
newGatewayAcceptedCondition := metav1.Condition{
Type: string(gatewayv1beta1.GatewayConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1beta1.GatewayReasonAccepted),
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Now(),
Message: "blixt controlplane accepts responsibility for the Gateway",
}
newGatewayProgrammedCondition := metav1.Condition{
func setGatewayListenerConditionsAndProgrammed(gateway *gatewayv1beta1.Gateway) {
programmed := metav1.Condition{
Type: string(gatewayv1beta1.GatewayConditionProgrammed),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1beta1.GatewayReasonProgrammed),
Expand All @@ -49,7 +35,6 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc
Message: "the gateway is ready to route traffic",
}

// gateway listeners conditions
listenersStatus := make([]gatewayv1beta1.ListenerStatus, 0, len(gateway.Spec.Listeners))
for _, l := range gateway.Spec.Listeners {
supportedKinds, resolvedRefsCondition := getSupportedKinds(gateway.Generation, l)
Expand All @@ -63,6 +48,13 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc
Name: l.Name,
SupportedKinds: supportedKinds,
Conditions: []metav1.Condition{
{
Type: string(gatewayv1beta1.ListenerConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1beta1.ListenerReasonAccepted),
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Now(),
},
{
Type: string(gatewayv1beta1.ListenerConditionProgrammed),
Status: metav1.ConditionStatus(listenerProgrammedStatus),
Expand All @@ -74,42 +66,16 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc
},
})
if resolvedRefsCondition.Status == metav1.ConditionFalse {
newGatewayProgrammedCondition.Status = metav1.ConditionFalse
newGatewayProgrammedCondition.Reason = string(gatewayv1beta1.GatewayReasonAddressNotAssigned)
newGatewayProgrammedCondition.Message = "the gateway is not ready to route traffic"
programmed.Status = metav1.ConditionFalse
programmed.Reason = string(gatewayv1beta1.GatewayReasonAddressNotAssigned)
programmed.Message = "the gateway is not ready to route traffic"
}
}

gateway.Status.Conditions = []metav1.Condition{
newGatewayAcceptedCondition,
newGatewayProgrammedCondition,
}
gateway.Status.Listeners = listenersStatus
setCond(gateway, programmed)
}

// initGatewayStatus initializes the GatewayStatus, setting the ready condition to
// not ready and all the listeners ready status to not ready as well.
func initGatewayStatus(gateway *gatewayv1beta1.Gateway) {
gateway.Status = gatewayv1beta1.GatewayStatus{
Conditions: []metav1.Condition{
{
Type: string(gatewayv1beta1.GatewayConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1beta1.GatewayReasonAccepted),
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Now(),
Message: "blixt controlplane accepts responsibility for the Gateway",
},
{
Type: string(gatewayv1beta1.GatewayConditionProgrammed),
Status: metav1.ConditionFalse,
Reason: string(gatewayv1beta1.GatewayReasonAddressNotAssigned),
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Now(),
Message: "the gateway is not ready to route traffic",
},
},
}
func setGatewayListenerStatus(gateway *gatewayv1beta1.Gateway) {
gateway.Status.Listeners = make([]gatewayv1beta1.ListenerStatus, 0, len(gateway.Spec.Listeners))
for _, l := range gateway.Spec.Listeners {
supportedKinds, resolvedRefsCondition := getSupportedKinds(gateway.Generation, l)
Expand Down Expand Up @@ -160,12 +126,12 @@ func getSupportedKinds(generation int64, listener gatewayv1beta1.Listener) (supp
case gatewayv1beta1.HTTPProtocolType:
supportedKinds = append(supportedKinds, gatewayv1beta1.RouteGroupKind{
Group: (*gatewayv1beta1.Group)(&gatewayv1beta1.GroupVersion.Group),
Kind: "TCPRoute",
Kind: "HTTPRoute",
})
case gatewayv1beta1.HTTPSProtocolType:
supportedKinds = append(supportedKinds, gatewayv1beta1.RouteGroupKind{
Group: (*gatewayv1beta1.Group)(&gatewayv1beta1.GroupVersion.Group),
Kind: "TCPRoute",
Kind: "HTTPRoute",
})
default:
resolvedRefsCondition.Status = metav1.ConditionFalse
Expand All @@ -188,38 +154,20 @@ func getSupportedKinds(generation int64, listener gatewayv1beta1.Listener) (supp
return supportedKinds, resolvedRefsCondition
}

// factorizeStatus takes the old gateway conditions not transitioned and copies them
// updateConditionGeneration takes the old gateway conditions not transitioned and copies them
// into the new gateway status, so that only the transitioning conditions gets actually patched.
func factorizeStatus(gateway, oldGateway *gatewayv1beta1.Gateway) {
for i, c := range gateway.Status.Conditions {
for _, oldC := range oldGateway.Status.Conditions {
if c.Type == oldC.Type {
if c.Status == oldC.Status && c.Reason == oldC.Reason {
gateway.Status.Conditions[i] = oldC
}
}
}
}

func updateConditionGeneration(gateway *gatewayv1beta1.Gateway) {
for i := 0; i < len(gateway.Status.Conditions); i++ {
gateway.Status.Conditions[0].ObservedGeneration = gateway.Generation
}

for i, l := range gateway.Status.Listeners {
for j, lc := range l.Conditions {
for _, ol := range oldGateway.Status.Listeners {
if ol.Name != l.Name {
continue
}
for _, olc := range ol.Conditions {
if lc.Type == olc.Type {
if lc.Status == olc.Status && lc.Reason == olc.Reason {
gateway.Status.Listeners[i].Conditions[j] = olc
}
}
}
}
for i := 0; i < len(gateway.Status.Listeners); i++ {
updatedListenerConditions := []metav1.Condition{}
for _, cond := range gateway.Status.Listeners[0].Conditions {
cond.ObservedGeneration = gateway.Generation
updatedListenerConditions = append(updatedListenerConditions, cond)
}
gateway.Status.Listeners[0].Conditions = updatedListenerConditions
}
}

Expand All @@ -232,28 +180,34 @@ func isGatewayAccepted(gateway *gatewayv1beta1.Gateway) bool {
}

func getAcceptedConditionForGateway(gateway *gatewayv1beta1.Gateway) *metav1.Condition {
for _, c := range gateway.Status.Conditions {
if c.Type == string(gatewayv1beta1.GatewayConditionAccepted) {
return &c
}
}
return nil
return getCond(gateway, string(gatewayv1beta1.GatewayConditionAccepted))
}

// isGatewayProgrammed returns two boolean values:
// - the status of the programmed condition
// - a boolean flag to check if the condition exists
func isGatewayProgrammed(gateway *gatewayv1beta1.Gateway) (status bool, isSet bool) {
for _, c := range gateway.Status.Conditions {
if c.Type == string(gatewayv1beta1.GatewayConditionProgrammed) {
return c.Status == metav1.ConditionTrue, true
func setCond(gateway *gatewayv1beta1.Gateway, setCond metav1.Condition) {
updatedConditions := make([]metav1.Condition, 0, len(gateway.Status.Conditions))

found := false
for _, oldCond := range gateway.Status.Conditions {
if oldCond.Type == setCond.Type {
found = true
updatedConditions = append(updatedConditions, setCond)
} else {
updatedConditions = append(updatedConditions, oldCond)
}
}
return false, false

if !found {
updatedConditions = append(updatedConditions, setCond)
}

gateway.Status.Conditions = updatedConditions
}

// sameConditions returns true if the type, status and reason match for
// the two provided metav1.Conditions.
func sameConditions(cond1, cond2 *metav1.Condition) bool {
return cond1.Type == cond2.Type && cond1.Status == cond2.Status && cond1.Reason == cond2.Reason && cond1.Message == cond2.Message && cond1.ObservedGeneration == cond2.ObservedGeneration
func getCond(gateway *gatewayv1beta1.Gateway, requestedType string) *metav1.Condition {
for _, cond := range gateway.Status.Conditions {
if cond.Type == requestedType {
return &cond
}
}
return nil
}
Loading

0 comments on commit 546f507

Please sign in to comment.