Skip to content

Commit

Permalink
wip: source metrics [cleanup]
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Nov 6, 2024
1 parent 9c1d3ce commit 16db48b
Show file tree
Hide file tree
Showing 12 changed files with 2 additions and 156 deletions.
2 changes: 1 addition & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ coverage:
project:
default:
# allow test coverage to drop by 2%, assume that it's typically due to CI problems
threshold: 2
threshold: "2"
3 changes: 1 addition & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const (
LabelVertexType = "vertex_type"
LabelPartitionName = "partition_name"
LabelMonoVertexName = "mvtx_name"

LabelReason = "reason"
LabelReason = "reason"
)

// Generic forwarder metrics
Expand Down
38 changes: 0 additions & 38 deletions pkg/sources/generator/metrics.go

This file was deleted.

3 changes: 0 additions & 3 deletions pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/sources/sourcer"
)
Expand Down Expand Up @@ -199,7 +198,6 @@ loop:
// we implement Read With Wait semantics
select {
case r := <-mg.srcChan:
tickgenSourceReadCount.With(map[string]string{metrics.LabelVertex: mg.vertexName, metrics.LabelPipeline: mg.pipelineName}).Inc()
msgs = append(msgs, mg.newReadMessage(r.key, r.data, r.offset, r.ts))
case <-timeout:
break loop
Expand Down Expand Up @@ -240,7 +238,6 @@ func (mg *memGen) newWorker(ctx context.Context, rate int) func(chan time.Time,
case <-ctx.Done():
return
case ts := <-tickChan:
tickgenSourceCount.With(map[string]string{metrics.LabelVertex: mg.vertexName, metrics.LabelPipeline: mg.pipelineName}).Inc()
// we would generate all the keys in a round robin fashion
// even if there are multiple pods, all the pods will generate same keys in the same order.
// TODO: alternatively, we could also think about generating a subset of keys per pod.
Expand Down
2 changes: 0 additions & 2 deletions pkg/sources/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedtls "github.com/numaproj/numaflow/pkg/shared/tls"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
Expand Down Expand Up @@ -211,7 +210,6 @@ loop:
for i := int64(0); i < count; i++ {
select {
case m := <-h.messages:
httpSourceReadCount.With(map[string]string{metrics.LabelVertex: h.vertexName, metrics.LabelPipeline: h.pipelineName}).Inc()
msgs = append(msgs, m)
case <-timeout:
h.logger.Debugw("Timed out waiting for messages to read.", zap.Duration("waited", h.readTimeout), zap.Int("read", len(msgs)))
Expand Down
31 changes: 0 additions & 31 deletions pkg/sources/http/metrics.go

This file was deleted.

2 changes: 0 additions & 2 deletions pkg/sources/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/sources/sourcer"
Expand Down Expand Up @@ -401,7 +400,6 @@ loop:
for i := int64(0); i < count; i++ {
select {
case m := <-ns.messages:
jetstreamSourceReadCount.With(map[string]string{metrics.LabelVertex: ns.vertexName, metrics.LabelPipeline: ns.pipelineName}).Inc()
msgs = append(msgs, m)
case <-timeout:
ns.logger.Debugw("Timed out waiting for messages to read.", zap.Duration("waited", ns.readTimeout), zap.Int("read", len(msgs)))
Expand Down
31 changes: 0 additions & 31 deletions pkg/sources/jetstream/metrics.go

This file was deleted.

7 changes: 0 additions & 7 deletions pkg/sources/kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ import (
"github.com/numaproj/numaflow/pkg/metrics"
)

// kafkaSourceReadCount is used to indicate the number of messages read
var kafkaSourceReadCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "kafka_source",
Name: "read_total",
Help: "Total number of messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

// kafkaSourceAckCount is used to indicate the number of messages Acknowledged
var kafkaSourceAckCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "kafka_source",
Expand Down
6 changes: 0 additions & 6 deletions pkg/sources/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -159,11 +158,6 @@ loop:
for i := int64(0); i < count; i++ {
select {
case m := <-ks.handler.messages:
kafkaSourceReadCount.With(map[string]string{
metrics.LabelVertex: ks.vertexName,
metrics.LabelPipeline: ks.pipelineName,
metrics.LabelPartitionName: strconv.Itoa(int(m.Partition)),
}).Inc()
msgs = append(msgs, ks.toReadMessage(m))
case <-timeout:
// log that timeout has happened and don't return an error
Expand Down
31 changes: 0 additions & 31 deletions pkg/sources/nats/metrics.go

This file was deleted.

2 changes: 0 additions & 2 deletions pkg/sources/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/sources/sourcer"
Expand Down Expand Up @@ -192,7 +191,6 @@ loop:
for i := int64(0); i < count; i++ {
select {
case m := <-ns.messages:
natsSourceReadCount.With(map[string]string{metrics.LabelVertex: ns.vertexName, metrics.LabelPipeline: ns.pipelineName}).Inc()
msgs = append(msgs, m)
case <-timeout:
ns.logger.Debugw("Timed out waiting for messages to read.", zap.Duration("waited", ns.readTimeout), zap.Int("read", len(msgs)))
Expand Down

0 comments on commit 16db48b

Please sign in to comment.