feat: allow analysis run to use separate kubeconfig for jobs #3350

Merged: 5 commits, Feb 28, 2024

Changes from all commits
25 changes: 20 additions & 5 deletions analysis/controller.go
@@ -6,6 +6,7 @@
"time"

"github.com/argoproj/argo-rollouts/metric"
jobProvider "github.com/argoproj/argo-rollouts/metricproviders/job"

unstructuredutil "github.com/argoproj/argo-rollouts/utils/unstructured"

@@ -106,13 +107,13 @@

cfg.JobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
controller.enqueueIfCompleted(obj)
controller.enqueueJobIfCompleted(obj)
},
UpdateFunc: func(oldObj, newObj any) {
controller.enqueueIfCompleted(newObj)
controller.enqueueJobIfCompleted(newObj)
},
DeleteFunc: func(obj any) {
controller.enqueueIfCompleted(obj)
controller.enqueueJobIfCompleted(obj)
},
})

@@ -186,15 +187,29 @@
return c.persistAnalysisRunStatus(run, newRun.Status)
}

func (c *Controller) enqueueIfCompleted(obj any) {
func (c *Controller) jobParentNamespace(obj any) string {
job, ok := obj.(*batchv1.Job)
if !ok {
return ""
}
ns := job.GetNamespace()
if job.Annotations != nil {
if job.Annotations[jobProvider.AnalysisRunNamespaceAnnotationKey] != "" {
ns = job.Annotations[jobProvider.AnalysisRunNamespaceAnnotationKey]
}
}
return ns
}

func (c *Controller) enqueueJobIfCompleted(obj any) {
job, ok := obj.(*batchv1.Job)
if !ok {
return
}
for _, condition := range job.Status.Conditions {
switch condition.Type {
case batchv1.JobFailed, batchv1.JobComplete:
controllerutil.EnqueueParentObject(job, register.AnalysisRunKind, c.enqueueAnalysis)
controllerutil.EnqueueParentObject(job, register.AnalysisRunKind, c.enqueueAnalysis, c.jobParentNamespace)
return
}
}
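(Not part of the diff.) The annotation written by the job metric provider is what lets jobParentNamespace map a completed Job back to its parent AnalysisRun even when the Job runs in a different namespace or cluster. A minimal sketch of the expected resolution, with invented names and the unexported method's logic restated inline:

```go
// Minimal sketch (invented names): resolving the parent AnalysisRun namespace from a
// Job annotation, mirroring the unexported jobParentNamespace method above.
package main

import (
	"fmt"

	jobProvider "github.com/argoproj/argo-rollouts/metricproviders/job"
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A Job created by the job metric provider in a separate namespace, annotated
	// with the namespace of the AnalysisRun that owns it.
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-run.example-metric.1", // invented name
			Namespace: "analysis-jobs",                // where the Job actually runs
			Annotations: map[string]string{
				jobProvider.AnalysisRunNamespaceAnnotationKey: "rollouts", // parent AnalysisRun namespace
			},
		},
	}

	// Prefer the annotation; fall back to the Job's own namespace when it is absent.
	ns := job.GetNamespace()
	if v := job.Annotations[jobProvider.AnalysisRunNamespaceAnnotationKey]; v != "" {
		ns = v
	}
	fmt.Println(ns) // rollouts
}
```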
13 changes: 11 additions & 2 deletions cmd/rollouts-controller/main.go
@@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/argoproj/argo-rollouts/metricproviders"
"github.com/argoproj/argo-rollouts/utils/record"
"github.com/argoproj/pkg/kubeclientmetrics"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
@@ -127,6 +128,7 @@ func newCommand() *cobra.Command {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
checkError(err)
smiClient, err := smiclientset.NewForConfig(config)
checkError(err)
resyncDuration := time.Duration(rolloutResyncPeriod) * time.Second
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
kubeClient,
@@ -136,10 +138,17 @@
instanceIDTweakListFunc := func(options *metav1.ListOptions) {
options.LabelSelector = instanceIDSelector.String()
}
jobKubeClient, err := metricproviders.GetAnalysisJobClientset(kubeClient)
checkError(err)
jobNs := metricproviders.GetAnalysisJobNamespace()
if jobNs == "" {
// if not set explicitly use the configured ns
jobNs = namespace
}
jobInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
kubeClient,
jobKubeClient,
resyncDuration,
kubeinformers.WithNamespace(namespace),
kubeinformers.WithNamespace(jobNs),
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = jobprovider.AnalysisRunUIDLabelKey
}))
60 changes: 43 additions & 17 deletions metricproviders/job/job.go
@@ -11,6 +11,7 @@
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
batchlisters "k8s.io/client-go/listers/batch/v1"

@@ -24,8 +25,12 @@
ProviderType = "Job"
// JobNameKey is the measurement's metadata key holding the job name associated with the measurement
JobNameKey = "job-name"
// JobNamespaceKey is the measurement's metadata key holding the job namespace associated with the measurement
JobNamespaceKey = "job-namespace"
// AnalysisRunNameAnnotationKey is the job's annotation key containing the name of the controller AnalysisRun
AnalysisRunNameAnnotationKey = "analysisrun.argoproj.io/name"
// AnalysisRunNamespaceAnnotationKey is the job's annotation key containing the namespace of the controller AnalysisRun
AnalysisRunNamespaceAnnotationKey = "analysisrun.argoproj.io/namespace"
// AnalysisRunMetricLabelKey is the job's annotation key containing the name of the associated AnalysisRun metric
AnalysisRunMetricAnnotationKey = "analysisrun.argoproj.io/metric-name"
// AnalysisRunUIDLabelKey is the job's label key containing the uid of the associated AnalysisRun
@@ -41,13 +46,15 @@
kubeclientset kubernetes.Interface
jobLister batchlisters.JobLister
logCtx log.Entry
jobNamespace string
}

func NewJobProvider(logCtx log.Entry, kubeclientset kubernetes.Interface, jobLister batchlisters.JobLister) *JobProvider {
func NewJobProvider(logCtx log.Entry, kubeclientset kubernetes.Interface, jobLister batchlisters.JobLister, jobNS string) *JobProvider {
return &JobProvider{
kubeclientset: kubeclientset,
logCtx: logCtx,
jobLister: jobLister,
jobNamespace: jobNS,
}
}

@@ -78,7 +85,11 @@
return int(res.Count + res.Error + 1)
}

func newMetricJob(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) (*batchv1.Job, error) {
func newMetricJob(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, jobNS string) (*batchv1.Job, error) {
ns := run.Namespace
if jobNS != "" {
ns = jobNS
}
jobAnnotations := metric.Provider.Job.Metadata.GetAnnotations()
jobLabels := metric.Provider.Job.Metadata.GetLabels()
if jobAnnotations == nil {
@@ -89,11 +100,12 @@
}
jobLabels[AnalysisRunUIDLabelKey] = string(run.UID)
jobAnnotations[AnalysisRunNameAnnotationKey] = run.Name
jobAnnotations[AnalysisRunNamespaceAnnotationKey] = run.Namespace
jobAnnotations[AnalysisRunMetricAnnotationKey] = metric.Name
job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: newJobName(run, metric),
Namespace: run.Namespace,
Namespace: ns,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(run, analysisRunGVK)},
Annotations: jobAnnotations,
Labels: jobLabels,
@@ -110,12 +122,12 @@
StartedAt: &now,
Phase: v1alpha1.AnalysisPhaseRunning,
}
job, err := newMetricJob(run, metric)
job, err := newMetricJob(run, metric, p.jobNamespace)
if err != nil {
p.logCtx.Errorf("job initialization failed: %v", err)
return metricutil.MarkMeasurementError(measurement, err)
}
jobIf := p.kubeclientset.BatchV1().Jobs(run.Namespace)
jobIf := p.kubeclientset.BatchV1().Jobs(job.Namespace)
createdJob, createErr := jobIf.Create(ctx, job, metav1.CreateOptions{})
if createErr != nil {
if !k8serrors.IsAlreadyExists(createErr) {
@@ -137,19 +149,20 @@
createdJob = existingJob
}
measurement.Metadata = map[string]string{
JobNameKey: createdJob.Name,
JobNameKey: createdJob.Name,
JobNamespaceKey: createdJob.Namespace,
}
p.logCtx.Infof("job %s/%s created", createdJob.Namespace, createdJob.Name)
return measurement
}

func (p *JobProvider) Resume(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
jobName, err := getJobName(measurement)
jobName, err := getJobNamespacedName(measurement, run.Namespace)
now := timeutil.MetaNow()
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}
job, err := p.jobLister.Jobs(run.Namespace).Get(jobName)
job, err := p.jobLister.Jobs(jobName.Namespace).Get(jobName.Name)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}
@@ -170,26 +183,39 @@
}

func (p *JobProvider) Terminate(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
jobName, err := getJobName(measurement)
jobName, err := getJobNamespacedName(measurement, run.Namespace)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}
err = p.deleteJob(run.Namespace, jobName)
err = p.deleteJob(jobName.Namespace, jobName.Name)
if err != nil {
return metricutil.MarkMeasurementError(measurement, err)
}
now := timeutil.MetaNow()
measurement.FinishedAt = &now
measurement.Phase = v1alpha1.AnalysisPhaseSuccessful
p.logCtx.Infof("job %s/%s terminated", run.Namespace, jobName)
p.logCtx.Infof("job %s/%s terminated", jobName.Namespace, jobName.Name)
return measurement
}

func getJobName(measurement v1alpha1.Measurement) (string, error) {
if measurement.Metadata != nil && measurement.Metadata[JobNameKey] != "" {
return measurement.Metadata[JobNameKey], nil
func getJobNamespacedName(measurement v1alpha1.Measurement, defaultNS string) (types.NamespacedName, error) {
name := types.NamespacedName{
Namespace: defaultNS,
Name: "",
}
if measurement.Metadata != nil {
if measurement.Metadata[JobNameKey] != "" {
name.Name = measurement.Metadata[JobNameKey]
} else {
return name, errors.New("job metadata reference missing")
}
if measurement.Metadata[JobNamespaceKey] != "" {
name.Namespace = measurement.Metadata[JobNamespaceKey]
}
} else {
return name, errors.New("job metadata reference missing")
}
return "", errors.New("job metadata reference missing")
return name, nil
}

func (p *JobProvider) deleteJob(namespace, jobName string) error {
@@ -220,11 +246,11 @@
totalJobs := len(jobs)
if totalJobs > limit {
for i := 0; i < totalJobs-limit; i++ {
err = p.deleteJob(run.Namespace, jobs[i].Name)
err = p.deleteJob(jobs[i].Namespace, jobs[i].Name)
if err != nil {
return err
}
p.logCtx.Infof("job %s/%s garbage collected", run.Namespace, jobs[i].Name)
p.logCtx.Infof("job %s/%s garbage collected", jobs[i].Namespace, jobs[i].Name)
}
}
return nil
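(Illustrative sketch, not from the PR.) Since the Job can now live outside the AnalysisRun's namespace, each measurement records both job-name and job-namespace, and getJobNamespacedName falls back to the run's namespace for measurements written before this change. The helper below restates that resolution with example values; everything other than the exported metadata keys is invented:

```go
package main

import (
	"fmt"

	"github.com/argoproj/argo-rollouts/metricproviders/job"
	"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
	"k8s.io/apimachinery/pkg/types"
)

// resolveJobRef mirrors getJobNamespacedName (which is unexported): the Job name comes
// from measurement metadata, and the namespace defaults to the run's namespace unless
// job-namespace metadata is present.
func resolveJobRef(m v1alpha1.Measurement, runNamespace string) types.NamespacedName {
	ref := types.NamespacedName{Namespace: runNamespace, Name: m.Metadata[job.JobNameKey]}
	if ns := m.Metadata[job.JobNamespaceKey]; ns != "" {
		ref.Namespace = ns
	}
	return ref
}

func main() {
	// Measurement recorded after this change: both keys are present.
	m := v1alpha1.Measurement{Metadata: map[string]string{
		job.JobNameKey:      "example-run.example-metric.1",
		job.JobNamespaceKey: "analysis-jobs",
	}}
	fmt.Println(resolveJobRef(m, "rollouts")) // analysis-jobs/example-run.example-metric.1

	// Older measurement without job-namespace: falls back to the run's namespace.
	old := v1alpha1.Measurement{Metadata: map[string]string{job.JobNameKey: "example-run.example-metric.1"}}
	fmt.Println(resolveJobRef(old, "rollouts")) // rollouts/example-run.example-metric.1
}
```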
4 changes: 2 additions & 2 deletions metricproviders/job/job_test.go
@@ -36,7 +36,7 @@ func newTestJobProvider(objects ...runtime.Object) *JobProvider {
cancel()

jobLister := k8sI.Batch().V1().Jobs().Lister()
return NewJobProvider(*logCtx, kubeclient, jobLister)
return NewJobProvider(*logCtx, kubeclient, jobLister, "")
}

func newRunWithJobMetric() *v1alpha1.AnalysisRun {
@@ -193,7 +193,7 @@ func TestRunCreateCollision(t *testing.T) {
p := newTestJobProvider()
run := newRunWithJobMetric()

existingJob, err := newMetricJob(run, run.Spec.Metrics[0])
existingJob, err := newMetricJob(run, run.Spec.Metrics[0], p.jobNamespace)
assert.NoError(t, err)
fakeClient := p.kubeclientset.(*k8sfake.Clientset)
fakeClient.Tracker().Add(existingJob)
44 changes: 43 additions & 1 deletion metricproviders/metricproviders.go
@@ -2,10 +2,13 @@ package metricproviders

import (
"fmt"
"os"

"github.com/argoproj/argo-rollouts/metric"
"github.com/argoproj/argo-rollouts/metricproviders/influxdb"
"github.com/argoproj/argo-rollouts/metricproviders/skywalking"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/argo-rollouts/metricproviders/cloudwatch"
"github.com/argoproj/argo-rollouts/metricproviders/datadog"
@@ -26,6 +29,12 @@ import (
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
)

const (
InclusterKubeconfig = "in-cluster"
AnalysisJobKubeconfigEnv = "ARGO_ROLLOUTS_ANALYSIS_JOB_KUBECONFIG"
AnalysisJobNamespaceEnv = "ARGO_ROLLOUTS_ANALYSIS_JOB_NAMESPACE"
)

type ProviderFactory struct {
KubeClient kubernetes.Interface
JobLister batchlisters.JobLister
@@ -43,7 +52,12 @@ func (f *ProviderFactory) NewProvider(logCtx log.Entry, metric v1alpha1.Metric)
}
return prometheus.NewPrometheusProvider(api, logCtx, metric)
case job.ProviderType:
return job.NewJobProvider(logCtx, f.KubeClient, f.JobLister), nil
kubeClient, err := GetAnalysisJobClientset(f.KubeClient)
if err != nil {
return nil, err
}

return job.NewJobProvider(logCtx, kubeClient, f.JobLister, GetAnalysisJobNamespace()), nil
case kayenta.ProviderType:
c := kayenta.NewHttpClient()
return kayenta.NewKayentaProvider(logCtx, c), nil
@@ -135,3 +149,31 @@ func Type(metric v1alpha1.Metric) string {

return "Unknown Provider"
}

// GetAnalysisJobClientset returns the Kubernetes clientset used to run the analysis job metric.
// If AnalysisJobKubeconfigEnv is set to InclusterKubeconfig, it returns the in-cluster client;
// if it is set to a kubeconfig file path, it returns the clientset built from that file.
// If the variable is empty, it returns the provided defaultClientset.
func GetAnalysisJobClientset(defaultClientset kubernetes.Interface) (kubernetes.Interface, error) {
customJobKubeconfig := os.Getenv(AnalysisJobKubeconfigEnv)
if customJobKubeconfig != "" {
var (
cfg *rest.Config
err error
)
if customJobKubeconfig == InclusterKubeconfig {
cfg, err = rest.InClusterConfig()
} else {
cfg, err = clientcmd.BuildConfigFromFlags("", customJobKubeconfig)
}
if err != nil {
return nil, err
}
return kubernetes.NewForConfig(cfg)
}
return defaultClientset, nil
}

func GetAnalysisJobNamespace() string {
return os.Getenv(AnalysisJobNamespaceEnv)
}
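(Usage sketch, not part of the diff.) Together, ARGO_ROLLOUTS_ANALYSIS_JOB_KUBECONFIG and ARGO_ROLLOUTS_ANALYSIS_JOB_NAMESPACE choose where analysis Jobs are created and watched; leaving both unset keeps the previous behavior. The kubeconfig path and namespace below are invented example values, and a fake clientset stands in for the controller's real one:

```go
package main

import (
	"fmt"
	"os"

	"github.com/argoproj/argo-rollouts/metricproviders"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Default clientset the controller already holds (a fake stands in here).
	var defaultClientset kubernetes.Interface = fake.NewSimpleClientset()

	// Example values only: point analysis Jobs at another cluster and namespace.
	os.Setenv(metricproviders.AnalysisJobKubeconfigEnv, "/etc/argo-rollouts/analysis-kubeconfig") // or metricproviders.InclusterKubeconfig
	os.Setenv(metricproviders.AnalysisJobNamespaceEnv, "analysis-jobs")

	jobClientset, err := metricproviders.GetAnalysisJobClientset(defaultClientset)
	if err != nil {
		// With a nonexistent kubeconfig path this fails; unsetting the env var
		// falls back to defaultClientset instead.
		fmt.Println("failed to build job clientset:", err)
		return
	}
	_ = jobClientset

	fmt.Println("job namespace:", metricproviders.GetAnalysisJobNamespace()) // analysis-jobs
}
```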
7 changes: 6 additions & 1 deletion pkg/kubectl-argo-rollouts/info/analysisrun_info.go
@@ -70,9 +70,14 @@
for _, measurement := range analysisutil.ArrayMeasurement(run, mr.Name) {
if measurement.Metadata != nil {
if jobName, ok := measurement.Metadata[job.JobNameKey]; ok {
ns := run.Namespace
if jobNamespace, ok := measurement.Metadata[job.JobNamespaceKey]; ok {
ns = jobNamespace
}
jobInfo := rollout.JobInfo{
ObjectMeta: &v1.ObjectMeta{
Name: jobName,
Name: jobName,
Namespace: ns,
},
Icon: analysisIcon(measurement.Phase),
Status: string(measurement.Phase),
7 changes: 6 additions & 1 deletion utils/controller/controller.go
@@ -222,7 +222,7 @@
// It then enqueues that ownerType resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
// This function assumes parent object is in the same namespace as the child
func EnqueueParentObject(obj any, ownerType string, enqueue func(obj any)) {
func EnqueueParentObject(obj any, ownerType string, enqueue func(obj any), parentNamespaceGetter ...func(any) string) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
@@ -245,6 +245,11 @@
return
}
namespace := object.GetNamespace()
if len(parentNamespaceGetter) > 0 {
// If the parentNamespaceGetter is provided, use it to get the parent namespace
// only uses the first parentNamespaceGetter func
namespace = parentNamespaceGetter[0](obj)
}
parent := cache.ExplicitKey(namespace + "/" + ownerRef.Name)
log.Infof("Enqueueing parent of %s/%s: %s %s", namespace, object.GetName(), ownerRef.Kind, parent)
enqueue(parent)
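(Sketch under stated assumptions.) Making the namespace getter a variadic parameter keeps every existing EnqueueParentObject call compiling unchanged while letting the analysis controller substitute the parent AnalysisRun's namespace. The toy restatement below is not the controller's actual implementation; the owner name is elided:

```go
// Toy restatement: an optional variadic namespace getter preserves existing call
// sites while allowing an override for the parent key's namespace.
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func enqueueParent(obj any, enqueue func(key string), parentNamespaceGetter ...func(any) string) {
	object, ok := obj.(metav1.Object)
	if !ok {
		return
	}
	namespace := object.GetNamespace()
	if len(parentNamespaceGetter) > 0 {
		// Only the first getter is consulted, matching the change above.
		namespace = parentNamespaceGetter[0](obj)
	}
	enqueue(namespace + "/parent-name") // owner name elided in this sketch
}

func main() {
	job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{
		Namespace:   "analysis-jobs",
		Annotations: map[string]string{"analysisrun.argoproj.io/namespace": "rollouts"},
	}}

	printKey := func(key string) { fmt.Println(key) }

	// Existing callers, unchanged: the key uses the child's namespace.
	enqueueParent(job, printKey) // analysis-jobs/parent-name

	// Analysis-controller style: the key uses the annotated parent namespace.
	enqueueParent(job, printKey, func(o any) string {
		return o.(*batchv1.Job).Annotations["analysisrun.argoproj.io/namespace"]
	}) // rollouts/parent-name
}
```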