Skip to content

Commit

Permalink
FIXME
Browse files Browse the repository at this point in the history
  • Loading branch information
shaneutt committed Sep 26, 2023
1 parent 723b6a5 commit 59cba5a
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 145 deletions.
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ BLIXT_CONTROLPLANE_IMAGE ?= ghcr.io/kong/blixt-controlplane
BLIXT_DATAPLANE_IMAGE ?= ghcr.io/kong/blixt-dataplane
BLIXT_UDP_SERVER_IMAGE ?= ghcr.io/kong/blixt-udp-test-server

# Other testing variables
EXISTING_CLUSTER ?=

# Image URL to use all building/pushing image targets
TAG ?= integration-tests

Expand Down Expand Up @@ -143,8 +146,18 @@ test.conformance: manifests generate fmt vet
BLIXT_CONTROLPLANE_IMAGE=$(BLIXT_CONTROLPLANE_IMAGE):$(TAG) \
BLIXT_DATAPLANE_IMAGE=$(BLIXT_DATAPLANE_IMAGE):$(TAG) \
BLIXT_UDP_SERVER_IMAGE=$(BLIXT_UDP_SERVER_IMAGE):$(TAG) \
BLIXT_USE_EXISTING_CLUSTER=$(EXISTING_CLUSTER) \
GOFLAGS="-tags=conformance_tests" go test -race -v ./test/conformance/...

.PHONY: debug.conformance
debug.conformance: manifests generate fmt vet
go clean -testcache
BLIXT_CONTROLPLANE_IMAGE=$(BLIXT_CONTROLPLANE_IMAGE):$(TAG) \
BLIXT_DATAPLANE_IMAGE=$(BLIXT_DATAPLANE_IMAGE):$(TAG) \
BLIXT_UDP_SERVER_IMAGE=$(BLIXT_UDP_SERVER_IMAGE):$(TAG) \
BLIXT_USE_EXISTING_CLUSTER=$(EXISTING_CLUSTER) \
GOFLAGS="-tags=conformance_tests" dlv test ./test/conformance/...

##@ Build

.PHONY: build
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ rules:
- endpoints/status
verbs:
- get
- apiGroups:
- ""
resources:
- events
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
76 changes: 41 additions & 35 deletions controllers/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/kong/blixt/pkg/vars"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -30,6 +33,8 @@ import (
//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=endpoints/status,verbs=get

//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch

const gatewayServiceLabel = "konghq.com/owned-by-gateway"

// GatewayReconciler reconciles a Gateway object
Expand Down Expand Up @@ -82,64 +87,64 @@ func (r *GatewayReconciler) gatewayHasMatchingGatewayClass(obj client.Object) bo
func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

gw := new(gatewayv1beta1.Gateway)
if err := r.Client.Get(ctx, req.NamespacedName, gw); err != nil {
gateway := new(gatewayv1beta1.Gateway)
if err := r.Client.Get(ctx, req.NamespacedName, gateway); err != nil {
if errors.IsNotFound(err) {
log.Info("object enqueued no longer exists, skipping")
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

gwc := new(gatewayv1beta1.GatewayClass)
if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gw.Spec.GatewayClassName)}, gwc); err != nil {
gatewayClass := new(gatewayv1beta1.GatewayClass)
if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

if gwc.Spec.ControllerName != vars.GatewayClassControllerName {
if gatewayClass.Spec.ControllerName != vars.GatewayClassControllerName {
return ctrl.Result{}, nil
}

// determine if the Gateway has been accepted, and if it has not then
// determine whether we will accept it, and if not drop it until it has
// been corrected.
oldGateway := gw.DeepCopy()
isAccepted := isGatewayAccepted(gw)
if !isAccepted {
initGatewayStatus(gw)
setGatewayAcceptance(gw)
factorizeStatus(gw, oldGateway)

oldAccepted := getAcceptedConditionForGateway(oldGateway)
accepted := getAcceptedConditionForGateway(gw)

if !sameConditions(oldAccepted, accepted) {
return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway))
}

log.Info("gateway %s/%s is not accepted and will not be provisioned", gw.Namespace, gw.Name)
return ctrl.Result{}, nil
log.Info("found a supported Gateway, determining whether the gateway has been accepted")
oldGateway := gateway.DeepCopy()
if !isGatewayAccepted(gateway) {
log.Info("gateway not yet accepted")
setGatewayListenerStatus(gateway)
setGatewayStatus(gateway)
factorizeStatus(gateway, oldGateway)
return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}

log.Info("checking for Service for Gateway")
svc, err := r.getServiceForGateway(ctx, gw)
svc, err := r.getServiceForGateway(ctx, gateway)
if err != nil {
return ctrl.Result{}, err
}
if svc == nil {
log.Info("creating Service for Gateway")
return ctrl.Result{}, r.createServiceForGateway(ctx, gw) // service creation will requeue gateway
return ctrl.Result{}, r.createServiceForGateway(ctx, gateway) // service creation will requeue gateway
}

log.Info("checking Service configuration")
needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gw)
// in both cases when the service does not exist or an error has been triggered, the Gateway
// must be not ready. This OR condition is redundant, as (needsUpdate == true AND err == nil)
// should never happen, but useful to highlight the purpose.
needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gateway)
if err != nil {
// TODO: only handles metallb right now https://github.com/Kong/blixt/issues/96
if strings.Contains(err.Error(), "Failed to allocate IP") {
r.Log.Info("failed to allocate IP for Gateway", gateway.Namespace, gateway.Name)
setCond(gateway, metav1.Condition{
Type: string(gatewayv1beta1.GatewayConditionProgrammed),
ObservedGeneration: gateway.Generation,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: string(gatewayv1beta1.GatewayReasonAddressNotUsable),
Message: err.Error(),
})
factorizeStatus(gateway, oldGateway)
return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}
return ctrl.Result{}, err
}
if needsUpdate {
Expand All @@ -151,7 +156,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
case corev1.ServiceTypeLoadBalancer:
if svc.Spec.ClusterIP == "" || len(svc.Status.LoadBalancer.Ingress) < 1 {
log.Info("waiting for Service to be ready")
return ctrl.Result{Requeue: true}, nil
return ctrl.Result{RequeueAfter: time.Second}, nil
}
default:
return ctrl.Result{}, fmt.Errorf("found unsupported Service type: %s (only LoadBalancer type is currently supported)", t)
Expand All @@ -168,8 +173,9 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{Requeue: true}, nil
}

log.Info("Service is ready, updating Gateway")
updateGatewayStatus(ctx, gw, svc)
factorizeStatus(gw, oldGateway)
return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway))
log.Info("Service is ready, setting Gateway as programmed")
setGatewayAddresses(gateway, svc)
setGatewayListenerConditionsAndProgrammed(gateway)
factorizeStatus(gateway, oldGateway)
return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}
111 changes: 33 additions & 78 deletions controllers/gateway_controller_status.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
package controllers

import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// updateGatewayStatus computes the new Gateway status, setting its ready condition and all the
// ready listeners's ready conditions to true, unless a resolvedRefs error is discovered. In
// that case, the proper listener ready condition and the gateway one are set to false.
// The addresses are updated as well.
func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc *corev1.Service) {
// gateway addresses
func setGatewayAddresses(gateway *gatewayv1beta1.Gateway, svc *corev1.Service) {
gwaddrs := make([]gatewayv1beta1.GatewayStatusAddress, 0, len(svc.Status.LoadBalancer.Ingress))
for _, addr := range svc.Status.LoadBalancer.Ingress {
if addr.IP != "" {
Expand All @@ -30,17 +23,10 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc
}
}
gateway.Status.Addresses = gwaddrs
}

// gateway conditions
newGatewayAcceptedCondition := metav1.Condition{
Type: string(gatewayv1beta1.GatewayConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1beta1.GatewayReasonAccepted),
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Now(),
Message: "blixt controlplane accepts responsibility for the Gateway",
}
newGatewayProgrammedCondition := metav1.Condition{
func setGatewayListenerConditionsAndProgrammed(gateway *gatewayv1beta1.Gateway) {
programmed := metav1.Condition{
Type: string(gatewayv1beta1.GatewayConditionProgrammed),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1beta1.GatewayReasonProgrammed),
Expand All @@ -49,7 +35,6 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc
Message: "the gateway is ready to route traffic",
}

// gateway listeners conditions
listenersStatus := make([]gatewayv1beta1.ListenerStatus, 0, len(gateway.Spec.Listeners))
for _, l := range gateway.Spec.Listeners {
supportedKinds, resolvedRefsCondition := getSupportedKinds(gateway.Generation, l)
Expand All @@ -74,42 +59,16 @@ func updateGatewayStatus(_ context.Context, gateway *gatewayv1beta1.Gateway, svc
},
})
if resolvedRefsCondition.Status == metav1.ConditionFalse {
newGatewayProgrammedCondition.Status = metav1.ConditionFalse
newGatewayProgrammedCondition.Reason = string(gatewayv1beta1.GatewayReasonAddressNotAssigned)
newGatewayProgrammedCondition.Message = "the gateway is not ready to route traffic"
programmed.Status = metav1.ConditionFalse
programmed.Reason = string(gatewayv1beta1.GatewayReasonAddressNotAssigned)
programmed.Message = "the gateway is not ready to route traffic"
}
}

gateway.Status.Conditions = []metav1.Condition{
newGatewayAcceptedCondition,
newGatewayProgrammedCondition,
}
gateway.Status.Listeners = listenersStatus
setCond(gateway, programmed)
}

// initGatewayStatus initializes the GatewayStatus, setting the ready condition to
// not ready and all the listeners ready status to not ready as well.
func initGatewayStatus(gateway *gatewayv1beta1.Gateway) {
gateway.Status = gatewayv1beta1.GatewayStatus{
Conditions: []metav1.Condition{
{
Type: string(gatewayv1beta1.GatewayConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1beta1.GatewayReasonAccepted),
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Now(),
Message: "blixt controlplane accepts responsibility for the Gateway",
},
{
Type: string(gatewayv1beta1.GatewayConditionProgrammed),
Status: metav1.ConditionFalse,
Reason: string(gatewayv1beta1.GatewayReasonAddressNotAssigned),
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Now(),
Message: "the gateway is not ready to route traffic",
},
},
}
func setGatewayListenerStatus(gateway *gatewayv1beta1.Gateway) {
gateway.Status.Listeners = make([]gatewayv1beta1.ListenerStatus, 0, len(gateway.Spec.Listeners))
for _, l := range gateway.Spec.Listeners {
supportedKinds, resolvedRefsCondition := getSupportedKinds(gateway.Generation, l)
Expand Down Expand Up @@ -191,16 +150,6 @@ func getSupportedKinds(generation int64, listener gatewayv1beta1.Listener) (supp
// factorizeStatus takes the old gateway conditions not transitioned and copies them
// into the new gateway status, so that only the transitioning conditions gets actually patched.
func factorizeStatus(gateway, oldGateway *gatewayv1beta1.Gateway) {
for i, c := range gateway.Status.Conditions {
for _, oldC := range oldGateway.Status.Conditions {
if c.Type == oldC.Type {
if c.Status == oldC.Status && c.Reason == oldC.Reason {
gateway.Status.Conditions[i] = oldC
}
}
}
}

for i := 0; i < len(gateway.Status.Conditions); i++ {
gateway.Status.Conditions[0].ObservedGeneration = gateway.Generation
}
Expand Down Expand Up @@ -232,28 +181,34 @@ func isGatewayAccepted(gateway *gatewayv1beta1.Gateway) bool {
}

func getAcceptedConditionForGateway(gateway *gatewayv1beta1.Gateway) *metav1.Condition {
for _, c := range gateway.Status.Conditions {
if c.Type == string(gatewayv1beta1.GatewayConditionAccepted) {
return &c
}
}
return nil
return getCond(gateway, string(gatewayv1beta1.GatewayConditionAccepted))
}

// isGatewayProgrammed returns two boolean values:
// - the status of the programmed condition
// - a boolean flag to check if the condition exists
func isGatewayProgrammed(gateway *gatewayv1beta1.Gateway) (status bool, isSet bool) {
for _, c := range gateway.Status.Conditions {
if c.Type == string(gatewayv1beta1.GatewayConditionProgrammed) {
return c.Status == metav1.ConditionTrue, true
func setCond(gateway *gatewayv1beta1.Gateway, setCond metav1.Condition) {
updatedConditions := make([]metav1.Condition, 0, len(gateway.Status.Conditions))

found := false
for _, oldCond := range gateway.Status.Conditions {
if oldCond.Type == setCond.Type {
found = true
updatedConditions = append(updatedConditions, setCond)
} else {
updatedConditions = append(updatedConditions, oldCond)
}
}
return false, false

if !found {
updatedConditions = append(updatedConditions, setCond)
}

gateway.Status.Conditions = updatedConditions
}

// sameConditions returns true if the type, status and reason match for
// the two provided metav1.Conditions.
func sameConditions(cond1, cond2 *metav1.Condition) bool {
return cond1.Type == cond2.Type && cond1.Status == cond2.Status && cond1.Reason == cond2.Reason && cond1.Message == cond2.Message && cond1.ObservedGeneration == cond2.ObservedGeneration
func getCond(gateway *gatewayv1beta1.Gateway, requestedType string) *metav1.Condition {
for _, cond := range gateway.Status.Conditions {
if cond.Type == requestedType {
return &cond
}
}
return nil
}
Loading

0 comments on commit 59cba5a

Please sign in to comment.