Cleanup: remove potential goroutine leakages in metrics recording
When we record metrics in a forked goroutine, we should inherit the
context from upstream to prevent potential goroutine leaks.

Signed-off-by: kerthcet <[email protected]>
kerthcet authored and tekton-robot committed Jun 6, 2022
1 parent e2e2330 commit c671899
Showing 3 changed files with 24 additions and 24 deletions.
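
Before the per-file diffs, here is a minimal, self-contained sketch of the pattern this commit applies. It is generic Go using OpenCensus, not Tekton code, and the names statusKey and runCount are illustrative: the forked goroutine reuses the caller's ctx, and the tag map is derived from that ctx instead of from context.Background().

// Sketch only: generic OpenCensus recording that inherits the caller's context.
package main

import (
	"context"
	"log"
	"sync"

	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
)

var (
	// Illustrative tag key and measure; not the Tekton metric names.
	statusKey = tag.MustNewKey("status")
	runCount  = stats.Int64("run_count", "number of runs recorded", stats.UnitDimensionless)
)

// recordCount derives its tag context from the ctx handed down by the caller,
// rather than starting over from context.Background().
func recordCount(ctx context.Context, status string) error {
	ctx, err := tag.New(ctx, tag.Insert(statusKey, status))
	if err != nil {
		return err
	}
	stats.Record(ctx, runCount.M(1))
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(1)
	// The forked goroutine inherits ctx instead of building its own context.
	go func() {
		defer wg.Done()
		if err := recordCount(ctx, "success"); err != nil {
			log.Printf("Failed to log the metrics : %v", err)
		}
	}()
	wg.Wait()
}
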
pkg/reconciler/taskrun/taskrun.go (3 additions, 3 deletions)

@@ -197,10 +197,10 @@ func (c *Reconciler) durationAndCountMetrics(ctx context.Context, tr *v1beta1.Ta
 	}
 	before := newTr.Status.GetCondition(apis.ConditionSucceeded)
 	go func(metrics *taskrunmetrics.Recorder) {
-		if err := metrics.DurationAndCount(tr, before); err != nil {
+		if err := metrics.DurationAndCount(ctx, tr, before); err != nil {
 			logger.Warnf("Failed to log the metrics : %v", err)
 		}
-		if err := metrics.CloudEvents(tr); err != nil {
+		if err := metrics.CloudEvents(ctx, tr); err != nil {
 			logger.Warnf("Failed to log the metrics : %v", err)
 		}
 	}(c.metrics)

@@ -460,7 +460,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
 	if err := podconvert.UpdateReady(ctx, c.KubeClientSet, *pod); err != nil {
 		return err
 	}
-	if err := c.metrics.RecordPodLatency(pod, tr); err != nil {
+	if err := c.metrics.RecordPodLatency(ctx, pod, tr); err != nil {
 		logger.Warnf("Failed to log the metrics : %v", err)
 	}
 }
pkg/taskrunmetrics/metrics.go (11 additions, 13 deletions)

@@ -279,7 +279,7 @@ func nilInsertTag(task, taskrun string) []tag.Mutator {
 // DurationAndCount logs the duration of TaskRun execution and
 // count for number of TaskRuns succeed or failed
 // returns an error if its failed to log the metrics
-func (r *Recorder) DurationAndCount(tr *v1beta1.TaskRun, beforeCondition *apis.Condition) error {
+func (r *Recorder) DurationAndCount(ctx context.Context, tr *v1beta1.TaskRun, beforeCondition *apis.Condition) error {

 	if !r.initialized {
 		return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", tr.Name)

@@ -310,7 +310,7 @@ func (r *Recorder) DurationAndCount(tr *v1beta1.TaskRun, beforeCondition *apis.C

 	if ok, pipeline, pipelinerun := tr.IsPartOfPipeline(); ok {
 		ctx, err := tag.New(
-			context.Background(),
+			ctx,
 			append([]tag.Mutator{tag.Insert(namespaceTag, tr.Namespace),
 				tag.Insert(statusTag, status)},
 				append(r.insertPipelineTag(pipeline, pipelinerun),

@@ -326,7 +326,7 @@ func (r *Recorder) DurationAndCount(tr *v1beta1.TaskRun, beforeCondition *apis.C
 	}

 	ctx, err := tag.New(
-		context.Background(),
+		ctx,
 		append([]tag.Mutator{tag.Insert(namespaceTag, tr.Namespace),
 			tag.Insert(statusTag, status)},
 			r.insertTaskTag(taskName, tr.Name)...)...)

@@ -342,7 +342,7 @@ func (r *Recorder) DurationAndCount(tr *v1beta1.TaskRun, beforeCondition *apis.C

 // RunningTaskRuns logs the number of TaskRuns running right now
 // returns an error if its failed to log the metrics
-func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error {
+func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLister) error {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()

@@ -362,9 +362,7 @@ func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error {
 		}
 	}

-	ctx, err := tag.New(
-		context.Background(),
-	)
+	ctx, err = tag.New(ctx)
 	if err != nil {
 		return err
 	}

@@ -385,7 +383,7 @@ func (r *Recorder) ReportRunningTaskRuns(ctx context.Context, lister listers.Tas

 		case <-time.After(r.ReportingPeriod):
 			// Every 30s surface a metric for the number of running tasks.
-			if err := r.RunningTaskRuns(lister); err != nil {
+			if err := r.RunningTaskRuns(ctx, lister); err != nil {
 				logger.Warnf("Failed to log the metrics : %v", err)
 			}
 		}

@@ -394,7 +392,7 @@ func (r *Recorder) ReportRunningTaskRuns(ctx context.Context, lister listers.Tas

 // RecordPodLatency logs the duration required to schedule the pod for TaskRun
 // returns an error if its failed to log the metrics
-func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1beta1.TaskRun) error {
+func (r *Recorder) RecordPodLatency(ctx context.Context, pod *corev1.Pod, tr *v1beta1.TaskRun) error {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()

@@ -414,7 +412,7 @@ func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1beta1.TaskRun) error
 	}

 	ctx, err := tag.New(
-		context.Background(),
+		ctx,
 		append([]tag.Mutator{tag.Insert(namespaceTag, tr.Namespace),
 			tag.Insert(podTag, pod.Name)},
 			r.insertTaskTag(taskName, tr.Name)...)...)

@@ -429,7 +427,7 @@ func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1beta1.TaskRun) error

 // CloudEvents logs the number of cloud events sent for TaskRun
 // returns an error if it fails to log the metrics
-func (r *Recorder) CloudEvents(tr *v1beta1.TaskRun) error {
+func (r *Recorder) CloudEvents(ctx context.Context, tr *v1beta1.TaskRun) error {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()

@@ -449,7 +447,7 @@ func (r *Recorder) CloudEvents(tr *v1beta1.TaskRun) error {

 	if ok, pipeline, pipelinerun := tr.IsPartOfPipeline(); ok {
 		ctx, err := tag.New(
-			context.Background(),
+			ctx,
 			append([]tag.Mutator{tag.Insert(namespaceTag, tr.Namespace),
 				tag.Insert(statusTag, status)},
 				append(r.insertPipelineTag(pipeline, pipelinerun),

@@ -462,7 +460,7 @@ func (r *Recorder) CloudEvents(tr *v1beta1.TaskRun) error
 	}

 	ctx, err := tag.New(
-		context.Background(),
+		ctx,
 		append([]tag.Mutator{tag.Insert(namespaceTag, tr.Namespace),
 			tag.Insert(statusTag, status)},
 			r.insertTaskTag(taskName, tr.Name)...)...)
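
The ReportRunningTaskRuns hunk above now hands the inherited ctx to RunningTaskRuns. As a rough sketch of why that context matters for goroutine lifetime (simplified and assumed, not the exact Tekton implementation): a periodic reporting loop should also select on ctx.Done(), so it stops when its caller shuts down instead of ticking forever.

// Sketch only: a periodic reporter tied to the caller's context.
package main

import (
	"context"
	"log"
	"time"
)

func reportPeriodically(ctx context.Context, period time.Duration, report func(context.Context) error) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Caller cancelled or shut down: stop instead of leaking the goroutine.
			return
		case <-ticker.C:
			if err := report(ctx); err != nil {
				log.Printf("Failed to log the metrics : %v", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	go reportPeriodically(ctx, 500*time.Millisecond, func(context.Context) error {
		log.Println("reporting running TaskRuns")
		return nil
	})
	<-ctx.Done() // once the context expires, the reporting goroutine exits too
}
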
pkg/taskrunmetrics/metrics_test.go (10 additions, 8 deletions)

@@ -64,16 +64,18 @@ func TestUninitializedMetrics(t *testing.T) {
 		Status: corev1.ConditionUnknown,
 	}

-	if err := metrics.DurationAndCount(&v1beta1.TaskRun{}, beforeCondition); err == nil {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	if err := metrics.DurationAndCount(ctx, &v1beta1.TaskRun{}, beforeCondition); err == nil {
 		t.Error("DurationCount recording expected to return error but got nil")
 	}
-	if err := metrics.RunningTaskRuns(nil); err == nil {
+	if err := metrics.RunningTaskRuns(ctx, nil); err == nil {
 		t.Error("Current TaskRunsCount recording expected to return error but got nil")
 	}
-	if err := metrics.RecordPodLatency(nil, nil); err == nil {
+	if err := metrics.RecordPodLatency(ctx, nil, nil); err == nil {
 		t.Error("Pod Latency recording expected to return error but got nil")
 	}
-	if err := metrics.CloudEvents(&v1beta1.TaskRun{}); err == nil {
+	if err := metrics.CloudEvents(ctx, &v1beta1.TaskRun{}); err == nil {
 		t.Error("Cloud Events recording expected to return error but got nil")
 	}
 }

@@ -361,7 +363,7 @@ func TestRecordTaskRunDurationCount(t *testing.T) {
 			t.Fatalf("NewRecorder: %v", err)
 		}

-		if err := metrics.DurationAndCount(c.taskRun, c.beforeCondition); err != nil {
+		if err := metrics.DurationAndCount(ctx, c.taskRun, c.beforeCondition); err != nil {
 			t.Errorf("DurationAndCount: %v", err)
 		}
 		if c.expectedCountTags != nil {

@@ -414,7 +416,7 @@ func TestRecordRunningTaskRunsCount(t *testing.T) {
 		t.Fatalf("NewRecorder: %v", err)
 	}

-	if err := metrics.RunningTaskRuns(informer.Lister()); err != nil {
+	if err := metrics.RunningTaskRuns(ctx, informer.Lister()); err != nil {
 		t.Errorf("RunningTaskRuns: %v", err)
 	}
 	metricstest.CheckLastValueData(t, "running_taskruns_count", map[string]string{}, 1)

@@ -478,7 +480,7 @@ func TestRecordPodLatency(t *testing.T) {
 			t.Fatalf("NewRecorder: %v", err)
 		}

-		if err := metrics.RecordPodLatency(td.pod, taskRun); td.expectingError && err == nil {
+		if err := metrics.RecordPodLatency(ctx, td.pod, taskRun); td.expectingError && err == nil {
 			t.Error("RecordPodLatency wanted error, got nil")
 		} else if !td.expectingError {
 			if err != nil {

@@ -635,7 +637,7 @@ func TestRecordCloudEvents(t *testing.T) {
 			t.Fatalf("NewRecorder: %v", err)
 		}

-		if err := metrics.CloudEvents(c.taskRun); err != nil {
+		if err := metrics.CloudEvents(ctx, c.taskRun); err != nil {
 			t.Fatalf("CloudEvents: %v", err)
 		}
 		metricstest.CheckSumData(t, "cloudevent_count", c.expectedTags, c.expectedCount)
