feat: rater changes to track processing rate per partition (#805)
Signed-off-by: veds-g <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
veds-g and yhl25 authored Jun 28, 2023
1 parent 541ceb2 commit dd060cb
Showing 10 changed files with 479 additions and 439 deletions.
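
Before the per-file diffs, a minimal sketch of the rater contract implied by this commit: GetPodRates is dropped and GetRates gains a partitionName argument, so callers ask for the rate of one (vertex, partition) pair instead of a per-pod or whole-vertex aggregate. The interface name and package placement below are assumptions; the method signatures mirror the updated test mocks in this commit.

// Sketch only: interface name and package are assumed, signatures mirror the
// mocks changed in pkg/daemon/server/service/pipeline_metrics_query_test.go.
package rater

import "context"

type Ratable interface {
	Start(ctx context.Context) error
	// GetRates returns the processing rate of one partition of a vertex,
	// keyed by lookback window ("default", "1m", "5m", "15m").
	GetRates(vertexName string, partitionName string) map[string]float64
}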
125 changes: 67 additions & 58 deletions pkg/daemon/server/service/pipeline_metrics_query.go
@@ -21,11 +21,12 @@ import (
"context"
"crypto/tls"
"fmt"
"go.uber.org/zap"
"net/http"
"time"

"github.com/numaproj/numaflow/pkg/metrics"
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

@@ -34,7 +35,6 @@ import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
)
@@ -155,87 +155,97 @@ func (ps *pipelineMetadataQuery) GetBuffer(ctx context.Context, req *daemon.GetB
}

// GetVertexMetrics is used to query the metrics service and is used to obtain the processing rate of a given vertex for 1m, 5m and 15m.
// Response contains the metrics for each partition of the vertex.
// In the future maybe latency will also be added here?
// Should this method live here or maybe another file?
func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daemon.GetVertexMetricsRequest) (*daemon.GetVertexMetricsResponse, error) {
log := logging.FromContext(ctx)
resp := new(daemon.GetVertexMetricsResponse)

abstractVertex := ps.pipeline.GetVertex(req.GetVertex())
bufferList := abstractVertex.OwnedBufferNames(ps.pipeline.Namespace, ps.pipeline.Name)

// source vertex will have a single partition, which is the vertex name itself
if abstractVertex.IsASource() {
bufferList = append(bufferList, req.GetVertex())
}
partitionPendingInfo := ps.getPending(ctx, req)
metricsArr := make([]*daemon.VertexMetrics, len(bufferList))

for idx, partitionName := range bufferList {
vm := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
}
// get the processing rate for each partition
vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), partitionName)
vm.Pendings = partitionPendingInfo[partitionName]
metricsArr[idx] = vm
}

resp.VertexMetrics = metricsArr
return resp, nil
}

// getPending returns the pending count for each partition of the vertex
func (ps *pipelineMetadataQuery) getPending(ctx context.Context, req *daemon.GetVertexMetricsRequest) map[string]map[string]int64 {
vertexName := fmt.Sprintf("%s-%s", ps.pipeline.Name, req.GetVertex())
log := logging.FromContext(ctx)

// Get the headless service name
vertex := &v1alpha1.Vertex{
ObjectMeta: metav1.ObjectMeta{
Name: vertexName,
},
}
headlessServiceName := vertex.GetHeadlessServiceName()

abstractVertex := ps.pipeline.GetVertex(req.GetVertex())
vertexLevelRates := ps.rater.GetRates(req.GetVertex())

metricsCount := 1
// TODO(multi-partition): currently metrics is an aggregation per vertex, so same across each pod for non-reduce vertex
// once multi-partition metrics are in - need to modify to per partition for every vertex
if abstractVertex.IsReduceUDF() {
metricsCount = abstractVertex.GetPartitionCount()
}

metricsArr := make([]*daemon.VertexMetrics, metricsCount)
for i := 0; i < metricsCount; i++ {
headlessServiceName := vertex.GetHeadlessServiceName()
totalPendingMap := make(map[string]map[string]int64)
for idx := 0; idx < metricsCount; idx++ {
// Get the headless service name
// We can query the metrics endpoint of the (i)th pod to obtain this value.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, idx, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := ps.httpClient.Get(url); err != nil {
log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error())
metricsArr[i] = &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
}
return nil
} else {
// expfmt Parser from prometheus to parse the metrics
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
log.Errorw("Error in parsing to prometheus metric families", zap.Error(err))
return nil, err
return nil
}

// Get the pending messages for this partition
pendings := make(map[string]int64, 0)
if value, ok := result[metrics.VertexPendingMessages]; ok {
metricsList := value.GetMetric()
for _, metric := range metricsList {
labels := metric.GetLabel()
lookback := ""
partitionName := ""
for _, label := range labels {
if label.GetName() == metrics.LabelPeriod {
lookback := label.GetValue()
pendings[lookback] = int64(metric.Gauge.GetValue())
lookback = label.GetValue()

}
if label.GetName() == metrics.LabelPartitionName {
partitionName = label.GetValue()
}
}
if _, ok := totalPendingMap[partitionName]; !ok {
totalPendingMap[partitionName] = make(map[string]int64)
}
totalPendingMap[partitionName][lookback] += int64(metric.Gauge.GetValue())
}
}
vm := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
Pendings: pendings,
}

// Get the processing rate for this partition
if abstractVertex.IsReduceUDF() {
// the processing rate of this ith partition is the rate of the corresponding ith pod.
vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
} else {
// if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex.
// TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices.
vm.ProcessingRates = vertexLevelRates
}

metricsArr[i] = vm
}
}

resp.VertexMetrics = metricsArr
return resp, nil
return totalPendingMap
}

func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *daemon.GetPipelineStatusRequest) (*daemon.GetPipelineStatusResponse, error) {
@@ -259,29 +269,28 @@ func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *dae
return resp, nil
}

totalProcessingRate := float64(0)
totalPending := int64(0)
// may need to revisit later, another concern could be that the processing rate is too slow instead of just 0
for _, vertexMetrics := range vertexResp.VertexMetrics {
var pending int64
var processingRate float64
if p, ok := vertexMetrics.GetPendings()["default"]; ok {
pending = p
} else {
continue
if vertexMetrics.GetProcessingRates() != nil {
if p, ok := vertexMetrics.GetProcessingRates()["default"]; ok {
totalProcessingRate += p
}
}

if p, ok := vertexMetrics.GetProcessingRates()["default"]; ok {
processingRate = p
} else {
continue
if vertexMetrics.GetPendings() != nil {
if p, ok := vertexMetrics.GetPendings()["default"]; ok {
totalPending += p
}
}
}

if pending > 0 && processingRate == 0 {
resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusError),
Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)),
}
return resp, nil
if totalPending > 0 && totalProcessingRate == 0 {
resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusError),
Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)),
}
return resp, nil
}
}

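The getPending change above scrapes each pod's /metrics endpoint and folds the vertex_pending_messages gauge into a partition -> lookback -> pending map. A standalone sketch of that parsing step, runnable against a canned payload; the helper name and sample input are illustrative, only the metric name and the period/partition_name labels come from the diff and its test fixtures.

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// aggregatePendingByPartition is an illustrative helper mirroring getPending:
// it parses one pod's /metrics payload and sums the vertex_pending_messages
// gauge into a partition_name -> period -> pending map.
func aggregatePendingByPartition(payload string) (map[string]map[string]int64, error) {
	parser := expfmt.TextParser{}
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		return nil, err
	}
	out := make(map[string]map[string]int64)
	family, ok := families["vertex_pending_messages"]
	if !ok {
		return out, nil
	}
	for _, m := range family.GetMetric() {
		partition, period := "", ""
		for _, label := range m.GetLabel() {
			switch label.GetName() {
			case "partition_name":
				partition = label.GetValue()
			case "period":
				period = label.GetValue()
			}
		}
		if out[partition] == nil {
			out[partition] = make(map[string]int64)
		}
		out[partition][period] += int64(m.GetGauge().GetValue())
	}
	return out, nil
}

func main() {
	payload := "# TYPE vertex_pending_messages gauge\n" +
		"vertex_pending_messages{period=\"default\",partition_name=\"p-0\",pipeline=\"simple-pipeline\",vertex=\"cat\"} 7\n" +
		"vertex_pending_messages{period=\"1m\",partition_name=\"p-0\",pipeline=\"simple-pipeline\",vertex=\"cat\"} 5\n"
	pending, err := aggregatePendingByPartition(payload)
	fmt.Println(pending, err) // map[p-0:map[1m:5 default:7]] <nil>
}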
28 changes: 10 additions & 18 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
@@ -79,7 +79,7 @@ func (mr *mockRater_TestGetVertexMetrics) Start(ctx context.Context) error {
return nil
}

func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string) map[string]float64 {
func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string, partitionName string) map[string]float64 {
res := make(map[string]float64)
res["default"] = 4.894736842105263
res["1m"] = 5.084745762711864
@@ -88,10 +88,6 @@ func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string) map[string
return res
}

func (mr *mockRater_TestGetVertexMetrics) GetPodRates(vertexName string, podIndex int) map[string]float64 {
return nil
}

func TestGetVertexMetrics(t *testing.T) {
pipelineName := "simple-pipeline"
vertexName := "cat"
@@ -106,10 +102,10 @@ func TestGetVertexMetrics(t *testing.T) {

metricsResponse := `# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
vertex_pending_messages{period="15m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
ioReader := io.NopCloser(bytes.NewReader([]byte(metricsResponse)))

@@ -229,7 +225,7 @@ func (mr *mockRater_TestGetPipelineStatus) Start(ctx context.Context) error {
return nil
}

func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string) map[string]float64 {
func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string, partitionName string) map[string]float64 {
res := make(map[string]float64)
if mr.isActivelyProcessing {
res["default"] = 4.894736842105263
@@ -245,10 +241,6 @@ func TestGetPipelineStatus(t *testing.T) {
return res
}

func (mr *mockRater_TestGetPipelineStatus) GetPodRates(vertexName string, podIndex int) map[string]float64 {
return nil
}

func TestGetPipelineStatus(t *testing.T) {
pipelineName := "simple-pipeline"
pipeline := &v1alpha1.Pipeline{
@@ -264,10 +256,10 @@ func TestGetPipelineStatus(t *testing.T) {
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
metricsResponse := `# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
vertex_pending_messages{period="15m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
req := &daemon.GetPipelineStatusRequest{Pipeline: &pipelineName}

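TestGetPipelineStatus above exercises the new aggregation in GetPipelineStatus: pendings and "default" rates are now summed across all partitions of a vertex before the zero-rate check. A condensed, illustrative version of that rule (not the real daemon function):

package main

import "fmt"

// pipelineHealthy condenses the status rule: a vertex is flagged only when its
// partitions have pending messages in total while the summed "default"
// processing rate is zero.
func pipelineHealthy(partitionPendings []int64, partitionRates []float64) bool {
	var totalPending int64
	var totalRate float64
	for _, p := range partitionPendings {
		totalPending += p
	}
	for _, r := range partitionRates {
		totalRate += r
	}
	// unhealthy only if messages pile up while nothing is being processed
	return !(totalPending > 0 && totalRate == 0)
}

func main() {
	fmt.Println(pipelineHealthy([]int64{7, 0}, []float64{0, 4.9})) // true
	fmt.Println(pipelineHealthy([]int64{7, 3}, []float64{0, 0}))   // false
}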
61 changes: 14 additions & 47 deletions pkg/daemon/server/service/rater/helper.go
@@ -25,22 +25,22 @@ import (
const IndexNotFound = -1

// UpdateCount updates the count of processed messages for a pod at a given time
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count float64) {
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, partitionReadCounts *PodReadCount) {
items := q.Items()

// find the element matching the input timestamp and update it
for _, i := range items {
if i.timestamp == time {
i.Update(podName, count)
i.Update(partitionReadCounts)
return
}
}

// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
tc := NewTimestampedCounts(time)
tc.Update(podName, count)
tc.Update(partitionReadCounts)

// close the window for the most recent timestamped count
// close the window for the most recent timestamped partitionReadCounts
switch n := len(items); n {
case 0:
// if the queue is empty, we just append the new timestamped count
@@ -54,8 +54,8 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
q.Append(tc)
}

// CalculateRate calculates the rate of the vertex in the last lookback seconds
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 {
// CalculateRate calculates the rate of the vertex partition in the last lookback seconds
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
@@ -74,56 +74,23 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
// this should not happen in practice because we are using a 10s interval
return 0
}
// TODO: revisit this logic, we can just use the slope (counts[endIndex] - counts[startIndex] / timeDiff) to calculate the rate.
for i := startIndex; i < endIndex; i++ {
if counts[i+1] != nil && counts[i+1].IsWindowClosed() {
delta += counts[i+1].delta
delta += calculatePartitionDelta(counts[i+1], partitionName)
}
}
return delta / float64(timeDiff)
}

// CalculatePodRate calculates the rate of a pod in the last lookback seconds
func CalculatePodRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, podName string) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
}
startIndex := findStartIndex(lookbackSeconds, counts)
endIndex := findEndIndex(counts)
if startIndex == IndexNotFound || endIndex == IndexNotFound {
return 0
}

// calculatePartitionDelta calculates the difference of the metric count between two timestamped counts for a given partition.
func calculatePartitionDelta(c1 *TimestampedCounts, partitionName string) float64 {
tc1 := c1.PodDeltaCountSnapshot()
delta := float64(0)
// time diff in seconds.
timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
if timeDiff == 0 {
// if the time difference is 0, we return 0 to avoid division by 0
// this should not happen in practice because we are using a 10s interval
return 0
}
for i := startIndex; i < endIndex; i++ {
if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() {
delta += calculatePodDelta(c1, c2, podName)
}
}
return delta / float64(timeDiff)
}

func calculatePodDelta(c1, c2 *TimestampedCounts, podName string) float64 {
tc1 := c1.Snapshot()
tc2 := c2.Snapshot()
count1, exist1 := tc1[podName]
count2, exist2 := tc2[podName]
if !exist2 {
return 0
} else if !exist1 {
return count2
} else if count2 < count1 {
return count2
} else {
return count2 - count1
for _, partitionCount := range tc1 {
delta += partitionCount[partitionName]
}
return delta
}

// findStartIndex finds the index of the first element in the queue that is within the lookback seconds
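
The helper.go rewrite replaces the per-pod rate math (CalculatePodRate, calculatePodDelta) with per-partition math: for each closed window inside the lookback period, calculatePartitionDelta sums the per-pod read deltas of one partition, and CalculateRate divides the total by the covered time span. A self-contained sketch of that calculation with plain slices standing in for the OverflowQueue; type and function names here are illustrative, and the window-closed bookkeeping is omitted.

package main

import "fmt"

// timestampedDelta is a simplified stand-in for TimestampedCounts: how many
// messages each pod read from each partition during one scrape window.
type timestampedDelta struct {
	timestamp int64
	// pod name -> partition name -> read-count delta for this window
	podPartitionDelta map[string]map[string]float64
}

// partitionRate mirrors the shape of CalculateRate/calculatePartitionDelta:
// sum one partition's per-pod deltas across the lookback window and divide by
// the covered time span.
func partitionRate(windows []timestampedDelta, lookbackSeconds int64, partition string) float64 {
	if len(windows) <= 1 {
		return 0
	}
	start, end := 0, len(windows)-1
	// keep only windows that fall inside the lookback period
	cutoff := windows[end].timestamp - lookbackSeconds
	for start < end && windows[start].timestamp < cutoff {
		start++
	}
	timeDiff := windows[end].timestamp - windows[start].timestamp
	if timeDiff == 0 {
		// avoid division by zero; should not happen with a fixed scrape interval
		return 0
	}
	delta := 0.0
	for i := start; i < end; i++ {
		for _, partitions := range windows[i+1].podPartitionDelta {
			delta += partitions[partition]
		}
	}
	return delta / float64(timeDiff)
}

func main() {
	windows := []timestampedDelta{
		{timestamp: 0, podPartitionDelta: map[string]map[string]float64{"pod-0": {"p-0": 0}}},
		{timestamp: 10, podPartitionDelta: map[string]map[string]float64{"pod-0": {"p-0": 50}, "pod-1": {"p-0": 30}}},
		{timestamp: 20, podPartitionDelta: map[string]map[string]float64{"pod-0": {"p-0": 40}, "pod-1": {"p-0": 60}}},
	}
	// (50+30+40+60) / 20s = 9 messages per second for partition p-0
	fmt.Println(partitionRate(windows, 60, "p-0"))
}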