From 0e40a7fd019920a07c973000632006cc96931ef0 Mon Sep 17 00:00:00 2001 From: Shane Utt Date: Tue, 10 Oct 2023 16:07:43 -0400 Subject: [PATCH] feat: add improved handling for static gateway addresses Signed-off-by: Shane Utt --- controllers/gateway_controller.go | 78 ++++++------ controllers/gateway_controller_status.go | 148 ++++++++--------------- controllers/gateway_controller_utils.go | 123 +++++++++++++++---- 3 files changed, 192 insertions(+), 157 deletions(-) diff --git a/controllers/gateway_controller.go b/controllers/gateway_controller.go index 59c26d8c..1077e6c6 100644 --- a/controllers/gateway_controller.go +++ b/controllers/gateway_controller.go @@ -4,11 +4,14 @@ import ( "context" "fmt" "reflect" + "strings" + "time" "github.com/go-logr/logr" "github.com/kong/blixt/pkg/vars" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -84,8 +87,8 @@ func (r *GatewayReconciler) gatewayHasMatchingGatewayClass(obj client.Object) bo func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := log.FromContext(ctx) - gw := new(gatewayv1beta1.Gateway) - if err := r.Client.Get(ctx, req.NamespacedName, gw); err != nil { + gateway := new(gatewayv1beta1.Gateway) + if err := r.Client.Get(ctx, req.NamespacedName, gateway); err != nil { if errors.IsNotFound(err) { log.Info("object enqueued no longer exists, skipping") return ctrl.Result{}, nil @@ -93,54 +96,40 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } - gwc := new(gatewayv1beta1.GatewayClass) - if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gw.Spec.GatewayClassName)}, gwc); err != nil { + gatewayClass := new(gatewayv1beta1.GatewayClass) + if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { if errors.IsNotFound(err) { return ctrl.Result{}, nil } return ctrl.Result{}, err } - if gwc.Spec.ControllerName != vars.GatewayClassControllerName { + if gatewayClass.Spec.ControllerName != vars.GatewayClassControllerName { return ctrl.Result{}, nil } - // determine if the Gateway has been accepted, and if it has not then - // determine whether we will accept it, and if not drop it until it has - // been corrected. - oldGateway := gw.DeepCopy() - isAccepted := isGatewayAccepted(gw) - if !isAccepted { - initGatewayStatus(gw) - setGatewayAcceptance(gw) - factorizeStatus(gw, oldGateway) - - oldAccepted := getAcceptedConditionForGateway(oldGateway) - accepted := getAcceptedConditionForGateway(gw) - - if !sameConditions(oldAccepted, accepted) { - return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway)) - } - - log.Info("gateway %s/%s is not accepted and will not be provisioned", gw.Namespace, gw.Name) - return ctrl.Result{}, nil + log.Info("found a supported Gateway, determining whether the gateway has been accepted") + oldGateway := gateway.DeepCopy() + if !isGatewayAccepted(gateway) { + log.Info("gateway not yet accepted") + setGatewayListenerStatus(gateway) + setGatewayStatus(gateway) + updateConditionGeneration(gateway) + return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway)) } log.Info("checking for Service for Gateway") - svc, err := r.getServiceForGateway(ctx, gw) + svc, err := r.getServiceForGateway(ctx, gateway) if err != nil { return ctrl.Result{}, err } if svc == nil { log.Info("creating Service for Gateway") - return ctrl.Result{}, r.createServiceForGateway(ctx, gw) // service creation will requeue gateway + return ctrl.Result{}, r.createServiceForGateway(ctx, gateway) // service creation will requeue gateway } log.Info("checking Service configuration") - needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gw) - // in both cases when the service does not exist or an error has been triggered, the Gateway - // must be not ready. This OR condition is redundant, as (needsUpdate == true AND err == nil) - // should never happen, but useful to highlight the purpose. + needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gateway) if err != nil { return ctrl.Result{}, err } @@ -151,9 +140,27 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct log.Info("checking Service status", "namespace", svc.Namespace, "name", svc.Name) switch t := svc.Spec.Type; t { case corev1.ServiceTypeLoadBalancer: + if err := r.svcIsHealthy(ctx, svc); err != nil { + // TODO: only handles metallb right now https://github.com/Kong/blixt/issues/96 + if strings.Contains(err.Error(), "Failed to allocate IP") { + r.Log.Info("failed to allocate IP for Gateway", gateway.Namespace, gateway.Name) + setCond(gateway, metav1.Condition{ + Type: string(gatewayv1beta1.GatewayConditionProgrammed), + ObservedGeneration: gateway.Generation, + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.Now(), + Reason: string(gatewayv1beta1.GatewayReasonAddressNotUsable), + Message: err.Error(), + }) + updateConditionGeneration(gateway) + return ctrl.Result{Requeue: true}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway)) + } + return ctrl.Result{}, err + } + if svc.Spec.ClusterIP == "" || len(svc.Status.LoadBalancer.Ingress) < 1 { log.Info("waiting for Service to be ready") - return ctrl.Result{Requeue: true}, nil + return ctrl.Result{RequeueAfter: time.Second}, nil } default: return ctrl.Result{}, fmt.Errorf("found unsupported Service type: %s (only LoadBalancer type is currently supported)", t) @@ -170,8 +177,9 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{Requeue: true}, nil } - log.Info("Service is ready, updating Gateway") - updateGatewayStatus(ctx, gw, svc) - factorizeStatus(gw, oldGateway) - return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway)) + log.Info("Service is ready, setting Gateway as programmed") + setGatewayStatusAddresses(gateway, svc) + setGatewayListenerConditionsAndProgrammed(gateway) + updateConditionGeneration(gateway) + return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway)) } diff --git a/controllers/gateway_controller_status.go b/controllers/gateway_controller_status.go index d5af3c6c..d9fdece1 100644 --- a/controllers/gateway_controller_status.go +++ b/controllers/gateway_controller_status.go @@ -1,20 +1,13 @@ package controllers import ( - "context" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) -// updateGatewayStatus computes the new Gateway status, setting its ready condition and all the -// ready listeners's ready conditions to true, unless a resolvedRefs error is discovered. In -// that case, the proper listener ready condition and the gateway one are set to false. -// The addresses are updated as well. -func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc *corev1.Service) { - // gateway addresses - gwaddrs := make([]gatewayv1beta1.GatewayStatusAddress, 0, len(svc.Status.LoadBalancer.Ingress)) +func setGatewayStatusAddresses(gateway *gatewayv1beta1.Gateway, svc *corev1.Service) { + gwaddrs := []gatewayv1beta1.GatewayStatusAddress{} for _, addr := range svc.Status.LoadBalancer.Ingress { if addr.IP != "" { gwaddrs = append(gwaddrs, gatewayv1beta1.GatewayStatusAddress{ @@ -30,17 +23,10 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc } } gateway.Status.Addresses = gwaddrs +} - // gateway conditions - newGatewayAcceptedCondition := metav1.Condition{ - Type: string(gatewayv1beta1.GatewayConditionAccepted), - Status: metav1.ConditionTrue, - Reason: string(gatewayv1beta1.GatewayReasonAccepted), - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Message: "blixt controlplane accepts responsibility for the Gateway", - } - newGatewayProgrammedCondition := metav1.Condition{ +func setGatewayListenerConditionsAndProgrammed(gateway *gatewayv1beta1.Gateway) { + programmed := metav1.Condition{ Type: string(gatewayv1beta1.GatewayConditionProgrammed), Status: metav1.ConditionTrue, Reason: string(gatewayv1beta1.GatewayReasonProgrammed), @@ -49,7 +35,6 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc Message: "the gateway is ready to route traffic", } - // gateway listeners conditions listenersStatus := make([]gatewayv1beta1.ListenerStatus, 0, len(gateway.Spec.Listeners)) for _, l := range gateway.Spec.Listeners { supportedKinds, resolvedRefsCondition := getSupportedKinds(gateway.Generation, l) @@ -63,6 +48,13 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc Name: l.Name, SupportedKinds: supportedKinds, Conditions: []metav1.Condition{ + { + Type: string(gatewayv1beta1.ListenerConditionAccepted), + Status: metav1.ConditionTrue, + Reason: string(gatewayv1beta1.ListenerReasonAccepted), + ObservedGeneration: gateway.Generation, + LastTransitionTime: metav1.Now(), + }, { Type: string(gatewayv1beta1.ListenerConditionProgrammed), Status: metav1.ConditionStatus(listenerProgrammedStatus), @@ -74,42 +66,16 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc }, }) if resolvedRefsCondition.Status == metav1.ConditionFalse { - newGatewayProgrammedCondition.Status = metav1.ConditionFalse - newGatewayProgrammedCondition.Reason = string(gatewayv1beta1.GatewayReasonAddressNotAssigned) - newGatewayProgrammedCondition.Message = "the gateway is not ready to route traffic" + programmed.Status = metav1.ConditionFalse + programmed.Reason = string(gatewayv1beta1.GatewayReasonAddressNotAssigned) + programmed.Message = "the gateway is not ready to route traffic" } } - - gateway.Status.Conditions = []metav1.Condition{ - newGatewayAcceptedCondition, - newGatewayProgrammedCondition, - } gateway.Status.Listeners = listenersStatus + setCond(gateway, programmed) } -// initGatewayStatus initializes the GatewayStatus, setting the ready condition to -// not ready and all the listeners ready status to not ready as well. -func initGatewayStatus(gateway *gatewayv1beta1.Gateway) { - gateway.Status = gatewayv1beta1.GatewayStatus{ - Conditions: []metav1.Condition{ - { - Type: string(gatewayv1beta1.GatewayConditionAccepted), - Status: metav1.ConditionTrue, - Reason: string(gatewayv1beta1.GatewayReasonAccepted), - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Message: "blixt controlplane accepts responsibility for the Gateway", - }, - { - Type: string(gatewayv1beta1.GatewayConditionProgrammed), - Status: metav1.ConditionFalse, - Reason: string(gatewayv1beta1.GatewayReasonAddressNotAssigned), - ObservedGeneration: gateway.Generation, - LastTransitionTime: metav1.Now(), - Message: "the gateway is not ready to route traffic", - }, - }, - } +func setGatewayListenerStatus(gateway *gatewayv1beta1.Gateway) { gateway.Status.Listeners = make([]gatewayv1beta1.ListenerStatus, 0, len(gateway.Spec.Listeners)) for _, l := range gateway.Spec.Listeners { supportedKinds, resolvedRefsCondition := getSupportedKinds(gateway.Generation, l) @@ -160,12 +126,12 @@ func getSupportedKinds(generation int64, listener gatewayv1beta1.Listener) (supp case gatewayv1beta1.HTTPProtocolType: supportedKinds = append(supportedKinds, gatewayv1beta1.RouteGroupKind{ Group: (*gatewayv1beta1.Group)(&gatewayv1beta1.GroupVersion.Group), - Kind: "TCPRoute", + Kind: "HTTPRoute", }) case gatewayv1beta1.HTTPSProtocolType: supportedKinds = append(supportedKinds, gatewayv1beta1.RouteGroupKind{ Group: (*gatewayv1beta1.Group)(&gatewayv1beta1.GroupVersion.Group), - Kind: "TCPRoute", + Kind: "HTTPRoute", }) default: resolvedRefsCondition.Status = metav1.ConditionFalse @@ -188,38 +154,20 @@ func getSupportedKinds(generation int64, listener gatewayv1beta1.Listener) (supp return supportedKinds, resolvedRefsCondition } -// factorizeStatus takes the old gateway conditions not transitioned and copies them +// updateConditionGeneration takes the old gateway conditions not transitioned and copies them // into the new gateway status, so that only the transitioning conditions gets actually patched. -func factorizeStatus(gateway, oldGateway *gatewayv1beta1.Gateway) { - for i, c := range gateway.Status.Conditions { - for _, oldC := range oldGateway.Status.Conditions { - if c.Type == oldC.Type { - if c.Status == oldC.Status && c.Reason == oldC.Reason { - gateway.Status.Conditions[i] = oldC - } - } - } - } - +func updateConditionGeneration(gateway *gatewayv1beta1.Gateway) { for i := 0; i < len(gateway.Status.Conditions); i++ { gateway.Status.Conditions[0].ObservedGeneration = gateway.Generation } - for i, l := range gateway.Status.Listeners { - for j, lc := range l.Conditions { - for _, ol := range oldGateway.Status.Listeners { - if ol.Name != l.Name { - continue - } - for _, olc := range ol.Conditions { - if lc.Type == olc.Type { - if lc.Status == olc.Status && lc.Reason == olc.Reason { - gateway.Status.Listeners[i].Conditions[j] = olc - } - } - } - } + for i := 0; i < len(gateway.Status.Listeners); i++ { + updatedListenerConditions := []metav1.Condition{} + for _, cond := range gateway.Status.Listeners[0].Conditions { + cond.ObservedGeneration = gateway.Generation + updatedListenerConditions = append(updatedListenerConditions, cond) } + gateway.Status.Listeners[0].Conditions = updatedListenerConditions } } @@ -232,28 +180,34 @@ func isGatewayAccepted(gateway *gatewayv1beta1.Gateway) bool { } func getAcceptedConditionForGateway(gateway *gatewayv1beta1.Gateway) *metav1.Condition { - for _, c := range gateway.Status.Conditions { - if c.Type == string(gatewayv1beta1.GatewayConditionAccepted) { - return &c - } - } - return nil + return getCond(gateway, string(gatewayv1beta1.GatewayConditionAccepted)) } -// isGatewayProgrammed returns two boolean values: -// - the status of the programmed condition -// - a boolean flag to check if the condition exists -func isGatewayProgrammed(gateway *gatewayv1beta1.Gateway) (status bool, isSet bool) { - for _, c := range gateway.Status.Conditions { - if c.Type == string(gatewayv1beta1.GatewayConditionProgrammed) { - return c.Status == metav1.ConditionTrue, true +func setCond(gateway *gatewayv1beta1.Gateway, setCond metav1.Condition) { + updatedConditions := make([]metav1.Condition, 0, len(gateway.Status.Conditions)) + + found := false + for _, oldCond := range gateway.Status.Conditions { + if oldCond.Type == setCond.Type { + found = true + updatedConditions = append(updatedConditions, setCond) + } else { + updatedConditions = append(updatedConditions, oldCond) } } - return false, false + + if !found { + updatedConditions = append(updatedConditions, setCond) + } + + gateway.Status.Conditions = updatedConditions } -// sameConditions returns true if the type, status and reason match for -// the two provided metav1.Conditions. -func sameConditions(cond1, cond2 *metav1.Condition) bool { - return cond1.Type == cond2.Type && cond1.Status == cond2.Status && cond1.Reason == cond2.Reason && cond1.Message == cond2.Message && cond1.ObservedGeneration == cond2.ObservedGeneration +func getCond(gateway *gatewayv1beta1.Gateway, requestedType string) *metav1.Condition { + for _, cond := range gateway.Status.Conditions { + if cond.Type == requestedType { + return &cond + } + } + return nil } diff --git a/controllers/gateway_controller_utils.go b/controllers/gateway_controller_utils.go index ee142644..0a490013 100644 --- a/controllers/gateway_controller_utils.go +++ b/controllers/gateway_controller_utils.go @@ -83,16 +83,82 @@ func setOwnerReference(svc *corev1.Service, gw client.Object) { }} } -func (r *GatewayReconciler) ensureServiceConfiguration(_ context.Context, svc *corev1.Service, gw *gatewayv1beta1.Gateway) (bool, error) { - // TODO: handle removal and changes of addresses https://github.com/Kong/blixt/issues/96 +func (r *GatewayReconciler) svcIsHealthy(ctx context.Context, svc *corev1.Service) error { + if len(svc.Status.LoadBalancer.Ingress) > 0 { + return nil + } + + // FIXME: the following is a hack to use metallb events to determine if the + // service is having trouble getting an IP allocated for it. This was created + // in a hurry and needs to be replaced with something robust. + events := &corev1.EventList{} + if err := r.Client.List(ctx, events, &client.ListOptions{ + // TODO: add a field selector + Namespace: svc.Namespace, + }); err != nil { + return err + } + + var allocationFailed *corev1.Event + var allocationSucceeded *corev1.Event + + for _, event := range events.Items { + currentEvent := event - // check whether there's been a problem allocating an IP address - for _, cond := range svc.Status.Conditions { - if cond.Type == corev1.EventTypeWarning && cond.Reason == "AllocationFailed" { // TODO: only handles metallb right now https://github.com/Kong/blixt/issues/96 - return false, fmt.Errorf(cond.Message) + if currentEvent.InvolvedObject.Name == svc.Name && currentEvent.Reason == "AllocationFailed" { // TODO: only handles metallb right now https://github.com/Kong/blixt/issues/96 + if allocationFailed != nil { + if currentEvent.EventTime.After(allocationFailed.EventTime.Time) { + allocationFailed = ¤tEvent + } + } else { + allocationFailed = ¤tEvent + } + } + + if currentEvent.InvolvedObject.Name == svc.Name && currentEvent.Reason == "IPAllocated" { + if allocationSucceeded != nil { + if currentEvent.EventTime.After(allocationSucceeded.EventTime.Time) { + allocationSucceeded = ¤tEvent + } + } else { + allocationSucceeded = ¤tEvent + } } } + if allocationFailed != nil { + if allocationSucceeded != nil && allocationSucceeded.EventTime.After(allocationFailed.EventTime.Time) { + return nil + } + return fmt.Errorf(allocationFailed.Message) + } + + return nil +} + +func (r *GatewayReconciler) ensureServiceConfiguration(ctx context.Context, svc *corev1.Service, gw *gatewayv1beta1.Gateway) (bool, error) { + updated := false + + if len(gw.Spec.Addresses) > 0 && svc.Spec.LoadBalancerIP != gw.Spec.Addresses[0].Value { + if len(gw.Spec.Addresses) > 1 { + r.Log.Info(fmt.Sprintf("found %d addresses on gateway, but currently we only support 1", len(gw.Spec.Addresses)), gw.Namespace, gw.Name) + } + r.Log.Info(fmt.Sprintf("using address %s for gateway", gw.Spec.Addresses[0].Value), gw.Namespace, gw.Name) + svc.Spec.LoadBalancerIP = gw.Spec.Addresses[0].Value + updated = true + } + + if svc.Spec.LoadBalancerIP != "" && len(gw.Spec.Addresses) == 0 { + r.Log.Info("service for gateway had a left over address that's no longer specified, removing", gw.Namespace, gw.Name) + svc.Spec.LoadBalancerIP = "" + updated = true + } + + if svc.Spec.Type != corev1.ServiceTypeLoadBalancer { + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + updated = true + } + ports := make([]corev1.ServicePort, 0, len(gw.Spec.Listeners)) for _, listener := range gw.Spec.Listeners { switch proto := listener.Protocol; proto { @@ -128,12 +194,6 @@ func (r *GatewayReconciler) ensureServiceConfiguration(_ context.Context, svc *c } } - updated := false - if svc.Spec.Type != corev1.ServiceTypeLoadBalancer { - svc.Spec.Type = corev1.ServiceTypeLoadBalancer - updated = true - } - newPorts := make(map[string]portAndProtocol, len(ports)) for _, newPort := range ports { newPorts[newPort.Name] = portAndProtocol{ @@ -255,19 +315,11 @@ func mapServiceToGateway(_ context.Context, obj client.Object) (reqs []reconcile return } -func setGatewayAcceptance(gateway *gatewayv1beta1.Gateway) { - var conditions []metav1.Condition - for _, cond := range gateway.Status.Conditions { - if cond.Type != string(gatewayv1beta1.GatewayConditionAccepted) { - conditions = append(conditions, cond) - } - } - - accepted := determineGatewayAcceptance(gateway) - gateway.Status.Conditions = append( - []metav1.Condition{accepted}, - conditions..., - ) +func setGatewayStatus(gateway *gatewayv1beta1.Gateway) { + newAccepted := determineGatewayAcceptance(gateway) + newProgrammed := determineGatewayProgrammed(gateway) + setCond(gateway, newAccepted) + setCond(gateway, newProgrammed) } func determineGatewayAcceptance(gateway *gatewayv1beta1.Gateway) metav1.Condition { @@ -294,6 +346,27 @@ func determineGatewayAcceptance(gateway *gatewayv1beta1.Gateway) metav1.Conditio return accepted } +func determineGatewayProgrammed(gateway *gatewayv1beta1.Gateway) metav1.Condition { + // TODO: give this client access and make it dynamic + return metav1.Condition{ + Type: string(gatewayv1beta1.GatewayConditionProgrammed), + ObservedGeneration: gateway.Generation, + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.Now(), + Reason: string(gatewayv1beta1.GatewayReasonPending), + Message: "dataplane not yet configured", + } +} + +// cmpCond returns true if the conditions are the same, minus the timestamp. +func cmpCond(cond1, cond2 metav1.Condition) bool { //nolint:unused + return cond1.Type == cond2.Type && + cond1.Status == cond2.Status && + cond1.ObservedGeneration == cond2.ObservedGeneration && + cond1.Reason == cond2.Reason && + cond1.Message == cond2.Message +} + type portAndProtocol struct { port int32 protocol corev1.Protocol