From 4351008607f9f311cb43f1132292cfa42746d57a Mon Sep 17 00:00:00 2001 From: yangs Date: Mon, 17 Apr 2023 20:08:49 +0800 Subject: [PATCH] Feat: support leader election for kube-trigger (#45) Signed-off-by: yangsoon Co-authored-by: yangsoon Signed-off-by: Amit Singh --- pkg/cmd/cmd.go | 142 ++++++++++++++++++++++++++++++++++++++++----- pkg/cmd/options.go | 5 ++ 2 files changed, 131 insertions(+), 16 deletions(-) diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go index 4231f9d..c0ed6ff 100644 --- a/pkg/cmd/cmd.go +++ b/pkg/cmd/cmd.go @@ -21,13 +21,22 @@ import ( "fmt" "os" "os/signal" + "strings" + "sync" "syscall" + "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/uuid" + v1 "k8s.io/client-go/kubernetes/typed/coordination/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + ctrl "sigs.k8s.io/controller-runtime" "github.com/kubevela/kube-trigger/pkg/config" "github.com/kubevela/kube-trigger/pkg/eventhandler" @@ -54,6 +63,11 @@ const ( FlagTimeout = "timeout" FlagRegistrySize = "registry-size" + + FlagLeaderElect = "leader-elect" + FlagLeaderElectionLeaseDuration = "leader-election-lease-duration" + FlagLeaderElectionRenewDeadline = "leader-election-renew-deadline" + FlagLeaderElectionRetryPeriod = "leader-election-retry-period" ) const ( @@ -68,8 +82,13 @@ Options have a priority like this: cli-flags > env > default-values` ) var ( - logger = logrus.WithField("kubetrigger", "main") - opt = newOption() + logger = logrus.WithField("kubetrigger", "main") + opt = newOption() + enableLeaderElection bool + + leaseDuration time.Duration + renewDeadline time.Duration + retryPeriod time.Duration ) // NewCommand news a command @@ -115,10 +134,35 @@ func addFlags(opt *option, f *pflag.FlagSet) { f.IntVar(&opt.Timeout, FlagTimeout, defaultTimeout, "Timeout for running each action") f.IntVar(&opt.RegistrySize, FlagRegistrySize, defaultRegistrySize, "Cache size for filters and actions") f.StringVar(&k8sresourcewatcher.MultiClusterConfigType, "multi-cluster-config-type", k8sresourcewatcher.TypeClusterGateway, "Multi-cluster config type, supported types: cluster-gateway, cluster-gateway-kubeconfig") + f.BoolVar(&enableLeaderElection, FlagLeaderElect, false, "Enable leader election for kube-trigger. Enabling this will ensure there is only one active kube-trigger.") + f.DurationVar(&leaseDuration, FlagLeaderElectionLeaseDuration, defaultLeaseDuration, "The duration that non-leader candidates will wait to force acquire leadership.") + f.DurationVar(&renewDeadline, FlagLeaderElectionRenewDeadline, defaultRenewDeadline, "The duration that the acting controlplane will retry refreshing leadership before giving up.") + f.DurationVar(&retryPeriod, FlagLeaderElectionRetryPeriod, defaultRetryPeriod, "The duration the LeaderElector clients should wait between tries of actions.") } func runCli(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + r := NewRunner() + if err := r.Start(ctx); err != nil { + return err + } + // Listen to termination signals. + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGTERM, syscall.SIGINT) + select { + case err := <-r.Err(): + logger.Errorf("runner stop with err: %v", err) + return err + case <-ctx.Done(): + logger.Infof("context cancelled, stopping") + case <-sigterm: + logger.Infof("received termination signal, stopping") + } + return nil +} + +func run(ctx context.Context) error { // Set log level. No need to check error, we validated it previously. level, _ := logrus.ParseLevel(opt.LogLevel) logrus.SetLevel(level) @@ -149,9 +193,6 @@ func runCli(cmd *cobra.Command, args []string) error { return errors.Wrap(err, "error when creating executor") } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - instances := make(map[string]types.Source) // Run watchers. @@ -192,18 +233,87 @@ func runCli(cmd *cobra.Command, args []string) error { } // Let the workers run Actions. - go exe.RunJobs(ctx) + exe.RunJobs(ctx) + return nil +} - // Listen to termination signals. - sigterm := make(chan os.Signal, 1) - signal.Notify(sigterm, syscall.SIGTERM) - signal.Notify(sigterm, syscall.SIGINT) - select { - case <-ctx.Done(): - logger.Infof("context cancelled, stopping") - case <-sigterm: - logger.Infof("received termination signal, stopping") +// Runner manages the task execution. +type Runner struct { + errChan chan error + start func(ctx context.Context) error + once sync.Once +} + +// NewRunner new a Runner +func NewRunner() *Runner { + return &Runner{ + errChan: make(chan error), + start: run, } +} +// Start the task. +func (r *Runner) Start(ctx context.Context) error { + if enableLeaderElection { + return r.startLeaderElection(ctx) + } + go func() { + if err := r.start(ctx); err != nil { + r.errChan <- err + } + }() return nil } + +func (r *Runner) startLeaderElection(ctx context.Context) error { + cclient, err := v1.NewForConfig(ctrl.GetConfigOrDie()) + if err != nil { + return err + } + leaderElectionName := fmt.Sprintf("kube-trigger-%s", + strings.ToLower(strings.ReplaceAll(version.Version, ".", "-")), + ) + leaderElectionID := fmt.Sprintf("%s-%s", leaderElectionName, uuid.NewUUID()) + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: leaderElectionName, + Namespace: "vela-system", + }, + Client: cclient, + LockConfig: resourcelock.ResourceLockConfig{ + Identity: leaderElectionID, + }, + } + callbacks := leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + r.once.Do(func() { + if err = r.start(ctx); err != nil { + r.errChan <- err + } + }) + }, + OnStoppedLeading: func() { + r.errChan <- errors.New("leader election lost") + }, + } + l, err := leaderelection.NewLeaderElector( + leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: leaseDuration, + RenewDeadline: renewDeadline, + RetryPeriod: retryPeriod, + Callbacks: callbacks, + }) + if err != nil { + return err + } + go func() { + l.Run(ctx) + }() + return err +} + +// Err return the runner's runtime error +func (r *Runner) Err() chan error { + return r.errChan +} diff --git a/pkg/cmd/options.go b/pkg/cmd/options.go index 33cabdd..e2e985a 100644 --- a/pkg/cmd/options.go +++ b/pkg/cmd/options.go @@ -53,6 +53,11 @@ const ( defaultTimeout = 10 defaultRegistrySize = 100 + + // Values taken from: https://github.com/kubernetes/component-base/blob/master/config/v1alpha1/defaults.go + defaultLeaseDuration = 15 * time.Second + defaultRenewDeadline = 10 * time.Second + defaultRetryPeriod = 2 * time.Second ) func newOption() *option {