Skip to content

Commit

Permalink
FIXME
Browse files Browse the repository at this point in the history
  • Loading branch information
shaneutt committed Sep 27, 2023
1 parent 558113e commit 7bcf0a7
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 171 deletions.
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ BLIXT_CONTROLPLANE_IMAGE ?= ghcr.io/kong/blixt-controlplane
BLIXT_DATAPLANE_IMAGE ?= ghcr.io/kong/blixt-dataplane
BLIXT_UDP_SERVER_IMAGE ?= ghcr.io/kong/blixt-udp-test-server

# Other testing variables
EXISTING_CLUSTER ?=

# Image URL to use all building/pushing image targets
TAG ?= integration-tests

Expand Down Expand Up @@ -143,8 +146,18 @@ test.conformance: manifests generate fmt vet
BLIXT_CONTROLPLANE_IMAGE=$(BLIXT_CONTROLPLANE_IMAGE):$(TAG) \
BLIXT_DATAPLANE_IMAGE=$(BLIXT_DATAPLANE_IMAGE):$(TAG) \
BLIXT_UDP_SERVER_IMAGE=$(BLIXT_UDP_SERVER_IMAGE):$(TAG) \
BLIXT_USE_EXISTING_CLUSTER=$(EXISTING_CLUSTER) \
GOFLAGS="-tags=conformance_tests" go test -race -v ./test/conformance/...

.PHONY: debug.conformance
debug.conformance: manifests generate fmt vet
go clean -testcache
BLIXT_CONTROLPLANE_IMAGE=$(BLIXT_CONTROLPLANE_IMAGE):$(TAG) \
BLIXT_DATAPLANE_IMAGE=$(BLIXT_DATAPLANE_IMAGE):$(TAG) \
BLIXT_UDP_SERVER_IMAGE=$(BLIXT_UDP_SERVER_IMAGE):$(TAG) \
BLIXT_USE_EXISTING_CLUSTER=$(EXISTING_CLUSTER) \
GOFLAGS="-tags=conformance_tests" dlv test ./test/conformance/...

##@ Build

.PHONY: build
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ rules:
- endpoints/status
verbs:
- get
- apiGroups:
- ""
resources:
- events
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
80 changes: 45 additions & 35 deletions controllers/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/kong/blixt/pkg/vars"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -30,6 +33,8 @@ import (
//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=endpoints/status,verbs=get

//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch

const gatewayServiceLabel = "konghq.com/owned-by-gateway"

// GatewayReconciler reconciles a Gateway object
Expand Down Expand Up @@ -82,63 +87,49 @@ func (r *GatewayReconciler) gatewayHasMatchingGatewayClass(obj client.Object) bo
func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

gw := new(gatewayv1beta1.Gateway)
if err := r.Client.Get(ctx, req.NamespacedName, gw); err != nil {
gateway := new(gatewayv1beta1.Gateway)
if err := r.Client.Get(ctx, req.NamespacedName, gateway); err != nil {
if errors.IsNotFound(err) {
log.Info("object enqueued no longer exists, skipping")
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

gwc := new(gatewayv1beta1.GatewayClass)
if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gw.Spec.GatewayClassName)}, gwc); err != nil {
gatewayClass := new(gatewayv1beta1.GatewayClass)
if err := r.Client.Get(ctx, types.NamespacedName{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

if gwc.Spec.ControllerName != vars.GatewayClassControllerName {
if gatewayClass.Spec.ControllerName != vars.GatewayClassControllerName {
return ctrl.Result{}, nil
}

// determine if the Gateway has been accepted, and if it has not then
// determine whether we will accept it, and if not drop it until it has
// been corrected.
oldGateway := gw.DeepCopy()
isAccepted := isGatewayAccepted(gw)
if !isAccepted {
initGatewayStatus(gw)
setGatewayAcceptance(gw)
factorizeStatus(gw, oldGateway)

oldAccepted := getAcceptedConditionForGateway(oldGateway)
accepted := getAcceptedConditionForGateway(gw)

if !sameConditions(oldAccepted, accepted) {
return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway))
}

log.Info("gateway %s/%s is not accepted and will not be provisioned", gw.Namespace, gw.Name)
return ctrl.Result{}, nil
log.Info("found a supported Gateway, determining whether the gateway has been accepted")
oldGateway := gateway.DeepCopy()
if !isGatewayAccepted(gateway) {
log.Info("gateway not yet accepted")
setGatewayListenerStatus(gateway)
setGatewayStatus(gateway)
updateConditionGeneration(gateway)
return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}

log.Info("checking for Service for Gateway")
svc, err := r.getServiceForGateway(ctx, gw)
svc, err := r.getServiceForGateway(ctx, gateway)
if err != nil {
return ctrl.Result{}, err
}
if svc == nil {
log.Info("creating Service for Gateway")
return ctrl.Result{}, r.createServiceForGateway(ctx, gw) // service creation will requeue gateway
return ctrl.Result{}, r.createServiceForGateway(ctx, gateway) // service creation will requeue gateway
}

log.Info("checking Service configuration")
needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gw)
// in both cases when the service does not exist or an error has been triggered, the Gateway
// must be not ready. This OR condition is redundant, as (needsUpdate == true AND err == nil)
// should never happen, but useful to highlight the purpose.
needsUpdate, err := r.ensureServiceConfiguration(ctx, svc, gateway)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -149,9 +140,27 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.Info("checking Service status", "namespace", svc.Namespace, "name", svc.Name)
switch t := svc.Spec.Type; t {
case corev1.ServiceTypeLoadBalancer:
if err := r.svcIsHealthy(ctx, svc); err != nil {
// TODO: only handles metallb right now https://github.com/Kong/blixt/issues/96
if strings.Contains(err.Error(), "Failed to allocate IP") {
r.Log.Info("failed to allocate IP for Gateway", gateway.Namespace, gateway.Name)
setCond(gateway, metav1.Condition{
Type: string(gatewayv1beta1.GatewayConditionProgrammed),
ObservedGeneration: gateway.Generation,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: string(gatewayv1beta1.GatewayReasonAddressNotUsable),
Message: err.Error(),
})
updateConditionGeneration(gateway)
return ctrl.Result{Requeue: true}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}
return ctrl.Result{}, err
}

if svc.Spec.ClusterIP == "" || len(svc.Status.LoadBalancer.Ingress) < 1 {
log.Info("waiting for Service to be ready")
return ctrl.Result{Requeue: true}, nil
return ctrl.Result{RequeueAfter: time.Second}, nil
}
default:
return ctrl.Result{}, fmt.Errorf("found unsupported Service type: %s (only LoadBalancer type is currently supported)", t)
Expand All @@ -168,8 +177,9 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{Requeue: true}, nil
}

log.Info("Service is ready, updating Gateway")
updateGatewayStatus(ctx, gw, svc)
factorizeStatus(gw, oldGateway)
return ctrl.Result{}, r.Status().Patch(ctx, gw, client.MergeFrom(oldGateway))
log.Info("Service is ready, setting Gateway as programmed")
setGatewayStatusAddresses(gateway, svc)
setGatewayListenerConditionsAndProgrammed(gateway)
updateConditionGeneration(gateway)
return ctrl.Result{}, r.Status().Patch(ctx, gateway, client.MergeFrom(oldGateway))
}
Loading

0 comments on commit 7bcf0a7

Please sign in to comment.