Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NET-7656] Add GatewayClassConfig watch for MeshGateway controller #3537

Merged
merged 5 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 121 additions & 70 deletions control-plane/controllers/resources/mesh_gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

meshv2beta1 "github.com/hashicorp/consul-k8s/control-plane/api/mesh/v2beta1"
"github.com/hashicorp/consul-k8s/control-plane/gateways"
Expand Down Expand Up @@ -84,6 +87,49 @@ func (r *MeshGatewayController) SetupWithManager(mgr ctrl.Manager) error {
Owns(&rbacv1.RoleBinding{}).
Owns(&corev1.Service{}).
Owns(&corev1.ServiceAccount{}).
Watches(
source.NewKindWithCache(&meshv2beta1.GatewayClass{}, mgr.GetCache()),
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
gateways, err := r.getGatewaysReferencingGatewayClass(context.Background(), o.(*meshv2beta1.GatewayClass))
if err != nil {
return nil
}

requests := make([]reconcile.Request, 0, len(gateways.Items))
for _, gateway := range gateways.Items {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: gateway.Namespace,
Name: gateway.Name,
}})
}

return requests
})).
Watches(
source.NewKindWithCache(&meshv2beta1.GatewayClassConfig{}, mgr.GetCache()),
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
classes, err := r.getGatewayClassesReferencingGatewayClassConfig(context.Background(), o.(*meshv2beta1.GatewayClassConfig))
if err != nil {
return nil
}

var requests []reconcile.Request
for _, class := range classes.Items {
gateways, err := r.getGatewaysReferencingGatewayClass(context.Background(), class)
if err != nil {
continue
}

for _, gateway := range gateways.Items {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: gateway.Namespace,
Name: gateway.Name,
}})
}
}

return requests
})).
Complete(r)
}

Expand All @@ -96,7 +142,7 @@ func (r *MeshGatewayController) SetupWithManager(mgr ctrl.Manager) error {
// 4. Role
// 5. RoleBinding
func (r *MeshGatewayController) onCreateUpdate(ctx context.Context, req ctrl.Request, resource *meshv2beta1.MeshGateway) error {
// fetch gatewayclassconfig
// Fetch GatewayClassConfig for the gateway
gcc, err := r.getGatewayClassConfigForGateway(ctx, resource)
if err != nil {
r.Log.Error(err, "unable to get gatewayclassconfig for gateway: %s gatewayclass: %s", resource.Name, resource.Spec.GatewayClassName)
Expand All @@ -105,77 +151,77 @@ func (r *MeshGatewayController) onCreateUpdate(ctx context.Context, req ctrl.Req

builder := gateways.NewMeshGatewayBuilder(resource, r.GatewayConfig, gcc)

// Create ServiceAccount
desiredAccount := builder.ServiceAccount()
existingAccount := &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: desiredAccount.Namespace, Name: desiredAccount.Name}}

upsertOp := func(ctx context.Context, _, object client.Object) error {
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, object, func() error { return nil })
return err
}

err = r.opIfNewOrOwned(ctx, resource, &corev1.ServiceAccount{}, builder.ServiceAccount(), upsertOp)
err = r.opIfNewOrOwned(ctx, resource, existingAccount, desiredAccount, upsertOp)
if err != nil {
return fmt.Errorf("unable to create service account: %w", err)
}

// Create Role
desiredRole := builder.Role()
existingRole := &rbacv1.Role{ObjectMeta: metav1.ObjectMeta{Namespace: desiredRole.Namespace, Name: desiredRole.Name}}

err = r.opIfNewOrOwned(ctx, resource, &rbacv1.Role{}, builder.Role(), upsertOp)
err = r.opIfNewOrOwned(ctx, resource, existingRole, desiredRole, upsertOp)
if err != nil {
return fmt.Errorf("unable to create role: %w", err)
}

// Create RoleBinding
desiredBinding := builder.RoleBinding()
existingBinding := &rbacv1.RoleBinding{ObjectMeta: metav1.ObjectMeta{Namespace: desiredBinding.Namespace, Name: desiredBinding.Name}}

err = r.opIfNewOrOwned(ctx, resource, &rbacv1.RoleBinding{}, builder.RoleBinding(), upsertOp)
err = r.opIfNewOrOwned(ctx, resource, existingBinding, desiredBinding, upsertOp)
if err != nil {
return fmt.Errorf("unable to create role binding: %w", err)
}

// Create Service
desiredService := builder.Service()
existingService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: desiredService.Namespace, Name: desiredService.Name}}

mergeServiceOp := func(ctx context.Context, existingObject, object client.Object) error {
existingService, ok := existingObject.(*corev1.Service)
if !ok && existingService != nil {
return fmt.Errorf("unable to infer existing service type")
}
builtService, ok := object.(*corev1.Service)
if !ok {
return fmt.Errorf("unable to infer built service type")
}

mergedService := mergeService(existingService, builtService)
mergeServiceOp := func(ctx context.Context, existingObj, desiredObj client.Object) error {
existing := existingObj.(*corev1.Service)
desired := desiredObj.(*corev1.Service)

_, err := controllerutil.CreateOrUpdate(ctx, r.Client, mergedService, func() error { return nil })
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, existing, func() error {
gateways.MergeService(existing, desired)
return nil
})
return err
}

err = r.opIfNewOrOwned(ctx, resource, &corev1.Service{}, builder.Service(), mergeServiceOp)
err = r.opIfNewOrOwned(ctx, resource, existingService, desiredService, mergeServiceOp)
if err != nil {
return fmt.Errorf("unable to create service: %w", err)
}

// Create deployment

mergeDeploymentOp := func(ctx context.Context, existingObject, object client.Object) error {
existingDeployment, ok := existingObject.(*appsv1.Deployment)
if !ok && existingDeployment != nil {
return fmt.Errorf("unable to infer existing deployment type")
}
builtDeployment, ok := object.(*appsv1.Deployment)
if !ok {
return fmt.Errorf("unable to infer built deployment type")
}
// Create Deployment
desiredDeployment, err := builder.Deployment()
if err != nil {
return fmt.Errorf("unable to create deployment: %w", err)
}
existingDeployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: desiredDeployment.Namespace, Name: desiredDeployment.Name}}

mergedDeployment := builder.MergeDeployments(gcc, existingDeployment, builtDeployment)
mergeDeploymentOp := func(ctx context.Context, existingObj, desiredObj client.Object) error {
existing := existingObj.(*appsv1.Deployment)
desired := desiredObj.(*appsv1.Deployment)

_, err := controllerutil.CreateOrUpdate(ctx, r.Client, mergedDeployment, func() error { return nil })
_, err = controllerutil.CreateOrUpdate(ctx, r.Client, existing, func() error {
gateways.MergeDeployment(existing, desired)
return nil
})
return err
}

builtDeployment, err := builder.Deployment()
if err != nil {
return fmt.Errorf("unable to build deployment: %w", err)
}

err = r.opIfNewOrOwned(ctx, resource, &appsv1.Deployment{}, builtDeployment, mergeDeploymentOp)
err = r.opIfNewOrOwned(ctx, resource, existingDeployment, desiredDeployment, mergeDeploymentOp)
if err != nil {
return fmt.Errorf("unable to create deployment: %w", err)
}
Expand All @@ -199,24 +245,24 @@ func (r *MeshGatewayController) onDelete(ctx context.Context, req ctrl.Request,
// The existing and new object are available in case any merging needs
// to occur, such as unknown annotations and values from the existing object
// that need to be carried forward onto the new object.
type ownedObjectOp func(ctx context.Context, existingObject client.Object, newObject client.Object) error
type ownedObjectOp func(ctx context.Context, existing, desired client.Object) error

// opIfNewOrOwned runs a given ownedObjectOp to create, update, or delete a resource.
// The purpose of opIfNewOrOwned is to ensure that we aren't updating or deleting a
// resource that was not created by us. If this scenario is encountered, we error.
func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *meshv2beta1.MeshGateway, scanTarget, writeSource client.Object, op ownedObjectOp) error {
func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *meshv2beta1.MeshGateway, existing, desired client.Object, op ownedObjectOp) error {
// Ensure owner reference is always set on objects that we write
if err := ctrl.SetControllerReference(gateway, writeSource, r.Client.Scheme()); err != nil {
if err := ctrl.SetControllerReference(gateway, desired, r.Client.Scheme()); err != nil {
return err
}

key := client.ObjectKey{
Namespace: writeSource.GetNamespace(),
Name: writeSource.GetName(),
Namespace: existing.GetNamespace(),
Name: existing.GetName(),
}

exists := false
if err := r.Get(ctx, key, scanTarget); err != nil {
if err := r.Get(ctx, key, existing); err != nil {
// We failed to fetch the object in a way that doesn't tell us about its existence
if !k8serr.IsNotFound(err) {
return err
Expand All @@ -228,12 +274,12 @@ func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *mes

// None exists, so we need only execute the operation
if !exists {
return op(ctx, nil, writeSource)
return op(ctx, existing, desired)
}

// Ensure the existing object was put there by us so that we don't overwrite random objects
owned := false
for _, reference := range scanTarget.GetOwnerReferences() {
for _, reference := range existing.GetOwnerReferences() {
if reference.UID == gateway.GetUID() && reference.Name == gateway.GetName() {
owned = true
break
Expand All @@ -242,7 +288,7 @@ func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *mes
if !owned {
return errResourceNotOwned
}
return op(ctx, scanTarget, writeSource)
return op(ctx, existing, desired)
}

func (r *MeshGatewayController) getGatewayClassConfigForGateway(ctx context.Context, gateway *meshv2beta1.MeshGateway) (*meshv2beta1.GatewayClassConfig, error) {
Expand Down Expand Up @@ -288,39 +334,44 @@ func (r *MeshGatewayController) getGatewayClassForGateway(ctx context.Context, g
return &gatewayClass, nil
}

func areServicesEqual(a, b *corev1.Service) bool {
// If either service "a" or "b" is nil, don't want to try and merge the nil service
if a == nil || b == nil {
return true
}

if !equality.Semantic.DeepEqual(a.Annotations, b.Annotations) {
return false
// getGatewayClassesReferencingGatewayClassConfig queries all GatewayClass resources in the
// cluster and returns any that reference the given GatewayClassConfig.
func (r *MeshGatewayController) getGatewayClassesReferencingGatewayClassConfig(ctx context.Context, config *meshv2beta1.GatewayClassConfig) (*meshv2beta1.GatewayClassList, error) {
if config == nil {
return nil, nil
}

if len(b.Spec.Ports) != len(a.Spec.Ports) {
return false
allClasses := &meshv2beta1.GatewayClassList{}
if err := r.Client.List(ctx, allClasses); err != nil {
return nil, client.IgnoreNotFound(err)
}

for i, port := range a.Spec.Ports {
otherPort := b.Spec.Ports[i]
if port.Port != otherPort.Port || port.Protocol != otherPort.Protocol {
return false
matchingClasses := &meshv2beta1.GatewayClassList{}
for _, class := range allClasses.Items {
if class.Spec.ParametersRef != nil && class.Spec.ParametersRef.Name == config.Name {
matchingClasses.Items = append(matchingClasses.Items, class)
}
}
return true
return matchingClasses, nil
}

// mergeService is used to keep annotations and ports from the `from` Service
// to the `to` service. This prevents an infinite reconciliation loop when
// Kubernetes adds this configuration back in.
func mergeService(from, to *corev1.Service) *corev1.Service {
if areServicesEqual(from, to) {
return to
// getGatewaysReferencingGatewayClass queries all MeshGateway resources in the cluster
// and returns any that reference the given GatewayClass.
func (r *MeshGatewayController) getGatewaysReferencingGatewayClass(ctx context.Context, class *meshv2beta1.GatewayClass) (*meshv2beta1.MeshGatewayList, error) {
if class == nil {
return nil, nil
}

to.Annotations = from.Annotations
to.Spec.Ports = from.Spec.Ports
allGateways := &meshv2beta1.MeshGatewayList{}
if err := r.Client.List(ctx, allGateways); err != nil {
return nil, client.IgnoreNotFound(err)
}

return to
matchingGateways := &meshv2beta1.MeshGatewayList{}
for _, gateway := range allGateways.Items {
if gateway.Spec.GatewayClassName == class.Name {
matchingGateways.Items = append(matchingGateways.Items, gateway)
}
}
return matchingGateways, nil
}
48 changes: 35 additions & 13 deletions control-plane/gateways/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,10 @@ func (b *meshGatewayBuilder) deploymentSpec() (*appsv1.DeploymentSpec, error) {
}, nil
}

func (b *meshGatewayBuilder) MergeDeployments(gcc *meshv2beta1.GatewayClassConfig, old, new *appsv1.Deployment) *appsv1.Deployment {
if old == nil {
return new
}
if !compareDeployments(old, new) {
old.Spec.Template = new.Spec.Template
new.Spec.Replicas = deploymentReplicaCount(nil, old.Spec.Replicas)
}

return new
}

func compareDeployments(a, b *appsv1.Deployment) bool {
// areDeploymentsEqual determines whether two Deployments are the same in
// the ways that we care about. This specifically ignores valid out-of-band
// changes such as initContainer injection.
func areDeploymentsEqual(a, b *appsv1.Deployment) bool {
// since K8s adds a bunch of defaults when we create a deployment, check that
// they don't differ by the things that we may actually change, namely container
// ports
Expand Down Expand Up @@ -183,3 +174,34 @@ func deploymentReplicaCount(replicas *meshv2beta1.GatewayClassReplicasConfig, cu
// otherwise use the global default
return pointer.Int32(globalDefaultInstances)
}

// MergeDeployment is used to update an appsv1.Deployment without overwriting any
// existing annotations or labels that were placed there by other vendors.
//
// based on https://github.com/kubernetes-sigs/controller-runtime/blob/4000e996a202917ad7d40f02ed8a2079a9ce25e9/pkg/controller/controllerutil/example_test.go
func MergeDeployment(existing, desired *appsv1.Deployment) {
// Only overwrite fields if the Deployment doesn't exist yet
if existing.ObjectMeta.CreationTimestamp.IsZero() {
existing.ObjectMeta.OwnerReferences = desired.ObjectMeta.OwnerReferences
existing.Spec = desired.Spec
existing.Annotations = desired.Annotations
existing.Labels = desired.Labels
return
}

// Make sure we don't reconcile forever by overwriting valid out-of-band
// changes such as init container injection. If the deployments are
// sufficiently equal, we only update the annotations.
if !areDeploymentsEqual(existing, desired) {
desired.Spec.Replicas = deploymentReplicaCount(nil, existing.Spec.Replicas)
existing.Spec = desired.Spec
}

// If the Deployment already exists, add any desired annotations + labels to existing set
for k, v := range desired.ObjectMeta.Annotations {
existing.ObjectMeta.Annotations[k] = v
}
for k, v := range desired.ObjectMeta.Labels {
existing.ObjectMeta.Labels[k] = v
}
}
Loading
Loading