Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: controller leader election #173

Merged
merged 3 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions charts/ingress-apisix/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ spec:
volumeMounts:
- mountPath: /ingress-apisix/conf
name: configuration
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
{{- with .Values.ingressController.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
Expand Down
1 change: 1 addition & 0 deletions cmd/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig, "kubeconfig", "", "Kubernetes configuration file (by default in-cluster configuration will be used)")
cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration, "resync-interval", time.Minute, "the controller resync (with Kubernetes) interval, the minimum resync interval is 30s")
cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for compaign the controller leader")
cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api")
cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api")

Expand Down
17 changes: 10 additions & 7 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ enable_profiling: true # enable profileing via web interfaces

# Kubernetes related configurations.
kubernetes:
kubeconfig: "" # the Kubernetes configuration file path, default is
# "", so the in-cluster configuration will be used.
resync_interval: "6h" # how long should apisix-ingress-controller
# re-synchronizes with Kubernetes, default is 6h,
# and the minimal resync interval is 30s.
app_namespaces: ["*"] # namespace list that controller will watch for resources,
# by default all namespaces (represented by "*") are watched.
kubeconfig: "" # the Kubernetes configuration file path, default is
# "", so the in-cluster configuration will be used.
resync_interval: "6h" # how long should apisix-ingress-controller
# re-synchronizes with Kubernetes, default is 6h,
# and the minimal resync interval is 30s.
app_namespaces: ["*"] # namespace list that controller will watch for resources,
# by default all namespaces (represented by "*") are watched.
election_id: "ingress-apisix-leader" # the election id for the controller leader compaign,
# only the leader will watch and delivery resource changes,
# other instances (as candidates) stand by.

# APISIX related configurations.
apisix:
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
const (
// NamespaceAll represents all namespaces.
NamespaceAll = "*"
// IngressAPISIXLeader is the default election id for the controller
// leader election.
IngressAPISIXLeader = "ingress-apisix-leader"

_minimalResyncInterval = 30 * time.Second
)
Expand All @@ -50,6 +53,7 @@ type KubernetesConfig struct {
Kubeconfig string `json:"kubeconfig" yaml:"kubeconfig"`
ResyncInterval types.TimeDuration `json:"resync_interval" yaml:"resync_interval"`
AppNamespaces []string `json:"app_namespaces" yaml:"app_namespaces"`
ElectionID string `json:"election_id" yaml:"election_id"`
}

// APISIXConfig contains all APISIX related config items.
Expand All @@ -71,6 +75,7 @@ func NewDefaultConfig() *Config {
Kubeconfig: "", // Use in-cluster configurations.
ResyncInterval: types.TimeDuration{Duration: 6 * time.Hour},
AppNamespaces: []string{v1.NamespaceAll},
ElectionID: IngressAPISIXLeader,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestNewConfigFromFile(t *testing.T) {
ResyncInterval: types.TimeDuration{time.Hour},
Kubeconfig: "/path/to/foo/baz",
AppNamespaces: []string{""},
ElectionID: "my-election-id",
},
APISIX: APISIXConfig{
BaseURL: "http://127.0.0.1:8080/apisix",
Expand Down Expand Up @@ -72,6 +73,7 @@ enable_profiling: true
kubernetes:
kubeconfig: /path/to/foo/baz
resync_interval: 1h0m0s
election_id: my-election-id
apisix:
base_url: http://127.0.0.1:8080/apisix
admin_key: "123456"
Expand Down
119 changes: 100 additions & 19 deletions pkg/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,26 @@
package controller

import (
"context"
"os"
"sync"

v1 "k8s.io/api/core/v1"

"k8s.io/client-go/tools/cache"

"github.com/api7/ingress-controller/pkg/apisix"
"time"

clientSet "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
crdclientset "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/api7/ingress-controller/pkg/api"
"github.com/api7/ingress-controller/pkg/apisix"
"github.com/api7/ingress-controller/pkg/config"
"github.com/api7/ingress-controller/pkg/kube"
"github.com/api7/ingress-controller/pkg/log"
Expand All @@ -47,6 +51,9 @@ func recoverException() {

// Controller is the ingress apisix controller object.
type Controller struct {
name string
namespace string
cfg *config.Config
wg sync.WaitGroup
watchingNamespace map[string]struct{}
apiServer *api.Server
Expand Down Expand Up @@ -96,6 +103,9 @@ func NewController(cfg *config.Config) (*Controller, error) {
}

c := &Controller{
name: podName,
namespace: podNamespace,
cfg: cfg,
apiServer: apiSrv,
metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
clientset: kube.GetKubeClient(),
Expand All @@ -115,30 +125,102 @@ func (c *Controller) goAttach(handler func()) {
}()
}

// Eventf implements the resourcelock.EventRecorder interface.
func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) {
log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType))
}

// Run launches the controller.
func (c *Controller) Run(stop chan struct{}) error {
// TODO leader election.
rootCtx, rootCancel := context.WithCancel(context.Background())
defer rootCancel()
go func() {
<-stop
rootCancel()
}()
c.metricsCollector.ResetLeader(false)

go func() {
if err := c.apiServer.Run(rootCtx.Done()); err != nil {
log.Errorf("failed to launch API Server: %s", err)
}
}()

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: c.namespace,
Name: c.cfg.Kubernetes.ElectionID,
},
Client: c.clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: c.name,
EventRecorder: c,
},
}
cfg := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 5 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: c.run,
OnNewLeader: func(identity string) {
log.Warnf("found a new leader %s", identity)
if identity != c.name {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
}
},
OnStoppedLeading: func() {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.metricsCollector.ResetLeader(false)
},
},
ReleaseOnCancel: true,
Name: "ingress-apisix",
}

elector, err := leaderelection.NewLeaderElector(cfg)
if err != nil {
log.Errorf("failed to create leader elector: %s", err.Error())
return err
}

election:
elector.Run(rootCtx)
select {
case <-rootCtx.Done():
return nil
default:
goto election
}
}

func (c *Controller) run(ctx context.Context) {
log.Infow("controller now is running as leader",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.metricsCollector.ResetLeader(true)
log.Info("controller run as leader")

ac := &Api6Controller{
KubeClientSet: c.clientset,
Api6ClientSet: c.crdClientset,
SharedInformerFactory: c.crdInformerFactory,
CoreSharedInformerFactory: kube.CoreSharedInformerFactory,
Stop: stop,
Stop: ctx.Done(),
}
epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints()
kube.EndpointsInformer = epInformer
// endpoint
ac.Endpoint(c)
c.goAttach(func() {
ac.CoreSharedInformerFactory.Start(stop)
})
c.goAttach(func() {
if err := c.apiServer.Run(stop); err != nil {
log.Errorf("failed to launch API Server: %s", err)
}
ac.CoreSharedInformerFactory.Start(ctx.Done())
})

// ApisixRoute
Expand All @@ -151,12 +233,11 @@ func (c *Controller) Run(stop chan struct{}) error {
ac.ApisixTLS(c)

c.goAttach(func() {
ac.SharedInformerFactory.Start(stop)
ac.SharedInformerFactory.Start(ctx.Done())
})

<-stop
<-ctx.Done()
c.wg.Wait()
return nil
}

// namespaceWatching accepts a resource key, getting the namespace part
Expand All @@ -182,7 +263,7 @@ type Api6Controller struct {
Api6ClientSet clientSet.Interface
SharedInformerFactory externalversions.SharedInformerFactory
CoreSharedInformerFactory informers.SharedInformerFactory
Stop chan struct{}
Stop <-chan struct{}
}

func (api6 *Api6Controller) ApisixRoute(controller *Controller) {
Expand Down
9 changes: 9 additions & 0 deletions samples/deploy/deployment/ingress-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ spec:
- mountPath: /ingress-apisix/conf/config.yaml
name: apisix-ingress-configmap
subPath: config.yaml
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumes:
- configMap:
name: apisix-ingress-cm
Expand Down
5 changes: 5 additions & 0 deletions test/e2e/ingress/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/api7/ingress-controller/test/e2e/scaffold"
"github.com/onsi/ginkgo"
Expand Down Expand Up @@ -49,6 +50,10 @@ spec:
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")

// TODO When ingress controller can feedback the lifecycle of CRDs to the
// status field, we can poll it rather than sleeping.
time.Sleep(3 * time.Second)

body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err := json.Unmarshal([]byte(body), &placeholder)
Expand Down
14 changes: 9 additions & 5 deletions test/e2e/ingress/resourcepushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ spec:
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams")
scale := 2
err = s.ScaleHTTPBIN(scale)
assert.Nil(ginkgo.GinkgoT(), err)
time.Sleep(5 * time.Second) // wait for ingress to sync
assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2), "scaling number of httpbin instancess")
assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPoddsAvailable(), "waiting for all httpbin pods ready")
// TODO When ingress controller can feedback the lifecycle of CRDs to the
// status field, we can poll it rather than sleeping.
time.Sleep(5 * time.Second)
ups, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes not expect")
Expand Down Expand Up @@ -84,7 +85,10 @@ spec:

// remove
assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
time.Sleep(10 * time.Second) // wait for ingress to sync

// TODO When ingress controller can feedback the lifecycle of CRDs to the
// status field, we can poll it rather than sleeping.
time.Sleep(10 * time.Second)
ups, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
assert.Len(ginkgo.GinkgoT(), ups, 0, "upstreams nodes not expect")
Expand Down
Loading