Improve declarative usage of authentication module #2766

Open · wants to merge 5 commits into master
26 changes: 26 additions & 0 deletions apis/authentication/v1beta1/tenant_types.go
@@ -21,6 +21,19 @@ import (
liqov1beta1 "github.com/liqotech/liqo/apis/core/v1beta1"
)

// AuthzPolicy is the policy used by the cluster to authorize or reject an incoming ResourceSlice.
type AuthzPolicy string

const (
// KeysExchange indicates that a keys exchange must be performed before accepting any ResourceSlice.
KeysExchange AuthzPolicy = "KeysExchange"
// TolerateNoHandshake indicates that the local cluster accepts ResourceSlices even when no
// key exchange has ever been performed with the peer cluster.
TolerateNoHandshake AuthzPolicy = "TolerateNoHandshake"
// DefaultAuthzPolicy is the default authorization policy if nothing is provided.
DefaultAuthzPolicy AuthzPolicy = KeysExchange
)

// TenantResource is the name of the tenant resources.
var TenantResource = "tenants"

@@ -33,10 +46,23 @@ var TenantGroupResource = schema.GroupResource{Group: GroupVersion.Group, Resour
// TenantGroupVersionResource is groupResourceVersion used to register these objects.
var TenantGroupVersionResource = GroupVersion.WithResource(TenantResource)

// GetAuthzPolicyValue returns the value of the pointer to an AuthzPolicy type; if the pointer is nil, it returns the default value.
func GetAuthzPolicyValue(policy *AuthzPolicy) AuthzPolicy {
if policy == nil {
return DefaultAuthzPolicy
}
return *policy
}

// TenantSpec defines the desired state of Tenant.
type TenantSpec struct {
// ClusterID is the id of the consumer cluster.
ClusterID liqov1beta1.ClusterID `json:"clusterID,omitempty"`
// AuthzPolicy is the policy used by the cluster to authorize or reject an incoming ResourceSlice.
// Default is KeysExchange.
// +kubebuilder:validation:Enum=KeysExchange;TolerateNoHandshake
// +kubebuilder:default=KeysExchange
AuthzPolicy *AuthzPolicy `json:"authzPolicy,omitempty"`
// PublicKey is the public key of the tenant cluster.
PublicKey []byte `json:"publicKey,omitempty"`
// CSR is the Certificate Signing Request of the tenant cluster.
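The new authorization knob can be used declaratively (by setting spec.authzPolicy in a Tenant manifest) or programmatically. The snippet below is a minimal, hedged sketch of the programmatic side: the Tenant kind, the import aliases, and the sample object name are assumptions not shown in this diff, while AuthzPolicy, TolerateNoHandshake, and GetAuthzPolicyValue come from the additions above.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	authv1beta1 "github.com/liqotech/liqo/apis/authentication/v1beta1"
)

func main() {
	// Build a Tenant that tolerates peers with no prior key exchange.
	policy := authv1beta1.TolerateNoHandshake
	tenant := &authv1beta1.Tenant{
		ObjectMeta: metav1.ObjectMeta{Name: "tenant-sample"},
		Spec: authv1beta1.TenantSpec{
			AuthzPolicy: &policy,
		},
	}

	// GetAuthzPolicyValue falls back to KeysExchange (the default) when the field is unset.
	switch authv1beta1.GetAuthzPolicyValue(tenant.Spec.AuthzPolicy) {
	case authv1beta1.KeysExchange:
		fmt.Println("require a completed key exchange before accepting ResourceSlices")
	case authv1beta1.TolerateNoHandshake:
		fmt.Println("accept ResourceSlices even without a prior key exchange")
	}
}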
5 changes: 5 additions & 0 deletions apis/authentication/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.
@@ -50,6 +50,15 @@ spec:
spec:
description: TenantSpec defines the desired state of Tenant.
properties:
authzPolicy:
default: KeysExchange
description: |-
AuthzPolicy is the policy used by the cluster to authorize or reject an incoming ResourceSlice.
Default is KeysExchange.
enum:
- KeysExchange
- TolerateNoHandshake
type: string
clusterID:
description: ClusterID is the id of the consumer cluster.
pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$
70 changes: 58 additions & 12 deletions internal/crdReplicator/crdReplicator-operator.go
@@ -16,6 +16,7 @@ package crdreplicator

import (
"context"
"crypto/sha256"
"fmt"

corev1 "k8s.io/api/core/v1"
@@ -39,6 +40,7 @@ import (
"github.com/liqotech/liqo/internal/crdReplicator/resources"
"github.com/liqotech/liqo/pkg/consts"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
"github.com/liqotech/liqo/pkg/utils/getters"
traceutils "github.com/liqotech/liqo/pkg/utils/trace"
)

@@ -107,13 +109,9 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (result ct
// the object is being deleted
if controllerutil.ContainsFinalizer(&secret, finalizer) {
// close remote watcher for remote cluster
reflector, ok := c.Reflectors[remoteClusterID]
if ok {
if err := reflector.Stop(); err != nil {
klog.Errorf("%sFailed to stop reflection: %v", prefix, err)
return ctrl.Result{}, err
}
delete(c.Reflectors, remoteClusterID)
if err := c.stopReflector(remoteClusterID, false); err != nil {
klog.Errorf("%sFailed to stop reflection: %v", prefix, err)
return ctrl.Result{}, err
}

// remove the finalizer from the list and update it.
@@ -139,17 +137,41 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (result ct
}

// Check if reflection towards the remote cluster has already been started.
if _, found := c.Reflectors[remoteClusterID]; found {
if reflector, found := c.Reflectors[remoteClusterID]; found {
// We ignore the case in which the secret lacks the kubeconfig, as in that case we still want to delete the reflector
// and manage the error.
secretContent := secret.Data[consts.KubeconfigSecretField]
secretHash := c.hashSecretConfig(secretContent)

// If there are no changes on the secret or on the remote namespace where the reflector operates, skip reconciliation.
if reflector.GetSecretHash() == secretHash && reflector.GetRemoteTenantNamespace() == remoteTenantNamespace {
return ctrl.Result{}, nil
}

// If the secret has changed, stop and remove the existing reflector to allow the creation of a new one.
klog.Infof("%sChanges detected on the control plane secret %q for clusterID %q: recreating reflector",
prefix, req.NamespacedName, remoteClusterID)
// Stop the reflection to update the reflector
if err := c.stopReflector(remoteClusterID, true); err != nil {
klog.Errorf("%sFailed to stop reflection: %v", prefix, err)
return ctrl.Result{}, err
}
}

// Retrieve the secret through the getter to make sure that multiple secrets do not point to the same cluster
currSecret, err := getters.GetControlPlaneKubeconfigSecretByClusterID(ctx, c.Client, remoteClusterID)
if err != nil {
klog.Errorf("%sUnable to process secret for clusterID %q: %v", prefix, remoteClusterID, err)
return ctrl.Result{}, nil
}

config, err := c.IdentityReader.GetConfig(remoteClusterID, localTenantNamespace)
config, err := c.IdentityReader.GetConfigFromSecret(remoteClusterID, currSecret)
if err != nil {
klog.Errorf("%sUnable to retrieve config for clusterID %q: %v", prefix, remoteClusterID, err)
return ctrl.Result{}, nil
}

return ctrl.Result{}, c.setupReflectionToPeeringCluster(ctx, config, remoteClusterID, localTenantNamespace, remoteTenantNamespace)
return ctrl.Result{}, c.setupReflectionToPeeringCluster(ctx, currSecret, config, remoteClusterID, localTenantNamespace, remoteTenantNamespace)
}

// SetupWithManager registers a new controller for identity Secrets.
@@ -220,20 +242,44 @@ func (c *Controller) ensureFinalizer(ctx context.Context, secret *corev1.Secret,
return c.Client.Update(ctx, secret)
}

func (c *Controller) setupReflectionToPeeringCluster(ctx context.Context, config *rest.Config,
func (c *Controller) setupReflectionToPeeringCluster(ctx context.Context, secret *corev1.Secret, config *rest.Config,
remoteClusterID liqov1beta1.ClusterID, localNamespace, remoteNamespace string) error {
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
klog.Errorf("%sUnable to create dynamic client for remote cluster: %v", remoteClusterID, err)
return err
}

reflector := c.ReflectionManager.NewForRemote(dynamicClient, remoteClusterID, localNamespace, remoteNamespace)
secretHash := c.hashSecretConfig(secret.Data[consts.KubeconfigSecretField])

reflector := c.ReflectionManager.NewForRemote(dynamicClient, remoteClusterID, localNamespace, remoteNamespace, secretHash)
reflector.Start(ctx)
c.Reflectors[remoteClusterID] = reflector
return nil
}

func (c *Controller) stopReflector(remoteClusterID liqov1beta1.ClusterID, skipChecks bool) error {
reflector, ok := c.Reflectors[remoteClusterID]
if ok {
stopFn := reflector.Stop
// Use the StopForce function if we want to skip the checks
if skipChecks {
stopFn = reflector.StopForce
}

if err := stopFn(); err != nil {
return err
}
delete(c.Reflectors, remoteClusterID)
}
return nil
}

func (c *Controller) hashSecretConfig(secretData []byte) string {
hash := sha256.Sum256(secretData)
return fmt.Sprintf("%x", hash)
}

func (c *Controller) enforceReflectionStatus(ctx context.Context, remoteClusterID liqov1beta1.ClusterID, deleting bool) error {
reflector, found := c.Reflectors[remoteClusterID]
if !found {
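The change detection added to Reconcile boils down to comparing a SHA-256 of the kubeconfig carried by the control-plane secret. The standalone sketch below illustrates the idea; the helper name and the sample payloads are illustrative and not part of the PR.

package main

import (
	"crypto/sha256"
	"fmt"
)

// hashKubeconfig mirrors hashSecretConfig above: a hex-encoded SHA-256 of the
// kubeconfig bytes stored in the identity secret.
func hashKubeconfig(data []byte) string {
	return fmt.Sprintf("%x", sha256.Sum256(data))
}

func main() {
	// Two versions of the same secret: the second one carries rotated credentials.
	before := []byte("apiVersion: v1\nkind: Config\n# old credentials")
	after := []byte("apiVersion: v1\nkind: Config\n# rotated credentials")

	if hashKubeconfig(before) != hashKubeconfig(after) {
		// The reconciler force-stops the old reflector and creates a new one
		// from the updated secret.
		fmt.Println("kubeconfig changed: recreate the reflector")
	}
}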
5 changes: 4 additions & 1 deletion internal/crdReplicator/reflection/manager.go
@@ -86,7 +86,8 @@ func (m *Manager) Start(ctx context.Context, registeredResources []resources.Res
}

// NewForRemote returns a new reflector for a given remote cluster.
func (m *Manager) NewForRemote(client dynamic.Interface, clusterID liqov1beta1.ClusterID, localNamespace, remoteNamespace string) *Reflector {
func (m *Manager) NewForRemote(client dynamic.Interface, clusterID liqov1beta1.ClusterID, localNamespace, remoteNamespace string,
secretHash string) *Reflector {
return &Reflector{
manager: m,

@@ -97,6 +98,8 @@ func (m *Manager) NewForRemote(client dynamic.Interface, clusterID liqov1beta1.C
remoteNamespace: remoteNamespace,
remoteClusterID: clusterID,

secretHash: secretHash,

resources: make(map[schema.GroupVersionResource]*reflectedResource),
workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}
2 changes: 1 addition & 1 deletion internal/crdReplicator/reflection/manager_test.go
@@ -80,7 +80,7 @@ var _ = Describe("Manager tests", func() {
Describe("the NewForRemote function", func() {
var reflector *Reflector

JustBeforeEach(func() { reflector = manager.NewForRemote(remote, remoteClusterID, localNamespace, remoteNamespace) })
JustBeforeEach(func() { reflector = manager.NewForRemote(remote, remoteClusterID, localNamespace, remoteNamespace, "") })
It("Should return a non nil reflector", func() { Expect(reflector).ToNot(BeNil()) })
It("Should correctly reference the parent manager", func() { Expect(reflector.manager).To(Equal(manager)) })
It("Should correctly populate the reflector fields", func() {
94 changes: 59 additions & 35 deletions internal/crdReplicator/reflection/reflector.go
@@ -56,6 +56,8 @@ type Reflector struct {

resources map[schema.GroupVersionResource]*reflectedResource

secretHash string

workqueue workqueue.RateLimitingInterface
cancel context.CancelFunc
}
@@ -72,6 +74,16 @@ type reflectedResource struct {
initialized bool
}

// GetRemoteTenantNamespace returns the remote namespace where the reflector reflects the resources.
func (r *Reflector) GetRemoteTenantNamespace() string {
return r.remoteNamespace
}

// GetSecretHash returns the hash of the secret that generated this reflector.
func (r *Reflector) GetSecretHash() string {
return r.secretHash
}

// Start starts the reflection towards the remote cluster.
func (r *Reflector) Start(ctx context.Context) {
ctx, r.cancel = context.WithCancel(ctx)
@@ -87,21 +99,15 @@ func (r *Reflector) Start(ctx context.Context) {
}()
}

// Stop stops the reflection towards the remote cluster, and removes the replicated resources.
// Stop stops the reflection towards the remote cluster; it returns an error if there are still replicated resources.
func (r *Reflector) Stop() error {
r.mu.Lock()
defer r.mu.Unlock()

klog.Infof("[%v] Stopping reflection towards remote cluster", r.remoteClusterID)

for gvr := range r.resources {
if err := r.stopForResource(gvr); err != nil {
return err
}
}
return r.stop(false)
}

r.cancel()
return nil
// StopForce stops the reflection towards the remote cluster, ignoring any replicated resource. This means that, if replication
// is not restored, some orphaned replicated resources might remain in the remote cluster.
func (r *Reflector) StopForce() error {
return r.stop(true)
}

// ResourceStarted returns whether the reflection for the given resource has been started.
@@ -163,16 +169,32 @@ func (r *Reflector) StartForResource(ctx context.Context, resource *resources.Re
}()
}

// StopForResource stops the reflection of the given resource, and removes the replicated objects.
// StopForResource stops the reflection of the given resource. It fails if there are replicated objects.
func (r *Reflector) StopForResource(resource *resources.Resource) error {
r.mu.Lock()
defer r.mu.Unlock()

return r.stopForResource(resource.GroupVersionResource)
return r.stopForResource(resource.GroupVersionResource, false)
}

// stopForResource stops the reflection of the given resource, and removes the replicated objects.
func (r *Reflector) stopForResource(gvr schema.GroupVersionResource) error {
func (r *Reflector) stop(skipResourcePresenceCheck bool) error {
r.mu.Lock()
defer r.mu.Unlock()

klog.Infof("[%v] Stopping reflection towards remote cluster", r.remoteClusterID)

for gvr := range r.resources {
if err := r.stopForResource(gvr, skipResourcePresenceCheck); err != nil {
return err
}
}

r.cancel()
return nil
}

// stopForResource stops the reflection of the given resource. Unless skipResourcePresenceCheck is true, it fails if there are replicated objects.
func (r *Reflector) stopForResource(gvr schema.GroupVersionResource, skipResourcePresenceCheck bool) error {
rs, found := r.resources[gvr]
if !found {
// This resource was already stopped, just return
@@ -181,24 +203,26 @@ func (r *Reflector) stopForResource(gvr schema.GroupVersionResource, skipResourc

klog.Infof("[%v] Stopping reflection of %v", r.remoteClusterID, gvr)

// Check if any object is still present in the local or in the remote cluster
for key, lister := range map[string]cache.GenericNamespaceLister{"local": rs.local, "remote": rs.remote} {
objects, err := lister.List(labels.Everything())

if key == "remote" && apierrors.IsForbidden(err) {
// The remote cluster has probably removed the necessary permissions to operate on reflected resources, let's ignore the error
klog.Infof("[%v] Cannot list %v objects in the remote cluster (permission removed by provider).", r.remoteClusterID, gvr)
continue
}

if err != nil {
klog.Errorf("[%v] Failed to stop reflection of %v: %v", r.remoteClusterID, gvr, err)
return err
}

if len(objects) > 0 {
klog.Errorf("[%v] Cannot stop reflection of %v, since %v objects are still present", r.remoteClusterID, gvr, key)
return fmt.Errorf("%v %v still present for cluster %v", key, gvr, r.remoteClusterID)
if !skipResourcePresenceCheck {
// Check if any object is still present in the local or in the remote cluster
for key, lister := range map[string]cache.GenericNamespaceLister{"local": rs.local, "remote": rs.remote} {
objects, err := lister.List(labels.Everything())

if key == "remote" && apierrors.IsForbidden(err) {
// The remote cluster has probably removed the necessary permissions to operate on reflected resources, let's ignore the error
klog.Infof("[%v] Cannot list %v objects in the remote cluster (permission removed by provider).", r.remoteClusterID, gvr)
continue
}

if err != nil {
klog.Errorf("[%v] Failed to stop reflection of %v: %v", r.remoteClusterID, gvr, err)
return err
}

if len(objects) > 0 {
klog.Errorf("[%v] Cannot stop reflection of %v, since %v objects are still present", r.remoteClusterID, gvr, key)
return fmt.Errorf("%v %v still present for cluster %v", key, gvr, r.remoteClusterID)
}
}
}

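For reference, a hypothetical caller-side sketch contrasting the two stop paths introduced above; it mirrors Controller.stopReflector in the operator, and the helper name is illustrative rather than part of the change.

// stopReflection is a hypothetical helper choosing between the graceful and
// the forced stop path of a Reflector.
func stopReflection(r *Reflector, force bool) error {
	if force {
		// Skip the object-presence check, e.g. when the reflector must be rebuilt
		// after a kubeconfig rotation; orphaned replicated resources may remain
		// remotely if replication is never restored.
		return r.StopForce()
	}
	// Graceful path: fails if replicated objects are still present in the local
	// or in the remote cluster, so nothing is orphaned.
	return r.Stop()
}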
4 changes: 2 additions & 2 deletions internal/crdReplicator/reflection/reflector_test.go
@@ -72,7 +72,7 @@ var _ = Describe("Reflector tests", func() {

manager = NewManager(local, localClusterID, workers, 0)
manager.Start(ctx, []resources.Resource{res})
reflector = manager.NewForRemote(remote, remoteClusterID, localNamespace, remoteNamespace)
reflector = manager.NewForRemote(remote, remoteClusterID, localNamespace, remoteNamespace, "")
})

AfterEach(func() { cancel() })
@@ -157,7 +157,7 @@
Eventually(func() bool { return reflector.resources[gvr].initialized }).Should(BeTrue())
})

JustBeforeEach(func() { err = reflector.stopForResource(gvr) })
JustBeforeEach(func() { err = reflector.stopForResource(gvr, false) })

When("no object is present", func() {
It("should succeed", func() { Expect(err).ToNot(HaveOccurred()) })
11 changes: 1 addition & 10 deletions pkg/identityManager/client.go
@@ -40,16 +40,7 @@ func (certManager *identityManager) GetConfig(remoteCluster liqov1beta1.ClusterI
return nil, err
}

cnf, err := kubeconfig.BuildConfigFromSecret(secret)
if err != nil {
return nil, err
}

if certManager.isAwsIdentity(secret) {
return certManager.mutateIAMConfig(secret, remoteCluster, cnf)
}

return cnf, nil
return certManager.GetConfigFromSecret(remoteCluster, secret)
}

func (certManager *identityManager) GetSecretNamespacedName(remoteCluster liqov1beta1.ClusterID,
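GetConfig now delegates to GetConfigFromSecret, whose body is not part of this diff. Based on the code removed above, it is expected to perform the same two steps; the sketch below is an assumption with an illustrative method name, not the actual implementation, and relies on the imports already present in pkg/identityManager/client.go.

// getConfigFromSecretSketch approximates what GetConfigFromSecret is expected
// to do, reusing the steps removed from GetConfig above.
func (certManager *identityManager) getConfigFromSecretSketch(
	remoteCluster liqov1beta1.ClusterID, secret *corev1.Secret) (*rest.Config, error) {
	// Build a rest.Config from the kubeconfig stored in the identity secret.
	cnf, err := kubeconfig.BuildConfigFromSecret(secret)
	if err != nil {
		return nil, err
	}

	// AWS-based identities require an additional mutation of the rest.Config.
	if certManager.isAwsIdentity(secret) {
		return certManager.mutateIAMConfig(secret, remoteCluster, cnf)
	}
	return cnf, nil
}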