Skip to content

Commit

Permalink
chore: refactor the structures of kube clients, shared index informer…
Browse files Browse the repository at this point in the history
… factories (#431)
  • Loading branch information
tokers authored May 14, 2021
1 parent f199cdb commit fb11efc
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 98 deletions.
4 changes: 2 additions & 2 deletions pkg/ingress/apisix_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
c.controller.recorderEvent(ar.V1(), v1.EventTypeNormal, _resourceSynced, nil)
} else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 {
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeNormal, _resourceSynced, nil)
recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue)
}
} else {
log.Errorw("failed list ApisixRoute",
Expand All @@ -241,7 +241,7 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
c.controller.recorderEvent(ar.V1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
} else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 {
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
c.controller.recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
}
} else {
log.Errorw("failed list ApisixRoute",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/apisix_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ApisixTls", tls),
)
c.controller.recorderEvent(tls, corev1.EventTypeWarning, _resourceSyncAborted, err)
recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
log.Debug("got SSL object from ApisixTls",
Expand All @@ -136,12 +136,12 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ssl", ssl),
)
c.controller.recorderEvent(tls, corev1.EventTypeWarning, _resourceSyncAborted, err)
recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}

c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil)
recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
return err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/ingress/apisix_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
if err != nil {
log.Errorf("failed to get service %s: %s", key, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}

Expand All @@ -139,7 +139,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
}
log.Errorf("failed to get upstream %s: %s", upsName, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
var newUps *apisixv1.Upstream
Expand All @@ -156,7 +156,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.Error(err),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
} else {
Expand All @@ -178,12 +178,12 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.String("cluster", clusterName),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
}
c.controller.recorderEvent(au, corev1.EventTypeNormal, _resourceSynced, nil)
recordStatus(au, _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(au, _resourceSynced, nil, metav1.ConditionTrue)
return err
}

Expand Down
104 changes: 47 additions & 57 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listerscorev1 "k8s.io/client-go/listers/core/v1"
Expand All @@ -38,8 +37,6 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/apisix"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
crdclientset "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
listersv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/kube/translation"
Expand All @@ -64,18 +61,16 @@ const (

// Controller is the ingress apisix controller object.
type Controller struct {
name string
namespace string
cfg *config.Config
wg sync.WaitGroup
watchingNamespace map[string]struct{}
apisix apisix.APISIX
translator translation.Translator
apiServer *api.Server
clientset kubernetes.Interface
crdClientset crdclientset.Interface
metricsCollector metrics.Collector
crdInformerFactory externalversions.SharedInformerFactory
name string
namespace string
cfg *config.Config
wg sync.WaitGroup
watchingNamespace map[string]struct{}
apisix apisix.APISIX
translator translation.Translator
apiServer *api.Server
metricsCollector metrics.Collector
kubeClient *kube.KubeClient
// recorder event
recorder record.EventRecorder
// this map enrolls which ApisixTls objects refer to a Kubernetes
Expand Down Expand Up @@ -123,7 +118,8 @@ func NewController(cfg *config.Config) (*Controller, error) {
return nil, err
}

if err := kube.InitInformer(cfg); err != nil {
kubeClient, err := kube.NewKubeClient(cfg)
if err != nil {
return nil, err
}

Expand All @@ -132,9 +128,6 @@ func NewController(cfg *config.Config) (*Controller, error) {
return nil, err
}

crdClientset := kube.GetApisixClient()
sharedInformerFactory := externalversions.NewSharedInformerFactory(crdClientset, cfg.Kubernetes.ResyncInterval.Duration)

var (
watchingNamespace map[string]struct{}
ingressInformer cache.SharedIndexInformer
Expand All @@ -146,63 +139,60 @@ func NewController(cfg *config.Config) (*Controller, error) {
watchingNamespace[ns] = struct{}{}
}
}
kube.EndpointsInformer = kube.CoreSharedInformerFactory.Core().V1().Endpoints()

ingressLister := kube.NewIngressLister(
kube.CoreSharedInformerFactory.Networking().V1().Ingresses().Lister(),
kube.CoreSharedInformerFactory.Networking().V1beta1().Ingresses().Lister(),
kube.CoreSharedInformerFactory.Extensions().V1beta1().Ingresses().Lister(),
kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Lister(),
kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Lister(),
kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Lister(),
)
apisixRouteLister := kube.NewApisixRouteLister(sharedInformerFactory.Apisix().V1().ApisixRoutes().Lister(),
sharedInformerFactory.Apisix().V2alpha1().ApisixRoutes().Lister())
apisixRouteLister := kube.NewApisixRouteLister(kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Lister(),
kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Lister())

if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
ingressInformer = kube.CoreSharedInformerFactory.Networking().V1().Ingresses().Informer()
ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Informer()
} else if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
ingressInformer = kube.CoreSharedInformerFactory.Networking().V1beta1().Ingresses().Informer()
ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Informer()
} else {
ingressInformer = kube.CoreSharedInformerFactory.Extensions().V1beta1().Ingresses().Informer()
ingressInformer = kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Informer()
}
if cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
apisixRouteInformer = sharedInformerFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
} else {
apisixRouteInformer = sharedInformerFactory.Apisix().V1().ApisixRoutes().Informer()
apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Informer()
}

// recorder
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kube.GetKubeClient().CoreV1().Events("")})
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.Client.CoreV1().Events("")})

c := &Controller{
name: podName,
namespace: podNamespace,
cfg: cfg,
apiServer: apiSrv,
apisix: client,
metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
clientset: kube.GetKubeClient(),
crdClientset: crdClientset,
crdInformerFactory: sharedInformerFactory,
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),

epInformer: kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
epLister: kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
svcInformer: kube.CoreSharedInformerFactory.Core().V1().Services().Informer(),
svcLister: kube.CoreSharedInformerFactory.Core().V1().Services().Lister(),
name: podName,
namespace: podNamespace,
cfg: cfg,
apiServer: apiSrv,
apisix: client,
metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
kubeClient: kubeClient,
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),

epInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Informer(),
epLister: kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Lister(),
svcInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Services().Informer(),
svcLister: kubeClient.SharedIndexInformerFactory.Core().V1().Services().Lister(),
ingressLister: ingressLister,
ingressInformer: ingressInformer,
secretInformer: kube.CoreSharedInformerFactory.Core().V1().Secrets().Informer(),
secretLister: kube.CoreSharedInformerFactory.Core().V1().Secrets().Lister(),
secretInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Informer(),
secretLister: kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Lister(),
apisixRouteInformer: apisixRouteInformer,
apisixRouteLister: apisixRouteLister,
apisixUpstreamInformer: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
apisixUpstreamLister: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
apisixTlsInformer: sharedInformerFactory.Apisix().V1().ApisixTlses().Informer(),
apisixTlsLister: sharedInformerFactory.Apisix().V1().ApisixTlses().Lister(),
apisixClusterConfigInformer: sharedInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
apisixClusterConfigLister: sharedInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
apisixUpstreamInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
apisixUpstreamLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
apisixTlsInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Informer(),
apisixTlsLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Lister(),
apisixClusterConfigInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
apisixClusterConfigLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
}
c.translator = translation.NewTranslator(&translation.TranslatorOptions{
EndpointsLister: c.epLister,
Expand Down Expand Up @@ -267,7 +257,7 @@ func (c *Controller) Run(stop chan struct{}) error {
Namespace: c.namespace,
Name: c.cfg.Kubernetes.ElectionID,
},
Client: c.clientset.CoordinationV1(),
Client: c.kubeClient.Client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: c.name,
EventRecorder: c,
Expand Down
11 changes: 5 additions & 6 deletions pkg/ingress/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/apisix-ingress-controller/pkg/kube"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/log"
Expand All @@ -35,7 +34,7 @@ const (
)

// recordStatus record resources status
func recordStatus(at interface{}, reason string, err error, status v1.ConditionStatus) {
func (c *Controller) recordStatus(at interface{}, reason string, err error, status v1.ConditionStatus) {
// build condition
message := _commonSuccessMessage
if err != nil {
Expand All @@ -47,6 +46,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
Status: status,
Message: message,
}
client := c.kubeClient.APISIXClient

switch v := at.(type) {
case *configv1.ApisixTls:
Expand All @@ -56,7 +56,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := kube.GetApisixClient().ApisixV1().ApisixTlses(v.Namespace).
if _, errRecord := client.ApisixV1().ApisixTlses(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixTls",
zap.Error(errRecord),
Expand All @@ -71,7 +71,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := kube.GetApisixClient().ApisixV1().ApisixUpstreams(v.Namespace).
if _, errRecord := client.ApisixV1().ApisixUpstreams(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixUpstream",
zap.Error(errRecord),
Expand All @@ -86,7 +86,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := kube.GetApisixClient().ApisixV2alpha1().ApisixRoutes(v.Namespace).
if _, errRecord := client.ApisixV2alpha1().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
Expand All @@ -98,5 +98,4 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
// This should not be executed
log.Errorf("unsupported resource record: %s", v)
}

}
53 changes: 28 additions & 25 deletions pkg/kube/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,49 @@ package kube

import (
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/apache/apisix-ingress-controller/pkg/config"
clientset "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
)

var (
EndpointsInformer coreinformers.EndpointsInformer
kubeClient kubernetes.Interface
apisixKubeClient *clientset.Clientset
CoreSharedInformerFactory informers.SharedInformerFactory
)

func GetKubeClient() kubernetes.Interface {
return kubeClient
}

func GetApisixClient() clientset.Interface {
return apisixKubeClient
// KubeClient contains some objects used to communicate with Kubernetes API Server.
type KubeClient struct {
// Client is the object used to operate Kubernetes builtin resources.
Client kubernetes.Interface
// APISIXClient is the object used to operate resources under apisix.apache.org group.
APISIXClient clientset.Interface
// SharedIndexInformerFactory is the index informer factory object used to watch and
// list Kubernetes builtin resources.
SharedIndexInformerFactory informers.SharedInformerFactory
// APISIXSharedIndexInformerFactory is the index informer factory object used to watch
// and list Kubernetes resources in apisix.apache.org group.
APISIXSharedIndexInformerFactory externalversions.SharedInformerFactory
}

// initInformer initializes all related shared informers.
// Deprecate: will be refactored in the future without notification.
func InitInformer(cfg *config.Config) error {
var err error
// NewKubeClient creates a high-level Kubernetes client.
func NewKubeClient(cfg *config.Config) (*KubeClient, error) {
restConfig, err := BuildRestConfig(cfg.Kubernetes.Kubeconfig, "")
if err != nil {
return err
return nil, err
}
kubeClient, err = kubernetes.NewForConfig(restConfig)
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
return nil, err
}

apisixKubeClient, err = clientset.NewForConfig(restConfig)
apisixKubeClient, err := clientset.NewForConfig(restConfig)
if err != nil {
return err
return nil, err
}
CoreSharedInformerFactory = informers.NewSharedInformerFactory(kubeClient, cfg.Kubernetes.ResyncInterval.Duration)
factory := informers.NewSharedInformerFactory(kubeClient, cfg.Kubernetes.ResyncInterval.Duration)
apisixFactory := externalversions.NewSharedInformerFactory(apisixKubeClient, cfg.Kubernetes.ResyncInterval.Duration)

return nil
return &KubeClient{
Client: kubeClient,
APISIXClient: apisixKubeClient,
SharedIndexInformerFactory: factory,
APISIXSharedIndexInformerFactory: apisixFactory,
}, nil
}

0 comments on commit fb11efc

Please sign in to comment.