fix: message count read in forwarder (#1030)
Signed-off-by: veds-g <[email protected]>
veds-g authored and whynowy committed Sep 14, 2023
1 parent 01d9abe · commit e36b3c6
Showing 13 changed files with 70 additions and 39 deletions.
2 changes: 1 addition & 1 deletion docs/operations/example-dashboard-template.json
@@ -125,7 +125,7 @@
"uid": "eF7wTnc4k"
},
"editorMode": "builder",
"expr": "rate(forwarder_read_total{namespace=\"$namespace\", pipeline=\"$pipeline\", vertex=\"$vertex\"}[$__rate_interval])",
"expr": "rate(forwarder_data_read{namespace=\"$namespace\", pipeline=\"$pipeline\", vertex=\"$vertex\"}[$__rate_interval])",
"legendFormat": "__auto",
"range": true,
"refId": "A"
4 changes: 2 additions & 2 deletions docs/operations/metrics/metrics.md
@@ -21,14 +21,14 @@ These metrics can be used to determine throughput of your pipeline.
| `source_forwarder_ack_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages acknowledged by a given Source Vertex |
| `source_forwarder_drop_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages dropped by a given Source Vertex due to a full Inter-Step Buffer Partition |
| `source_forwarder_drop_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes dropped by a given Source Vertex due to a full Inter-Step Buffer Partition |
- | `forwarder_read_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Vertex from an Inter-Step Buffer Partition |
+ | `forwarder_data_read` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Vertex from an Inter-Step Buffer Partition |
| `forwarder_read_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes read by a given Vertex from an Inter-Step Buffer Partition |
| `forwarder_write_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages written to Inter-Step Buffer by a given Vertex |
| `forwarder_write_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes written to Inter-Step Buffer by a given Vertex |
| `forwarder_ack_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages acknowledged by a given Vertex from an Inter-Step Buffer Partition |
| `forwarder_drop_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages dropped by a given Vertex due to a full Inter-Step Buffer Partition |
| `forwarder_drop_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes dropped by a given Vertex due to a full Inter-Step Buffer Partition |
- | `reduce_isb_reader_read_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Reduce Vertex from an Inter-Step Buffer Partition |
+ | `reduce_isb_reader_data_read` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Reduce Vertex from an Inter-Step Buffer Partition |
| `reduce_isb_reader_read_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of bytes read by a given Reduce Vertex from an Inter-Step Buffer Partition |
| `reduce_isb_writer_write_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages written to Inter-Step Buffer by a given Reduce Vertex |
| `reduce_isb_writer_write_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of bytes written to Inter-Step Buffer by a given Reduce Vertex |
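For reference, a minimal throughput query using the renamed counter, in the same form as the dashboard panel above; `my-ns`, `my-pipeline`, and `my-vertex` are placeholder label values, not values from this commit:

```promql
# Per-partition processing rate of a vertex, based on the renamed data-read counter
sum by (partition_name) (
  rate(forwarder_data_read{namespace="my-ns", pipeline="my-pipeline", vertex="my-vertex"}[5m])
)
```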
7 changes: 4 additions & 3 deletions pkg/daemon/server/service/rater/rater.go
@@ -218,14 +218,14 @@ func sleep(ctx context.Context, duration time.Duration) {
// since a pod can read from multiple partitions, we will return a map of partition to read count.
func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount {
metricNames := map[string]string{
- keyVertexTypeReduce: "reduce_isb_reader_read_total",
+ keyVertexTypeReduce: "reduce_isb_reader_data_read",
keyVertexTypeSource: "source_forwarder_read_total",
- keyVertexTypeSink: "sink_forwarder_read_total",
+ keyVertexTypeSink: "sink_forwarder_data_read",
}

readTotalMetricName, ok := metricNames[vertexType]
if !ok {
readTotalMetricName = "forwarder_read_total"
readTotalMetricName = "forwarder_data_read"
}

// scrape the read total metric from pod metric port
@@ -243,6 +243,7 @@ func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount {
r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
return nil
}

if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
partitionReadCount := make(map[string]float64)
14 changes: 7 additions & 7 deletions pkg/daemon/server/service/rater/rater_test.go
@@ -46,21 +46,21 @@ func (m *raterMockHttpClient) Get(url string) (*http.Response, error) {
resp := &http.Response{
StatusCode: 200,
// the test uses an abstract vertex without specifying vertex type, meaning it's neither source nor reduce,
- // hence the default forwarder metric name "forwarder_read_total" is used to retrieve the metric
+ // hence the default forwarder metric name "forwarder_data_read" is used to retrieve the metric
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
- # HELP forwarder_read_total Total number of Messages Read
- # TYPE forwarder_read_total counter
- forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} %d
+ # HELP forwarder_data_read Total number of Messages Read
+ # TYPE forwarder_data_read counter
+ forwarder_data_read{buffer="input",pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} %d
`, m.podOneCount))))}
return resp, nil
} else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" {
m.podTwoCount = m.podTwoCount + 60
resp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
- # HELP forwarder_read_total Total number of Messages Read
- # TYPE forwarder_read_total counter
- forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input", partition_name="p-v-1"} %d
+ # HELP forwarder_data_read Total number of Messages Read
+ # TYPE forwarder_data_read counter
+ forwarder_data_read{buffer="input",pipeline="simple-pipeline",vertex="input", partition_name="p-v-1"} %d
`, m.podTwoCount))))}
return resp, nil
} else {
3 changes: 2 additions & 1 deletion pkg/forward/forward.go
@@ -199,7 +199,6 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
isdf.opts.logger.Warnw("failed to read fromBufferPartition", zap.Error(err))
readMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
}
- readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// process only if we have any read messages. There is a natural looping here if there is an internal error while
// reading, and we are not able to proceed.
@@ -242,6 +241,8 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
dataMessages = append(dataMessages, m)
}
}
+ readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(dataMessages)))
+ totalMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// fetch watermark if available
// TODO: make it async (concurrent and wait later)
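After this change, `readMessagesCount` (exported as `forwarder_data_read`) counts only the data messages in a read batch, while the new `totalMessagesCount` (exported as `forwarder_total_read`) counts everything returned by the read. A rough sketch of how the two could be compared in PromQL; the label values are placeholders, and reading the difference as control-message traffic is an inference from this diff rather than documented behaviour:

```promql
# Approximate rate of non-data (control) messages read by a vertex:
# everything read, minus the data messages counted by forwarder_data_read
  rate(forwarder_total_read{pipeline="my-pipeline", vertex="my-vertex"}[5m])
- rate(forwarder_data_read{pipeline="my-pipeline", vertex="my-vertex"}[5m])
```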
8 changes: 4 additions & 4 deletions pkg/forward/forward_test.go
@@ -1585,14 +1585,14 @@ func (f myForwardApplyUDFErrTest) ApplyMapStream(_ context.Context, _ *isb.ReadM

func validateMetrics(t *testing.T, batchSize int64) {
metadata := `
- # HELP forwarder_read_total Total number of Messages Read
- # TYPE forwarder_read_total counter
+ # HELP forwarder_data_read Total number of Data Messages Read
+ # TYPE forwarder_data_read counter
`
expected := `
- forwarder_read_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
+ forwarder_data_read{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
`

- err := testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "forwarder_read_total")
+ err := testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "forwarder_data_read")
if err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
13 changes: 10 additions & 3 deletions pkg/forward/metrics.go
@@ -23,13 +23,20 @@ import (
"github.com/numaproj/numaflow/pkg/metrics"
)

- // readMessagesCount is used to indicate the number of messages read
- var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+ // totalMessagesCount is used to indicate the number of total messages read
+ var totalMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "forwarder",
Name: "read_total",
Name: "total_read",
Help: "Total number of Messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

+ // readMessagesCount is used to indicate the number of data messages read
+ var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+ Subsystem: "forwarder",
+ Name: "data_read",
+ Help: "Total number of Data Messages Read",
+ }, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

// readBytesCount is to indicate the number of bytes read
var readBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "forwarder",
19 changes: 13 additions & 6 deletions pkg/reduce/data_forward.go
@@ -243,12 +243,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
}
return
}
- readMessagesCount.With(map[string]string{
- metrics.LabelVertex: df.vertexName,
- metrics.LabelPipeline: df.pipelineName,
- metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
- metrics.LabelPartitionName: df.fromBufferPartition.GetName(),
- }).Add(float64(len(readMessages)))

// fetch watermark using the first element's watermark, because we assign the watermark to all other
// elements in the batch based on the watermark we fetch from 0th offset.
// get the watermark for the partition from which we read the messages
@@ -322,6 +317,18 @@ func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
ctrlMessages = append(ctrlMessages, message)
}
}
+ readMessagesCount.With(map[string]string{
+ metrics.LabelVertex: df.vertexName,
+ metrics.LabelPipeline: df.pipelineName,
+ metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
+ metrics.LabelPartitionName: df.fromBufferPartition.GetName(),
+ }).Add(float64(len(dataMessages)))
+ totalMessagesCount.With(map[string]string{
+ metrics.LabelVertex: df.vertexName,
+ metrics.LabelPipeline: df.pipelineName,
+ metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
+ metrics.LabelPartitionName: df.fromBufferPartition.GetName(),
+ }).Add(float64(len(messages)))

// write messages to windows based by PBQs.
successfullyWrittenMessages, err := df.writeMessagesToWindows(ctx, dataMessages)
13 changes: 10 additions & 3 deletions pkg/reduce/metrics.go
@@ -70,13 +70,20 @@ var ackMessageError = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Total number of Acknowledged Errors",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelVertexReplicaIndex, metrics.LabelPartitionName})

- // readMessagesCount is used to indicate the number of messages read
- var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+ // totalMessagesCount is used to indicate the number of total messages read
+ var totalMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "reduce_isb_reader",
Name: "read_total",
Name: "total_read",
Help: "Total number of Messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelVertexReplicaIndex, metrics.LabelPartitionName})

+ // readMessagesCount is used to indicate the number of data messages read
+ var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+ Subsystem: "reduce_isb_reader",
+ Name: "data_read",
+ Help: "Total number of Data Messages Read",
+ }, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelVertexReplicaIndex, metrics.LabelPartitionName})

// readBytesCount is to indicate the number of bytes read
var readBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "reduce_isb_reader",
3 changes: 2 additions & 1 deletion pkg/sinks/forward/forward.go
@@ -173,7 +173,6 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
df.opts.logger.Warnw("failed to read fromBufferPartition", zap.Error(err))
readMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Inc()
}
- readMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// process only if we have any read messages. There is a natural looping here if there is an internal error while
// reading, and we are not able to proceed.
@@ -210,6 +209,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
dataMessages = append(dataMessages, m)
}
}
+ readMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(dataMessages)))
+ totalMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// fetch watermark if available
// TODO: make it async (concurrent and wait later)
8 changes: 4 additions & 4 deletions pkg/sinks/forward/forward_test.go
@@ -348,14 +348,14 @@ func TestWriteToBuffer(t *testing.T) {

func validateMetrics(batchSize int64) (err error) {
metadata := `
- # HELP sink_forwarder_read_total Total number of Messages Read
- # TYPE sink_forwarder_read_total counter
+ # HELP sink_forwarder_data_read Total number of Data Messages Read
+ # TYPE sink_forwarder_data_read counter
`
expected := `
- sink_forwarder_read_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
+ sink_forwarder_data_read{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
`

- err = testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "sink_forwarder_read_total")
+ err = testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "sink_forwarder_data_read")
if err != nil {
return err
}
13 changes: 10 additions & 3 deletions pkg/sinks/forward/metrics.go
@@ -23,13 +23,20 @@ import (
"github.com/numaproj/numaflow/pkg/metrics"
)

- // readMessagesCount is used to indicate the number of messages read
- var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+ // totalMessagesCount is used to indicate the number of total messages read
+ var totalMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sink_forwarder",
Name: "read_total",
Name: "total_read",
Help: "Total number of Messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

+ // readMessagesCount is used to indicate the number of data messages read
+ var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+ Subsystem: "sink_forwarder",
+ Name: "data_read",
+ Help: "Total number of Data Messages Read",
+ }, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

// readBytesCount is to indicate the number of bytes read
var readBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sink_forwarder",
2 changes: 1 addition & 1 deletion pkg/sources/forward/data_forward.go
@@ -200,13 +200,13 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
isdf.opts.logger.Warnw("failed to read from source", zap.Error(err))
readMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.reader.GetName()}).Inc()
}
- readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.reader.GetName()}).Add(float64(len(readMessages)))

// Process only if we have any read messages.
// There is a natural looping here if there is an internal error while reading, and we are not able to proceed.
if len(readMessages) == 0 {
return
}
+ readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.reader.GetName()}).Add(float64(len(readMessages)))

// store the offsets of the messages we read from source
var readOffsets = make([]isb.Offset, len(readMessages))
