Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: job metrics owner ref when using custom job kubeconfig/ns #3425

Merged
merged 7 commits into from
Mar 15, 2024
Merged
29 changes: 25 additions & 4 deletions analysis/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

"github.com/argoproj/argo-rollouts/metric"
jobProvider "github.com/argoproj/argo-rollouts/metricproviders/job"
"github.com/aws/smithy-go/ptr"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

unstructuredutil "github.com/argoproj/argo-rollouts/utils/unstructured"

Expand All @@ -32,6 +35,10 @@
timeutil "github.com/argoproj/argo-rollouts/utils/time"
)

var (
analysisRunGVK = v1alpha1.SchemeGroupVersion.WithKind("AnalysisRun")
)

// Controller is the controller implementation for Analysis resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
Expand Down Expand Up @@ -187,18 +194,32 @@
return c.persistAnalysisRunStatus(run, newRun.Status)
}

func (c *Controller) jobParentNamespace(obj any) string {
func (c *Controller) jobParentReference(obj any) (*v1.OwnerReference, string) {

Check warning on line 197 in analysis/controller.go

View check run for this annotation

Codecov / codecov/patch

analysis/controller.go#L197

Added line #L197 was not covered by tests
job, ok := obj.(*batchv1.Job)
if !ok {
return ""
return nil, ""

Check warning on line 200 in analysis/controller.go

View check run for this annotation

Codecov / codecov/patch

analysis/controller.go#L200

Added line #L200 was not covered by tests
}
// if it has owner reference, return it as is
ownerRef := v1.GetControllerOf(job)

Check warning on line 203 in analysis/controller.go

View check run for this annotation

Codecov / codecov/patch

analysis/controller.go#L203

Added line #L203 was not covered by tests
// else if it's missing owner reference check if analysis run uid is set and
// if it is there use labels/annotations to create owner reference
if ownerRef == nil && job.Labels[jobProvider.AnalysisRunUIDLabelKey] != "" {
ownerRef = &v1.OwnerReference{
APIVersion: analysisRunGVK.GroupVersion().String(),
Kind: analysisRunGVK.Kind,
Name: job.Annotations[jobProvider.AnalysisRunNameAnnotationKey],
UID: types.UID(job.Labels[jobProvider.AnalysisRunUIDLabelKey]),
BlockOwnerDeletion: ptr.Bool(true),
Controller: ptr.Bool(true),

Check warning on line 213 in analysis/controller.go

View check run for this annotation

Codecov / codecov/patch

analysis/controller.go#L206-L213

Added lines #L206 - L213 were not covered by tests
}
}
ns := job.GetNamespace()
if job.Annotations != nil {
if job.Annotations[jobProvider.AnalysisRunNamespaceAnnotationKey] != "" {
ns = job.Annotations[jobProvider.AnalysisRunNamespaceAnnotationKey]
}
}
return ns
return ownerRef, ns

Check warning on line 222 in analysis/controller.go

View check run for this annotation

Codecov / codecov/patch

analysis/controller.go#L222

Added line #L222 was not covered by tests
}

func (c *Controller) enqueueJobIfCompleted(obj any) {
Expand All @@ -209,7 +230,7 @@
for _, condition := range job.Status.Conditions {
switch condition.Type {
case batchv1.JobFailed, batchv1.JobComplete:
controllerutil.EnqueueParentObject(job, register.AnalysisRunKind, c.enqueueAnalysis, c.jobParentNamespace)
controllerutil.EnqueueParentObject(job, register.AnalysisRunKind, c.enqueueAnalysis, c.jobParentReference)

Check warning on line 233 in analysis/controller.go

View check run for this annotation

Codecov / codecov/patch

analysis/controller.go#L233

Added line #L233 was not covered by tests
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newCommand() *cobra.Command {
instanceIDTweakListFunc := func(options *metav1.ListOptions) {
options.LabelSelector = instanceIDSelector.String()
}
jobKubeClient, err := metricproviders.GetAnalysisJobClientset(kubeClient)
jobKubeClient, _, err := metricproviders.GetAnalysisJobClientset(kubeClient)
checkError(err)
jobNs := metricproviders.GetAnalysisJobNamespace()
if jobNs == "" {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.5
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.30.3
github.com/aws/smithy-go v1.20.1
github.com/blang/semver v3.5.1+incompatible
github.com/bombsimon/logrusr/v4 v4.1.0
github.com/evanphx/json-patch/v5 v5.9.0
Expand Down Expand Up @@ -88,7 +89,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
45 changes: 31 additions & 14 deletions metricproviders/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,20 @@
)

type JobProvider struct {
kubeclientset kubernetes.Interface
jobLister batchlisters.JobLister
logCtx log.Entry
jobNamespace string
kubeclientset kubernetes.Interface
jobLister batchlisters.JobLister
logCtx log.Entry
jobNamespace string
customJobKubeconfig bool
}

func NewJobProvider(logCtx log.Entry, kubeclientset kubernetes.Interface, jobLister batchlisters.JobLister, jobNS string) *JobProvider {
func NewJobProvider(logCtx log.Entry, kubeclientset kubernetes.Interface, jobLister batchlisters.JobLister, jobNS string, customJobKubeconfig bool) *JobProvider {
return &JobProvider{
kubeclientset: kubeclientset,
logCtx: logCtx,
jobLister: jobLister,
jobNamespace: jobNS,
kubeclientset: kubeclientset,
logCtx: logCtx,
jobLister: jobLister,
jobNamespace: jobNS,
customJobKubeconfig: customJobKubeconfig,
}
}

Expand Down Expand Up @@ -85,7 +87,7 @@
return int(res.Count + res.Error + 1)
}

func newMetricJob(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, jobNS string) (*batchv1.Job, error) {
func newMetricJob(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, jobNS string, customJobKubeconfig bool) (*batchv1.Job, error) {
ns := run.Namespace
if jobNS != "" {
ns = jobNS
Expand All @@ -102,11 +104,17 @@
jobAnnotations[AnalysisRunNameAnnotationKey] = run.Name
jobAnnotations[AnalysisRunNamespaceAnnotationKey] = run.Namespace
jobAnnotations[AnalysisRunMetricAnnotationKey] = metric.Name

ownerRef := []metav1.OwnerReference{*metav1.NewControllerRef(run, analysisRunGVK)}

if ns != run.Namespace || customJobKubeconfig {
ownerRef = nil

Check warning on line 111 in metricproviders/job/job.go

View check run for this annotation

Codecov / codecov/patch

metricproviders/job/job.go#L111

Added line #L111 was not covered by tests
}
job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: newJobName(run, metric),
Namespace: ns,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(run, analysisRunGVK)},
OwnerReferences: ownerRef,
Annotations: jobAnnotations,
Labels: jobLabels,
},
Expand All @@ -122,7 +130,7 @@
StartedAt: &now,
Phase: v1alpha1.AnalysisPhaseRunning,
}
job, err := newMetricJob(run, metric, p.jobNamespace)
job, err := newMetricJob(run, metric, p.jobNamespace, p.customJobKubeconfig)
if err != nil {
p.logCtx.Errorf("job initialization failed: %v", err)
return metricutil.MarkMeasurementError(measurement, err)
Expand All @@ -139,8 +147,17 @@
p.logCtx.Errorf("job create (verify) %s failed: %v", job.Name, createErr)
return metricutil.MarkMeasurementError(measurement, createErr)
}
controllerRef := metav1.GetControllerOf(existingJob)
if run.UID != controllerRef.UID {
ownerUID := ""
// if custom kubeconfig or different namespace is used owner ref is absent,
// use run uid label to get owner analysis run uid
if p.customJobKubeconfig || job.Namespace != run.Namespace {
ownerUID = job.Labels[AnalysisRunUIDLabelKey]

Check warning on line 154 in metricproviders/job/job.go

View check run for this annotation

Codecov / codecov/patch

metricproviders/job/job.go#L154

Added line #L154 was not covered by tests
} else {
controllerRef := metav1.GetControllerOf(existingJob)
ownerUID = string(controllerRef.UID)
}

if string(run.UID) != ownerUID {
// NOTE: we don't bother to check for semantic equality. UID is good enough
p.logCtx.Errorf("job create (uid check) %s failed: %v", job.Name, createErr)
return metricutil.MarkMeasurementError(measurement, createErr)
Expand Down
4 changes: 2 additions & 2 deletions metricproviders/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newTestJobProvider(objects ...runtime.Object) *JobProvider {
cancel()

jobLister := k8sI.Batch().V1().Jobs().Lister()
return NewJobProvider(*logCtx, kubeclient, jobLister, "")
return NewJobProvider(*logCtx, kubeclient, jobLister, "", false)
}

func newRunWithJobMetric() *v1alpha1.AnalysisRun {
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestRunCreateCollision(t *testing.T) {
p := newTestJobProvider()
run := newRunWithJobMetric()

existingJob, err := newMetricJob(run, run.Spec.Metrics[0], p.jobNamespace)
existingJob, err := newMetricJob(run, run.Spec.Metrics[0], p.jobNamespace, p.customJobKubeconfig)
assert.NoError(t, err)
fakeClient := p.kubeclientset.(*k8sfake.Clientset)
fakeClient.Tracker().Add(existingJob)
Expand Down
13 changes: 7 additions & 6 deletions metricproviders/metricproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func (f *ProviderFactory) NewProvider(logCtx log.Entry, metric v1alpha1.Metric)
}
return prometheus.NewPrometheusProvider(api, logCtx, metric)
case job.ProviderType:
kubeClient, err := GetAnalysisJobClientset(f.KubeClient)
kubeClient, customKubeconfig, err := GetAnalysisJobClientset(f.KubeClient)
if err != nil {
return nil, err
}

return job.NewJobProvider(logCtx, kubeClient, f.JobLister, GetAnalysisJobNamespace()), nil
return job.NewJobProvider(logCtx, kubeClient, f.JobLister, GetAnalysisJobNamespace(), customKubeconfig), nil
case kayenta.ProviderType:
c := kayenta.NewHttpClient()
return kayenta.NewKayentaProvider(logCtx, c), nil
Expand Down Expand Up @@ -154,7 +154,7 @@ func Type(metric v1alpha1.Metric) string {
// if the AnalysisJobKubeconfigEnv is set to InclusterKubeconfig, it will return the incluster client
// else if it's set to a kubeconfig file it will return the clientset corresponding to the kubeconfig file.
// If empty it returns the provided defaultClientset
func GetAnalysisJobClientset(defaultClientset kubernetes.Interface) (kubernetes.Interface, error) {
func GetAnalysisJobClientset(defaultClientset kubernetes.Interface) (kubernetes.Interface, bool, error) {
customJobKubeconfig := os.Getenv(AnalysisJobKubeconfigEnv)
if customJobKubeconfig != "" {
var (
Expand All @@ -167,11 +167,12 @@ func GetAnalysisJobClientset(defaultClientset kubernetes.Interface) (kubernetes.
cfg, err = clientcmd.BuildConfigFromFlags("", customJobKubeconfig)
}
if err != nil {
return nil, err
return nil, true, err
}
return kubernetes.NewForConfig(cfg)
clientSet, err := kubernetes.NewForConfig(cfg)
return clientSet, true, err
}
return defaultClientset, nil
return defaultClientset, false, nil
}

func GetAnalysisJobNamespace() string {
Expand Down
23 changes: 16 additions & 7 deletions utils/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@
// It then enqueues that ownerType resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
// This function assumes parent object is in the same namespace as the child
func EnqueueParentObject(obj any, ownerType string, enqueue func(obj any), parentNamespaceGetter ...func(any) string) {
func EnqueueParentObject(obj any, ownerType string, enqueue func(obj any), parentGetter ...func(any) (*metav1.OwnerReference, string)) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
Expand All @@ -239,16 +239,25 @@
log.Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}

if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
var (
ownerRef *metav1.OwnerReference
namespace string
)

if len(parentGetter) > 0 {
ownerRef, namespace = parentGetter[0](obj)

Check warning on line 248 in utils/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

utils/controller/controller.go#L248

Added line #L248 was not covered by tests
} else {
ownerRef = metav1.GetControllerOf(object)
}

if ownerRef != nil {
// If this object is not owned by the ownerType, we should not do anything more with it.
if ownerRef.Kind != ownerType {
return
}
namespace := object.GetNamespace()
if len(parentNamespaceGetter) > 0 {
// If the parentNamespaceGetter is provided, use it to get the parent namespace
// only uses the first parentNamespaceGetter func
namespace = parentNamespaceGetter[0](obj)
// if namespace not set by parentGetter use object namespace
if namespace == "" {
namespace = object.GetNamespace()
}
parent := cache.ExplicitKey(namespace + "/" + ownerRef.Name)
log.Infof("Enqueueing parent of %s/%s: %s %s", namespace, object.GetName(), ownerRef.Kind, parent)
Expand Down
Loading