Skip to content

Commit

Permalink
Implement Application controller boilerplate (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmt authored Feb 20, 2018
1 parent 0f712e4 commit f9bc9bd
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 24 deletions.
24 changes: 13 additions & 11 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions application/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package controller

import (
"context"

appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
log "github.com/sirupsen/logrus"

"time"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

const (
appResyncPeriod = 10 * time.Minute
)

// ApplicationController is the controller for application resources.
type ApplicationController struct {
kubeclientset kubernetes.Interface
applicationclientset appclientset.Interface
appQueue workqueue.RateLimitingInterface

appInformer cache.SharedIndexInformer
}

// NewApplicationController creates new instance of ApplicationController.
func NewApplicationController(kubeclientset kubernetes.Interface, applicationclientset appclientset.Interface) *ApplicationController {
appQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return &ApplicationController{
kubeclientset: kubeclientset,
applicationclientset: applicationclientset,
appQueue: appQueue,
appInformer: newApplicationInformer(applicationclientset, appQueue),
}
}

// Run starts the Application CRD controller.
func (ctrl *ApplicationController) Run(ctx context.Context, appWorkers int) {
defer runtime.HandleCrash()
defer ctrl.appQueue.ShutDown()

go ctrl.appInformer.Run(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) {
log.Error("Timed out waiting for caches to sync")
return
}

for i := 0; i < appWorkers; i++ {
go wait.Until(ctrl.runWorker, time.Second, ctx.Done())
}

<-ctx.Done()
}

func (ctrl *ApplicationController) processNextItem() bool {
appKey, shutdown := ctrl.appQueue.Get()
defer ctrl.appQueue.Done(appKey)
if shutdown {
return false
}
return true
}

func (ctrl *ApplicationController) runWorker() {
for ctrl.processNextItem() {
}
}

func newApplicationInformer(appclientset appclientset.Interface, appQueue workqueue.RateLimitingInterface) cache.SharedIndexInformer {
appInformerFactory := appinformers.NewSharedInformerFactory(
appclientset,
appResyncPeriod,
)
informer := appInformerFactory.Argoproj().V1alpha1().Applications().Informer()
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
appQueue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
appQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
appQueue.Add(key)
}
},
},
)
return informer
}
57 changes: 57 additions & 0 deletions cmd/argocd/application-controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"fmt"
"github.com/argoproj/argo-cd/application/controller"
"github.com/argoproj/argo-cd/cmd/argocd/commands"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"os"
)

const (
// CLIName is the name of the CLI
cliName = "application-controller"
)

func newCommand() *cobra.Command {
var (
kubeConfigOverrides clientcmd.ConfigOverrides
kubeConfigPath string
)
var command = cobra.Command{
Use: cliName,
Short: "application-controller is a controller to operate on applications CRD",
Run: func(c *cobra.Command, args []string) {
kubeConfig := commands.GetKubeConfig(kubeConfigPath, kubeConfigOverrides)

kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
appClient := appclientset.NewForConfigOrDie(kubeConfig)

appController := controller.NewApplicationController(kubeClient, appClient)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go appController.Run(ctx, 1)
// Wait forever
select {}
},
}

command.Flags().StringVar(&kubeConfigPath, "kubeconfig", "", "Path to the config file to use for CLI requests.")
kubeConfigOverrides = clientcmd.ConfigOverrides{}
clientcmd.BindOverrideFlags(&kubeConfigOverrides, command.Flags(), clientcmd.RecommendedConfigOverrideFlags(""))

return &command
}

func main() {
if err := newCommand().Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
8 changes: 5 additions & 3 deletions cmd/argocd/commands/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var (
imageTag = "latest"
)

func getKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest.Config {
// GetKubeClient creates new kubernetes client config using specified config path and config overrides variables
func GetKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest.Config {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.ExplicitPath = configPath
clientConfig := clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
Expand All @@ -33,8 +34,9 @@ func getKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest
return restConfig
}

func getKubeClient(configPath string, overrides clientcmd.ConfigOverrides) *kubernetes.Clientset {
restConfig := getKubeConfig(configPath, overrides)
// GetKubeClient creates new kubernetes client using specified config path and config overrides variables
func GetKubeClient(configPath string, overrides clientcmd.ConfigOverrides) *kubernetes.Clientset {
restConfig := GetKubeConfig(configPath, overrides)
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
log.Fatal(err)
Expand Down
18 changes: 8 additions & 10 deletions cmd/argocd/commands/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

// InstallFlags has all the required parameters for installing Argo CD.
Expand All @@ -32,18 +31,17 @@ func NewInstallCommand(globalArgs *globalFlags) *cobra.Command {
Short: "Install the argocd components",
Long: "Install the argocd components",
Run: func(c *cobra.Command, args []string) {
client := getKubeClient(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides)
extensionsClient := apiextensionsclient.NewForConfigOrDie(getKubeConfig(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides))
installAppCRD(client, extensionsClient, installArgs)
installClusterCRD(client, extensionsClient, installArgs)
extensionsClient := apiextensionsclient.NewForConfigOrDie(GetKubeConfig(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides))
installAppCRD(extensionsClient, installArgs)
installClusterCRD(extensionsClient, installArgs)
},
}
command.Flags().BoolVar(&installArgs.DryRun, "dry-run", false, "print the kubernetes manifests to stdout instead of installing")

return command
}

func installAppCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
func installAppCRD(extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
applicationCRD := apiextensionsv1beta1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1alpha1",
Expand All @@ -63,10 +61,10 @@ func installAppCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensi
},
},
}
createCRDHelper(clientset, extensionsClient, &applicationCRD, args.DryRun)
createCRDHelper(extensionsClient, &applicationCRD, args.DryRun)
}

func installClusterCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
func installClusterCRD(extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
clusterCRD := apiextensionsv1beta1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1alpha1",
Expand All @@ -86,10 +84,10 @@ func installClusterCRD(clientset *kubernetes.Clientset, extensionsClient *apiext
},
},
}
createCRDHelper(clientset, extensionsClient, &clusterCRD, args.DryRun)
createCRDHelper(extensionsClient, &clusterCRD, args.DryRun)
}

func createCRDHelper(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, crd *apiextensionsv1beta1.CustomResourceDefinition, dryRun bool) {
func createCRDHelper(extensionsClient *apiextensionsclient.Clientset, crd *apiextensionsv1beta1.CustomResourceDefinition, dryRun bool) {
if dryRun {
printYAML(crd)
return
Expand Down

0 comments on commit f9bc9bd

Please sign in to comment.