Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Application controller boilerplate #2

Merged
merged 1 commit into from
Feb 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions application/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package controller

import (
"context"

appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
log "github.com/sirupsen/logrus"

"time"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

const (
appResyncPeriod = 10 * time.Minute
)

// ApplicationController is the controller for application resources.
type ApplicationController struct {
kubeclientset kubernetes.Interface
applicationclientset appclientset.Interface
appQueue workqueue.RateLimitingInterface

appInformer cache.SharedIndexInformer
}

// NewApplicationController creates new instance of ApplicationController.
func NewApplicationController(kubeclientset kubernetes.Interface, applicationclientset appclientset.Interface) *ApplicationController {
appQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return &ApplicationController{
kubeclientset: kubeclientset,
applicationclientset: applicationclientset,
appQueue: appQueue,
appInformer: newApplicationInformer(applicationclientset, appQueue),
}
}

// Run starts the Application CRD controller.
func (ctrl *ApplicationController) Run(ctx context.Context, appWorkers int) {
defer runtime.HandleCrash()
defer ctrl.appQueue.ShutDown()

go ctrl.appInformer.Run(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) {
log.Error("Timed out waiting for caches to sync")
return
}

for i := 0; i < appWorkers; i++ {
go wait.Until(ctrl.runWorker, time.Second, ctx.Done())
}

<-ctx.Done()
}

func (ctrl *ApplicationController) processNextItem() bool {
appKey, shutdown := ctrl.appQueue.Get()
defer ctrl.appQueue.Done(appKey)
if shutdown {
return false
}
return true
}

func (ctrl *ApplicationController) runWorker() {
for ctrl.processNextItem() {
}
}

func newApplicationInformer(appclientset appclientset.Interface, appQueue workqueue.RateLimitingInterface) cache.SharedIndexInformer {
appInformerFactory := appinformers.NewSharedInformerFactory(
appclientset,
appResyncPeriod,
)
informer := appInformerFactory.Argoproj().V1alpha1().Applications().Informer()
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
appQueue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
appQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
appQueue.Add(key)
}
},
},
)
return informer
}
57 changes: 57 additions & 0 deletions cmd/argocd/application-controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"fmt"
"github.com/argoproj/argo-cd/application/controller"
"github.com/argoproj/argo-cd/cmd/argocd/commands"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"os"
)

const (
// CLIName is the name of the CLI
cliName = "application-controller"
)

func newCommand() *cobra.Command {
var (
kubeConfigOverrides clientcmd.ConfigOverrides
kubeConfigPath string
)
var command = cobra.Command{
Use: cliName,
Short: "application-controller is a controller to operate on applications CRD",
Run: func(c *cobra.Command, args []string) {
kubeConfig := commands.GetKubeConfig(kubeConfigPath, kubeConfigOverrides)

kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
appClient := appclientset.NewForConfigOrDie(kubeConfig)

appController := controller.NewApplicationController(kubeClient, appClient)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go appController.Run(ctx, 1)
// Wait forever
select {}
},
}

command.Flags().StringVar(&kubeConfigPath, "kubeconfig", "", "Path to the config file to use for CLI requests.")
kubeConfigOverrides = clientcmd.ConfigOverrides{}
clientcmd.BindOverrideFlags(&kubeConfigOverrides, command.Flags(), clientcmd.RecommendedConfigOverrideFlags(""))

return &command
}

func main() {
if err := newCommand().Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
8 changes: 5 additions & 3 deletions cmd/argocd/commands/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var (
imageTag = "latest"
)

func getKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest.Config {
// GetKubeClient creates new kubernetes client config using specified config path and config overrides variables
func GetKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest.Config {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.ExplicitPath = configPath
clientConfig := clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
Expand All @@ -33,8 +34,9 @@ func getKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest
return restConfig
}

func getKubeClient(configPath string, overrides clientcmd.ConfigOverrides) *kubernetes.Clientset {
restConfig := getKubeConfig(configPath, overrides)
// GetKubeClient creates new kubernetes client using specified config path and config overrides variables
func GetKubeClient(configPath string, overrides clientcmd.ConfigOverrides) *kubernetes.Clientset {
restConfig := GetKubeConfig(configPath, overrides)
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
log.Fatal(err)
Expand Down
18 changes: 8 additions & 10 deletions cmd/argocd/commands/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

// InstallFlags has all the required parameters for installing Argo CD.
Expand All @@ -32,18 +31,17 @@ func NewInstallCommand(globalArgs *globalFlags) *cobra.Command {
Short: "Install the argocd components",
Long: "Install the argocd components",
Run: func(c *cobra.Command, args []string) {
client := getKubeClient(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides)
extensionsClient := apiextensionsclient.NewForConfigOrDie(getKubeConfig(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides))
installAppCRD(client, extensionsClient, installArgs)
installClusterCRD(client, extensionsClient, installArgs)
extensionsClient := apiextensionsclient.NewForConfigOrDie(GetKubeConfig(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides))
installAppCRD(extensionsClient, installArgs)
installClusterCRD(extensionsClient, installArgs)
},
}
command.Flags().BoolVar(&installArgs.DryRun, "dry-run", false, "print the kubernetes manifests to stdout instead of installing")

return command
}

func installAppCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
func installAppCRD(extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
applicationCRD := apiextensionsv1beta1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1alpha1",
Expand All @@ -63,10 +61,10 @@ func installAppCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensi
},
},
}
createCRDHelper(clientset, extensionsClient, &applicationCRD, args.DryRun)
createCRDHelper(extensionsClient, &applicationCRD, args.DryRun)
}

func installClusterCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
func installClusterCRD(extensionsClient *apiextensionsclient.Clientset, args InstallFlags) {
clusterCRD := apiextensionsv1beta1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1alpha1",
Expand All @@ -86,10 +84,10 @@ func installClusterCRD(clientset *kubernetes.Clientset, extensionsClient *apiext
},
},
}
createCRDHelper(clientset, extensionsClient, &clusterCRD, args.DryRun)
createCRDHelper(extensionsClient, &clusterCRD, args.DryRun)
}

func createCRDHelper(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, crd *apiextensionsv1beta1.CustomResourceDefinition, dryRun bool) {
func createCRDHelper(extensionsClient *apiextensionsclient.Clientset, crd *apiextensionsv1beta1.CustomResourceDefinition, dryRun bool) {
if dryRun {
printYAML(crd)
return
Expand Down