From dd060cb80637ad5904d3000785c0214a80894e7e Mon Sep 17 00:00:00 2001 From: Vedant Gupta <49195734+veds-g@users.noreply.github.com> Date: Wed, 28 Jun 2023 20:37:07 +0530 Subject: [PATCH] feat: rater changes to track processing rate per partition (#805) Signed-off-by: veds-g Signed-off-by: Yashash H L Co-authored-by: Yashash H L --- .../server/service/pipeline_metrics_query.go | 125 +++--- .../service/pipeline_metrics_query_test.go | 28 +- pkg/daemon/server/service/rater/helper.go | 61 +-- .../server/service/rater/helper_test.go | 369 +++++++++--------- pkg/daemon/server/service/rater/rater.go | 76 ++-- pkg/daemon/server/service/rater/rater_test.go | 2 +- .../service/rater/timestamped_counts.go | 100 +++-- .../service/rater/timestamped_counts_test.go | 99 +++-- pkg/metrics/metrics.go | 50 +-- pkg/sources/nats/nats_test.go | 8 +- 10 files changed, 479 insertions(+), 439 deletions(-) diff --git a/pkg/daemon/server/service/pipeline_metrics_query.go b/pkg/daemon/server/service/pipeline_metrics_query.go index 6e611c8dfa..eafaf0af6b 100644 --- a/pkg/daemon/server/service/pipeline_metrics_query.go +++ b/pkg/daemon/server/service/pipeline_metrics_query.go @@ -21,11 +21,12 @@ import ( "context" "crypto/tls" "fmt" - "go.uber.org/zap" "net/http" "time" + "github.com/numaproj/numaflow/pkg/metrics" "github.com/prometheus/common/expfmt" + "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" @@ -34,7 +35,6 @@ import ( "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/apis/proto/daemon" "github.com/numaproj/numaflow/pkg/isbsvc" - "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/logging" "github.com/numaproj/numaflow/pkg/watermark/fetch" ) @@ -155,87 +155,97 @@ func (ps *pipelineMetadataQuery) GetBuffer(ctx context.Context, req *daemon.GetB } // GetVertexMetrics is used to query the metrics service and is used to obtain the processing rate of a given vertex for 1m, 5m and 15m. +// Response contains the metrics for each partition of the vertex. // In the future maybe latency will also be added here? // Should this method live here or maybe another file? 
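//
// Illustrative sketch, not part of this patch: a caller of the daemon service can
// consume the new per-partition response roughly as below. Only identifiers that
// already appear in this diff are used; the surrounding variables (ctx, req) are
// assumed to be in scope.
//
//	resp, err := ps.GetVertexMetrics(ctx, req)
//	if err != nil {
//		return err
//	}
//	// one VertexMetrics entry per partition owned by the vertex
//	// (for a source vertex, the vertex name itself is the single partition)
//	for _, vm := range resp.VertexMetrics {
//		fmt.Printf("vertex=%s rates=%v pendings=%v\n",
//			req.GetVertex(), vm.GetProcessingRates(), vm.GetPendings())
//	}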
func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daemon.GetVertexMetricsRequest) (*daemon.GetVertexMetricsResponse, error) { - log := logging.FromContext(ctx) resp := new(daemon.GetVertexMetricsResponse) + + abstractVertex := ps.pipeline.GetVertex(req.GetVertex()) + bufferList := abstractVertex.OwnedBufferNames(ps.pipeline.Namespace, ps.pipeline.Name) + + // source vertex will have a single partition, which is the vertex name itself + if abstractVertex.IsASource() { + bufferList = append(bufferList, req.GetVertex()) + } + partitionPendingInfo := ps.getPending(ctx, req) + metricsArr := make([]*daemon.VertexMetrics, len(bufferList)) + + for idx, partitionName := range bufferList { + vm := &daemon.VertexMetrics{ + Pipeline: &ps.pipeline.Name, + Vertex: req.Vertex, + } + // get the processing rate for each partition + vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), partitionName) + vm.Pendings = partitionPendingInfo[partitionName] + metricsArr[idx] = vm + } + + resp.VertexMetrics = metricsArr + return resp, nil +} + +// getPending returns the pending count for each partition of the vertex +func (ps *pipelineMetadataQuery) getPending(ctx context.Context, req *daemon.GetVertexMetricsRequest) map[string]map[string]int64 { vertexName := fmt.Sprintf("%s-%s", ps.pipeline.Name, req.GetVertex()) + log := logging.FromContext(ctx) - // Get the headless service name vertex := &v1alpha1.Vertex{ ObjectMeta: metav1.ObjectMeta{ Name: vertexName, }, } - headlessServiceName := vertex.GetHeadlessServiceName() - abstractVertex := ps.pipeline.GetVertex(req.GetVertex()) - vertexLevelRates := ps.rater.GetRates(req.GetVertex()) metricsCount := 1 - // TODO(multi-partition): currently metrics is an aggregation per vertex, so same across each pod for non-reduce vertex - // once multi-partition metrics are in - need to modify to per partition for every vertex if abstractVertex.IsReduceUDF() { metricsCount = abstractVertex.GetPartitionCount() } - - metricsArr := make([]*daemon.VertexMetrics, metricsCount) - for i := 0; i < metricsCount; i++ { + headlessServiceName := vertex.GetHeadlessServiceName() + totalPendingMap := make(map[string]map[string]int64) + for idx := 0; idx < metricsCount; idx++ { + // Get the headless service name // We can query the metrics endpoint of the (i)th pod to obtain this value. 
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics - url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort) + url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, idx, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort) if res, err := ps.httpClient.Get(url); err != nil { log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error()) - metricsArr[i] = &daemon.VertexMetrics{ - Pipeline: &ps.pipeline.Name, - Vertex: req.Vertex, - } + return nil } else { // expfmt Parser from prometheus to parse the metrics textParser := expfmt.TextParser{} result, err := textParser.TextToMetricFamilies(res.Body) if err != nil { log.Errorw("Error in parsing to prometheus metric families", zap.Error(err)) - return nil, err + return nil } // Get the pending messages for this partition - pendings := make(map[string]int64, 0) if value, ok := result[metrics.VertexPendingMessages]; ok { metricsList := value.GetMetric() for _, metric := range metricsList { labels := metric.GetLabel() + lookback := "" + partitionName := "" for _, label := range labels { if label.GetName() == metrics.LabelPeriod { - lookback := label.GetValue() - pendings[lookback] = int64(metric.Gauge.GetValue()) + lookback = label.GetValue() + + } + if label.GetName() == metrics.LabelPartitionName { + partitionName = label.GetValue() } } + if _, ok := totalPendingMap[partitionName]; !ok { + totalPendingMap[partitionName] = make(map[string]int64) + } + totalPendingMap[partitionName][lookback] += int64(metric.Gauge.GetValue()) } } - vm := &daemon.VertexMetrics{ - Pipeline: &ps.pipeline.Name, - Vertex: req.Vertex, - Pendings: pendings, - } - - // Get the processing rate for this partition - if abstractVertex.IsReduceUDF() { - // the processing rate of this ith partition is the rate of the corresponding ith pod. - vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i) - } else { - // if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex. - // TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices. 
- vm.ProcessingRates = vertexLevelRates - } - - metricsArr[i] = vm } } - - resp.VertexMetrics = metricsArr - return resp, nil + return totalPendingMap } func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *daemon.GetPipelineStatusRequest) (*daemon.GetPipelineStatusResponse, error) { @@ -259,29 +269,28 @@ func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *dae return resp, nil } + totalProcessingRate := float64(0) + totalPending := int64(0) // may need to revisit later, another concern could be that the processing rate is too slow instead of just 0 for _, vertexMetrics := range vertexResp.VertexMetrics { - var pending int64 - var processingRate float64 - if p, ok := vertexMetrics.GetPendings()["default"]; ok { - pending = p - } else { - continue + if vertexMetrics.GetProcessingRates() != nil { + if p, ok := vertexMetrics.GetProcessingRates()["default"]; ok { + totalProcessingRate += p + } } - - if p, ok := vertexMetrics.GetProcessingRates()["default"]; ok { - processingRate = p - } else { - continue + if vertexMetrics.GetPendings() != nil { + if p, ok := vertexMetrics.GetPendings()["default"]; ok { + totalPending += p + } } + } - if pending > 0 && processingRate == 0 { - resp.Status = &daemon.PipelineStatus{ - Status: pointer.String(PipelineStatusError), - Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)), - } - return resp, nil + if totalPending > 0 && totalProcessingRate == 0 { + resp.Status = &daemon.PipelineStatus{ + Status: pointer.String(PipelineStatusError), + Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)), } + return resp, nil } } diff --git a/pkg/daemon/server/service/pipeline_metrics_query_test.go b/pkg/daemon/server/service/pipeline_metrics_query_test.go index 4e419827e6..6fcbf3e2b6 100644 --- a/pkg/daemon/server/service/pipeline_metrics_query_test.go +++ b/pkg/daemon/server/service/pipeline_metrics_query_test.go @@ -79,7 +79,7 @@ func (mr *mockRater_TestGetVertexMetrics) Start(ctx context.Context) error { return nil } -func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string) map[string]float64 { +func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string, partitionName string) map[string]float64 { res := make(map[string]float64) res["default"] = 4.894736842105263 res["1m"] = 5.084745762711864 @@ -88,10 +88,6 @@ func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string) map[string return res } -func (mr *mockRater_TestGetVertexMetrics) GetPodRates(vertexName string, podIndex int) map[string]float64 { - return nil -} - func TestGetVertexMetrics(t *testing.T) { pipelineName := "simple-pipeline" vertexName := "cat" @@ -106,10 +102,10 @@ func TestGetVertexMetrics(t *testing.T) { metricsResponse := `# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod. 
# TYPE vertex_pending_messages gauge -vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011 -vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333 -vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002 -vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002 +vertex_pending_messages{period="15m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 4.011 +vertex_pending_messages{period="1m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 5.333 +vertex_pending_messages{period="5m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 6.002 +vertex_pending_messages{period="default",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 7.00002 ` ioReader := io.NopCloser(bytes.NewReader([]byte(metricsResponse))) @@ -229,7 +225,7 @@ func (mr *mockRater_TestGetPipelineStatus) Start(ctx context.Context) error { return nil } -func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string) map[string]float64 { +func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string, partitionName string) map[string]float64 { res := make(map[string]float64) if mr.isActivelyProcessing { res["default"] = 4.894736842105263 @@ -245,10 +241,6 @@ func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string) map[strin return res } -func (mr *mockRater_TestGetPipelineStatus) GetPodRates(vertexName string, podIndex int) map[string]float64 { - return nil -} - func TestGetPipelineStatus(t *testing.T) { pipelineName := "simple-pipeline" pipeline := &v1alpha1.Pipeline{ @@ -264,10 +256,10 @@ func TestGetPipelineStatus(t *testing.T) { client, _ := isbsvc.NewISBJetStreamSvc(pipelineName) metricsResponse := `# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod. 
# TYPE vertex_pending_messages gauge -vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011 -vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333 -vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002 -vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002 +vertex_pending_messages{period="15m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 4.011 +vertex_pending_messages{period="1m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 5.333 +vertex_pending_messages{period="5m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 6.002 +vertex_pending_messages{period="default",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 7.00002 ` req := &daemon.GetPipelineStatusRequest{Pipeline: &pipelineName} diff --git a/pkg/daemon/server/service/rater/helper.go b/pkg/daemon/server/service/rater/helper.go index 52d1ed001c..597e31aa19 100644 --- a/pkg/daemon/server/service/rater/helper.go +++ b/pkg/daemon/server/service/rater/helper.go @@ -25,22 +25,22 @@ import ( const IndexNotFound = -1 // UpdateCount updates the count of processed messages for a pod at a given time -func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count float64) { +func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, partitionReadCounts *PodReadCount) { items := q.Items() // find the element matching the input timestamp and update it for _, i := range items { if i.timestamp == time { - i.Update(podName, count) + i.Update(partitionReadCounts) return } } // if we cannot find a matching element, it means we need to add a new timestamped count to the queue tc := NewTimestampedCounts(time) - tc.Update(podName, count) + tc.Update(partitionReadCounts) - // close the window for the most recent timestamped count + // close the window for the most recent timestamped partitionReadCounts switch n := len(items); n { case 0: // if the queue is empty, we just append the new timestamped count @@ -54,8 +54,8 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p q.Append(tc) } -// CalculateRate calculates the rate of the vertex in the last lookback seconds -func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 { +// CalculateRate calculates the rate of the vertex partition in the last lookback seconds +func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64 { counts := q.Items() if len(counts) <= 1 { return 0 @@ -74,56 +74,23 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec // this should not happen in practice because we are using a 10s interval return 0 } + // TODO: revisit this logic, we can just use the slope (counts[endIndex] - counts[startIndex] / timeDiff) to calculate the rate. 
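	// Illustrative note, not part of this patch: the loop below adds up, for every
	// closed window after startIndex, the per-partition delta that was snapshotted
	// by CloseWindow, then divides by the covered time span. For example, two closed
	// windows whose "partition1" deltas are 5 and 10 over a 15-second span give a
	// rate of (5+10)/15 = 1 message/second. The slope shortcut suggested in the TODO
	// above, i.e. (counts[endIndex] - counts[startIndex]) / timeDiff with the missing
	// parentheses added, would presumably be cheaper, but the per-window deltas also
	// absorb pod restarts: when a read counter resets, CloseWindow records the new
	// count as the delta instead of a negative difference.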
for i := startIndex; i < endIndex; i++ { if counts[i+1] != nil && counts[i+1].IsWindowClosed() { - delta += counts[i+1].delta + delta += calculatePartitionDelta(counts[i+1], partitionName) } } return delta / float64(timeDiff) } -// CalculatePodRate calculates the rate of a pod in the last lookback seconds -func CalculatePodRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, podName string) float64 { - counts := q.Items() - if len(counts) <= 1 { - return 0 - } - startIndex := findStartIndex(lookbackSeconds, counts) - endIndex := findEndIndex(counts) - if startIndex == IndexNotFound || endIndex == IndexNotFound { - return 0 - } - +// calculatePartitionDelta calculates the difference of the metric count between two timestamped counts for a given partition. +func calculatePartitionDelta(c1 *TimestampedCounts, partitionName string) float64 { + tc1 := c1.PodDeltaCountSnapshot() delta := float64(0) - // time diff in seconds. - timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp - if timeDiff == 0 { - // if the time difference is 0, we return 0 to avoid division by 0 - // this should not happen in practice because we are using a 10s interval - return 0 - } - for i := startIndex; i < endIndex; i++ { - if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() { - delta += calculatePodDelta(c1, c2, podName) - } - } - return delta / float64(timeDiff) -} - -func calculatePodDelta(c1, c2 *TimestampedCounts, podName string) float64 { - tc1 := c1.Snapshot() - tc2 := c2.Snapshot() - count1, exist1 := tc1[podName] - count2, exist2 := tc2[podName] - if !exist2 { - return 0 - } else if !exist1 { - return count2 - } else if count2 < count1 { - return count2 - } else { - return count2 - count1 + for _, partitionCount := range tc1 { + delta += partitionCount[partitionName] } + return delta } // findStartIndex finds the index of the first element in the queue that is within the lookback seconds diff --git a/pkg/daemon/server/service/rater/helper_test.go b/pkg/daemon/server/service/rater/helper_test.go index 19cfeb00eb..61eb8c0549 100644 --- a/pkg/daemon/server/service/rater/helper_test.go +++ b/pkg/daemon/server/service/rater/helper_test.go @@ -17,93 +17,105 @@ limitations under the License. 
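// Illustrative sketch, not part of this patch: how the rater wires the helpers
// above together on each scrape tick. All names below appear elsewhere in this
// diff; the snippet only paraphrases the surrounding loop in rater.go.
//
//	now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix()
//	// a nil *PodReadCount means the scrape failed or the pod is gone
//	podReadCount := r.getPodReadCounts(vertexName, vertexType, podName)
//	// recording under a new timestamp also closes the previous window, which
//	// snapshots the per-pod, per-partition deltas consumed by CalculateRate
//	UpdateCount(r.timestampedPodCounts[vertexName], now, podReadCount)
//	// a later GetRates call turns the closed-window deltas of one partition
//	// into a rate for each lookback period
//	rate := CalculateRate(r.timestampedPodCounts[vertexName], 60, partitionName)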
package server import ( + "github.com/stretchr/testify/assert" "testing" "time" - "github.com/stretchr/testify/assert" - sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue" ) const TestTime = 1620000000 func TestUpdateCount(t *testing.T) { - t.Run("givenTimeExistsPodExistsCountAvailable_whenUpdate_thenUpdatePodCount", func(t *testing.T) { + t.Run("givenTimeExistsPodExistsPartitionExistsCountAvailable_whenUpdate_thenUpdatePodPartitionCount", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) tc := NewTimestampedCounts(TestTime) - tc.Update("pod1", 10.0) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc) - UpdateCount(q, TestTime, "pod1", 20.0) + UpdateCount(q, TestTime, &PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) assert.Equal(t, 1, q.Length()) - assert.Equal(t, 20.0, q.Items()[0].podCounts["pod1"]) + assert.Equal(t, 20.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) + }) + + t.Run("givenTimeExistsPodExistsPartitionNotExistsCountAvailable_whenUpdate_thenAddPodPartitionCount", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + tc := NewTimestampedCounts(TestTime) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + q.Append(tc) + + UpdateCount(q, TestTime, &PodReadCount{"pod1", map[string]float64{"partition1": 20.0, "partition2": 30.0}}) + + assert.Equal(t, 1, q.Length()) + assert.Equal(t, 20.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) + assert.Equal(t, 30.0, q.Items()[0].podPartitionCount["pod1"]["partition2"]) }) t.Run("givenTimeExistsPodNotExistsCountAvailable_whenUpdate_thenAddPodCount", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) tc := NewTimestampedCounts(TestTime) - tc.Update("pod1", 20.0) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) q.Append(tc) - UpdateCount(q, TestTime, "pod2", 10.0) + UpdateCount(q, TestTime, &PodReadCount{"pod2", map[string]float64{"partition1": 10.0}}) assert.Equal(t, 1, q.Length()) - assert.Equal(t, 20.0, q.Items()[0].podCounts["pod1"]) - assert.Equal(t, 10.0, q.Items()[0].podCounts["pod2"]) + assert.Equal(t, 20.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) + assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod2"]["partition1"]) }) t.Run("givenTimeExistsPodExistsCountNotAvailable_whenUpdate_thenNotUpdatePod", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) tc := NewTimestampedCounts(TestTime) - tc.Update("pod1", 10.0) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc) - UpdateCount(q, TestTime, "pod1", CountNotAvailable) + UpdateCount(q, TestTime, nil) assert.Equal(t, 1, q.Length()) - assert.Equal(t, 1, len(q.Items()[0].podCounts)) - assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"]) + assert.Equal(t, 1, len(q.Items()[0].podPartitionCount)) + assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) }) t.Run("givenTimeExistsPodNotExistsCountNotAvailable_whenUpdate_thenNoUpdate", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) tc := NewTimestampedCounts(TestTime) - tc.Update("pod1", 10.0) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc) - UpdateCount(q, TestTime, "pod2", CountNotAvailable) + UpdateCount(q, TestTime, nil) assert.Equal(t, 1, q.Length()) - assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"]) + assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) }) 
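	// Note, not part of this patch: the nil cases above exercise the scrape-failure
	// convention used by the rater — a nil *PodReadCount stands for "count not
	// available", and the recorded counts for that pod are intentionally kept
	// rather than dropped, so one failed scrape cannot surface later as a huge
	// spurious delta in the rate calculation.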
t.Run("givenTimeNotExistsCountAvailable_whenUpdate_thenUpdateNewTimeWithPodAndCloseWindowForPrevTime", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) tc := NewTimestampedCounts(TestTime) - tc.Update("pod1", 10.0) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc) - UpdateCount(q, TestTime+1, "pod1", 20.0) + UpdateCount(q, TestTime+1, &PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) assert.Equal(t, 2, q.Length()) - assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"]) - assert.Equal(t, 20.0, q.Items()[1].podCounts["pod1"]) + assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) + assert.Equal(t, 20.0, q.Items()[1].podPartitionCount["pod1"]["partition1"]) assert.Equal(t, true, tc.IsWindowClosed()) - assert.Equal(t, 10.0, tc.delta) + assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 10.0}}, tc.PodDeltaCountSnapshot()) }) t.Run("givenTimeNotExistsCountNotAvailable_whenUpdate_thenAddEmptyItem", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) tc := NewTimestampedCounts(TestTime) - tc.Update("pod1", 10.0) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc) - UpdateCount(q, TestTime+1, "pod2", CountNotAvailable) + UpdateCount(q, TestTime+1, nil) assert.Equal(t, 2, q.Length()) - assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"]) - assert.Equal(t, 0, len(q.Items()[1].podCounts)) + assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) + assert.Equal(t, 0, len(q.Items()[1].podPartitionCount)) }) } @@ -111,16 +123,14 @@ func TestCalculateRate(t *testing.T) { t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnZero", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) // no data - assert.Equal(t, 0.0, CalculateRate(q, 10)) - assert.Equal(t, 0.0, CalculatePodRate(q, 10, "pod1")) + assert.Equal(t, 0.0, CalculateRate(q, 10, "partition1")) // only one data now := time.Now() - tc1 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) - tc1.Update("pod1", 5.0) + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) q.Append(tc1) - assert.Equal(t, 0.0, CalculateRate(q, 10)) - assert.Equal(t, 0.0, CalculatePodRate(q, 10, "pod1")) + assert.Equal(t, 0.0, CalculateRate(q, 10, "partition1")) }) t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -128,27 +138,22 @@ func TestCalculateRate(t *testing.T) { now := time.Now() tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) - tc1.Update("pod1", 5.0) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) q.Append(tc1) tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) - tc2.Update("pod1", 10.0) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc2) tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) - tc3.Update("pod1", 20.0) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) q.Append(tc3) tc3.CloseWindow(tc2) - assert.Equal(t, 0.0, CalculateRate(q, 5)) - assert.Equal(t, 1.0, CalculateRate(q, 15)) - assert.Equal(t, 0.75, CalculateRate(q, 25)) - assert.Equal(t, 0.75, CalculateRate(q, 100)) - - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod1")) - assert.Equal(t, 1.0, CalculatePodRate(q, 15, "pod1")) - assert.Equal(t, 0.75, CalculatePodRate(q, 25, 
"pod1")) - assert.Equal(t, 0.75, CalculatePodRate(q, 100, "pod1")) + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 1.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 0.75, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 0.75, CalculateRate(q, 100, "partition1")) }) t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate_excludeOpenWindow", func(t *testing.T) { @@ -156,26 +161,21 @@ func TestCalculateRate(t *testing.T) { now := time.Now() tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) - tc1.Update("pod1", 5.0) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) q.Append(tc1) tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) - tc2.Update("pod1", 10.0) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc2) tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) - tc3.Update("pod1", 20.0) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) q.Append(tc3) - assert.Equal(t, 0.0, CalculateRate(q, 5)) - assert.Equal(t, 0.0, CalculateRate(q, 15)) - assert.Equal(t, 0.5, CalculateRate(q, 25)) - assert.Equal(t, 0.5, CalculateRate(q, 100)) - - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod1")) - assert.Equal(t, 0.0, CalculatePodRate(q, 15, "pod1")) - assert.Equal(t, 0.5, CalculatePodRate(q, 25, "pod1")) - assert.Equal(t, 0.5, CalculatePodRate(q, 100, "pod1")) + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 0.5, CalculateRate(q, 100, "partition1")) }) t.Run("singlePod_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -183,185 +183,196 @@ func TestCalculateRate(t *testing.T) { now := time.Now() tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) - tc1.Update("pod1", 200.0) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) q.Append(tc1) tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) - tc2.Update("pod1", 100.0) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) q.Append(tc2) tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) - tc3.Update("pod1", 50.0) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) q.Append(tc3) tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) - tc4.Update("pod1", 80.0) + tc4.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 80.0}}) q.Append(tc4) tc4.CloseWindow(tc3) - assert.Equal(t, 0.0, CalculateRate(q, 5)) - assert.Equal(t, 3.0, CalculateRate(q, 15)) - assert.Equal(t, 4.0, CalculateRate(q, 25)) - assert.Equal(t, 6.0, CalculateRate(q, 35)) - assert.Equal(t, 6.0, CalculateRate(q, 100)) - - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod1")) - assert.Equal(t, 3.0, CalculatePodRate(q, 15, "pod1")) - assert.Equal(t, 4.0, CalculatePodRate(q, 25, "pod1")) - assert.Equal(t, 6.0, CalculatePodRate(q, 35, "pod1")) - assert.Equal(t, 6.0, CalculatePodRate(q, 100, "pod1")) + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 3.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 4.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 6.0, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 6.0, CalculateRate(q, 100, "partition1")) }) - 
t.Run("multiplePods_givenCountIncreasesAndDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { + t.Run("singlePod_givenCountDecreases_whenCalculateRate_thenReturnRate_excludeOpenWindow", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) now := time.Now() - tc1 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 30) - tc1.Update("pod1", 200.0) - tc1.Update("pod2", 100.0) + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) q.Append(tc1) tc1.CloseWindow(nil) - tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) - tc2.Update("pod1", 100.0) - tc2.Update("pod2", 200.0) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) q.Append(tc2) tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) - tc3.Update("pod1", 50.0) - tc3.Update("pod2", 300.0) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) q.Append(tc3) tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) - tc4.Update("pod1", 80.0) - tc4.Update("pod2", 400.0) + tc4.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 80.0}}) q.Append(tc4) - tc4.CloseWindow(tc3) - // vertex rate - assert.Equal(t, 0.0, CalculateRate(q, 5)) - assert.Equal(t, 13.0, CalculateRate(q, 15)) - assert.Equal(t, 14.0, CalculateRate(q, 25)) - assert.Equal(t, 16.0, CalculateRate(q, 35)) - assert.Equal(t, 16.0, CalculateRate(q, 100)) - - // pod1 rate - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod1")) - assert.Equal(t, 3.0, CalculatePodRate(q, 15, "pod1")) - assert.Equal(t, 4.0, CalculatePodRate(q, 25, "pod1")) - assert.Equal(t, 6.0, CalculatePodRate(q, 35, "pod1")) - assert.Equal(t, 6.0, CalculatePodRate(q, 100, "pod1")) - - // pod2 rate - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod2")) - assert.Equal(t, 10.0, CalculatePodRate(q, 15, "pod2")) - assert.Equal(t, 10.0, CalculatePodRate(q, 25, "pod2")) - assert.Equal(t, 10.0, CalculatePodRate(q, 35, "pod2")) - assert.Equal(t, 10.0, CalculatePodRate(q, 100, "pod2")) + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 7.5, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1")) }) - t.Run("multiplePods_givenPodsComeAndGo_whenCalculateRate_thenReturnRate", func(t *testing.T) { + t.Run("multiplePods_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) now := time.Now() - tc1 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 30) - tc1.Update("pod1", 200.0) - tc1.Update("pod2", 90.0) - tc1.Update("pod3", 50.0) + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) + tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}}) q.Append(tc1) tc1.CloseWindow(nil) - tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) - tc2.Update("pod1", 100.0) - tc2.Update("pod2", 200.0) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) + tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 200.0}}) q.Append(tc2) 
tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) - tc3.Update("pod1", 50.0) - tc3.Update("pod2", 300.0) - tc3.Update("pod4", 100.0) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) + tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 300.0}}) q.Append(tc3) tc3.CloseWindow(tc2) - tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) - tc4.Update("pod2", 400.0) - tc4.Update("pod3", 200.0) - tc4.Update("pod100", 200.0) - q.Append(tc4) - tc4.CloseWindow(tc3) - // vertex rate - assert.Equal(t, 0.0, CalculateRate(q, 5)) - assert.Equal(t, 50.0, CalculateRate(q, 15)) - assert.Equal(t, 37.5, CalculateRate(q, 25)) - assert.Equal(t, 32.0, CalculateRate(q, 35)) - assert.Equal(t, 32.0, CalculateRate(q, 100)) - - // pod1 rate - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod1")) - assert.Equal(t, 0.0, CalculatePodRate(q, 15, "pod1")) - assert.Equal(t, 2.5, CalculatePodRate(q, 25, "pod1")) - assert.Equal(t, 5.0, CalculatePodRate(q, 35, "pod1")) - assert.Equal(t, 5.0, CalculatePodRate(q, 100, "pod1")) - - // pod2 rate - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod2")) - assert.Equal(t, 10.0, CalculatePodRate(q, 15, "pod2")) - assert.Equal(t, 10.0, CalculatePodRate(q, 25, "pod2")) - assert.InDelta(t, 10.333, CalculatePodRate(q, 35, "pod2"), 0.001) - assert.InDelta(t, 10.333, CalculatePodRate(q, 100, "pod2"), 0.001) - - // pod3 rate - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod3")) - assert.Equal(t, 20.0, CalculatePodRate(q, 15, "pod3")) - assert.Equal(t, 10.0, CalculatePodRate(q, 25, "pod3")) - assert.InDelta(t, 6.666, CalculatePodRate(q, 100, "pod3"), 0.001) - assert.InDelta(t, 6.666, CalculatePodRate(q, 100, "pod3"), 0.001) - - // pod4 rate - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod4")) - assert.Equal(t, 0.0, CalculatePodRate(q, 15, "pod4")) - assert.Equal(t, 5.0, CalculatePodRate(q, 25, "pod4")) - assert.InDelta(t, 3.333, CalculatePodRate(q, 35, "pod4"), 0.001) - assert.InDelta(t, 3.333, CalculatePodRate(q, 100, "pod4"), 0.001) - - // pod100 rate - assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod100")) - assert.Equal(t, 20.0, CalculatePodRate(q, 15, "pod100")) - assert.Equal(t, 10.0, CalculatePodRate(q, 25, "pod100")) - assert.InDelta(t, 6.666, CalculatePodRate(q, 35, "pod100"), 0.001) - assert.InDelta(t, 6.666, CalculatePodRate(q, 100, "pod100"), 0.001) + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 20.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 17.5, CalculateRate(q, 35, "partition1")) + }) + + t.Run("multiplePods_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + now := time.Now() + + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) + tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 300.0}}) + q.Append(tc1) + tc1.CloseWindow(nil) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) + tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 200.0}}) + q.Append(tc2) + tc2.CloseWindow(tc1) + tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) + tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}}) + 
q.Append(tc3) + tc3.CloseWindow(tc2) + + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 15.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 22.5, CalculateRate(q, 35, "partition1")) + }) + + t.Run("multiplePods_givenOnePodRestarts_whenCalculateRate_thenReturnRate", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + now := time.Now() + + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) + tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 300.0}}) + q.Append(tc1) + tc1.CloseWindow(nil) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) + tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 200.0}}) + q.Append(tc2) + tc2.CloseWindow(tc1) + tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) + tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}}) + q.Append(tc3) + tc3.CloseWindow(tc2) + + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 20.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 22.5, CalculateRate(q, 35, "partition1")) }) - t.Run("queueOverflowed_givenPodsComeAndGo_whenCalculateRate_thenReturnRate", func(t *testing.T) { - q := sharedqueue.New[*TimestampedCounts](3) + t.Run("multiplePods_givenPodsComeAndGo_whenCalculateRate_thenReturnRate", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) now := time.Now() tc1 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 30) - tc1.Update("pod1", 200.0) - tc1.Update("pod2", 90.0) - tc1.Update("pod3", 50.0) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) + tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 90.0}}) + tc1.Update(&PodReadCount{"pod3", map[string]float64{"partition3": 50.0}}) q.Append(tc1) tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) - tc2.Update("pod1", 100.0) - tc2.Update("pod2", 200.0) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) + tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 200.0}}) q.Append(tc2) tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) - tc3.Update("pod1", 50.0) - tc3.Update("pod2", 300.0) - tc3.Update("pod4", 100.0) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) + tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 300.0}}) + tc3.Update(&PodReadCount{"pod4", map[string]float64{"partition4": 100.0}}) q.Append(tc3) tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) - tc4.Update("pod2", 400.0) - tc4.Update("pod3", 200.0) - tc4.Update("pod100", 200.0) + tc4.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 400.0}}) + tc4.Update(&PodReadCount{"pod3", map[string]float64{"partition3": 200.0}}) + tc4.Update(&PodReadCount{"pod100", map[string]float64{"partition100": 200.0}}) q.Append(tc4) tc4.CloseWindow(tc3) - assert.Equal(t, 0.0, CalculateRate(q, 5)) - assert.Equal(t, 50.0, CalculateRate(q, 15)) - assert.Equal(t, 37.5, CalculateRate(q, 25)) - assert.Equal(t, 37.5, CalculateRate(q, 35)) - assert.Equal(t, 
37.5, CalculateRate(q, 100)) + // partition1 rate + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 2.5, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 5.0, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 5.0, CalculateRate(q, 100, "partition1")) + + // partition2 rate + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2")) + assert.Equal(t, 10.0, CalculateRate(q, 15, "partition2")) + assert.Equal(t, 10.0, CalculateRate(q, 25, "partition2")) + assert.InDelta(t, 10.333, CalculateRate(q, 35, "partition2"), 0.001) + assert.InDelta(t, 10.333, CalculateRate(q, 100, "partition2"), 0.001) + + // partition3 rate + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition3")) + assert.Equal(t, 20.0, CalculateRate(q, 15, "partition3")) + assert.Equal(t, 10.0, CalculateRate(q, 25, "partition3")) + assert.InDelta(t, 6.666, CalculateRate(q, 100, "partition3"), 0.001) + assert.InDelta(t, 6.666, CalculateRate(q, 100, "partition3"), 0.001) + + // partition4 rate + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition4")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition4")) + assert.Equal(t, 5.0, CalculateRate(q, 25, "partition4")) + assert.InDelta(t, 3.333, CalculateRate(q, 35, "partition4"), 0.001) + assert.InDelta(t, 3.333, CalculateRate(q, 100, "partition4"), 0.001) + + // partition100 rate + assert.Equal(t, 0.0, CalculateRate(q, 5, "partition100")) + assert.Equal(t, 20.0, CalculateRate(q, 15, "partition100")) + assert.Equal(t, 10.0, CalculateRate(q, 25, "partition100")) + assert.InDelta(t, 6.666, CalculateRate(q, 35, "partition100"), 0.001) + assert.InDelta(t, 6.666, CalculateRate(q, 100, "partition100"), 0.001) }) + } diff --git a/pkg/daemon/server/service/rater/rater.go b/pkg/daemon/server/service/rater/rater.go index 151d779669..02b253f0cd 100644 --- a/pkg/daemon/server/service/rater/rater.go +++ b/pkg/daemon/server/service/rater/rater.go @@ -21,7 +21,6 @@ import ( "crypto/tls" "fmt" "net/http" - "strconv" "strings" "time" @@ -35,8 +34,7 @@ import ( type Ratable interface { Start(ctx context.Context) error - GetRates(vertexName string) map[string]float64 - GetPodRates(vertexName string, podIndex int) map[string]float64 + GetRates(vertexName, partitionName string) map[string]float64 } // CountWindow is the time window for which we maintain the timestamped counts, currently 10 seconds @@ -65,6 +63,22 @@ type Rater struct { options *options } +// PodReadCount is a struct to maintain count of messages read from each partition by a pod +type PodReadCount struct { + name string + partitionReadCounts map[string]float64 +} + +func (p *PodReadCount) Name() string { + return p.name +} + +func (p *PodReadCount) PartitionReadCounts() map[string]float64 { + return p.partitionReadCounts +} + +// vertex -> [timestamp(podCounts{podName: count}, partitionCounts{partitionIdx: count}, isWindowClosed, delta(across all the pods))] + func NewRater(ctx context.Context, p *v1alpha1.Pipeline, opts ...Option) *Rater { rater := Rater{ pipeline: p, @@ -120,19 +134,19 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error vertexName := podInfo[1] vertexType := podInfo[3] podName := strings.Join([]string{podInfo[0], podInfo[1], podInfo[2]}, "-") - var count float64 + var podReadCount *PodReadCount activePods := r.podTracker.GetActivePods() if activePods.Contains(key) { - count = r.getTotalCount(vertexName, vertexType, podName) - if count == CountNotAvailable { - log.Debugf("Failed retrieving total count 
for pod %s", podName) + podReadCount = r.getPodReadCounts(vertexName, vertexType, podName) + if podReadCount == nil { + log.Debugf("Failed retrieving total podReadCount for pod %s", podName) } } else { - log.Debugf("Pod %s does not exist, updating it with CountNotAvailable...", podName) - count = CountNotAvailable + log.Debugf("Pod %s does not exist, updating it with nil...", podName) + podReadCount = nil } now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix() - UpdateCount(r.timestampedPodCounts[vertexName], now, podName, count) + UpdateCount(r.timestampedPodCounts[vertexName], now, podReadCount) return nil } @@ -199,19 +213,20 @@ func sleep(ctx context.Context, duration time.Duration) { } } -// getTotalCount returns the total number of messages read by the pod -func (r *Rater) getTotalCount(vertexName, vertexType, podName string) float64 { +// getPodReadCounts returns the total number of messages read by the pod +// since a pod can read from multiple partitions, we will return a map of partition to read count. +func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount { // scrape the read total metric from pod metric port url := fmt.Sprintf("https://%s.%s.%s.svc.cluster.local:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort) if res, err := r.httpClient.Get(url); err != nil { r.log.Errorf("failed reading the metrics endpoint, %v", err.Error()) - return CountNotAvailable + return nil } else { textParser := expfmt.TextParser{} result, err := textParser.TextToMetricFamilies(res.Body) if err != nil { r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error()) - return CountNotAvailable + return nil } var readTotalMetricName string if vertexType == "reduce" { @@ -221,32 +236,31 @@ func (r *Rater) getTotalCount(vertexName, vertexType, podName string) float64 { } if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 { metricsList := value.GetMetric() - return metricsList[0].Counter.GetValue() + partitionReadCount := make(map[string]float64) + for _, ele := range metricsList { + partitionName := "" + for _, label := range ele.Label { + if label.GetName() == "partition_name" { + partitionName = label.GetValue() + } + } + partitionReadCount[partitionName] = ele.Counter.GetValue() + } + podReadCount := &PodReadCount{podName, partitionReadCount} + return podReadCount } else { r.log.Errorf("failed getting the read total metric, the metric is not available.") - return CountNotAvailable + return nil } } } -// GetRates returns the processing rates of the vertex in the format of lookback second to rate mappings -func (r *Rater) GetRates(vertexName string) map[string]float64 { - var result = make(map[string]float64) - // calculate rates for each lookback seconds - for n, i := range r.buildLookbackSecondsMap(vertexName) { - r := CalculateRate(r.timestampedPodCounts[vertexName], i) - result[n] = r - } - return result -} - -// GetPodRates returns the processing rates of the pod in the format of lookback second to rate mappings -func (r *Rater) GetPodRates(vertexName string, podIndex int) map[string]float64 { - podName := r.pipeline.Name + "-" + vertexName + "-" + strconv.Itoa(podIndex) +// GetRates returns the processing rates of the vertex partition in the format of lookback second to rate mappings +func (r *Rater) GetRates(vertexName, partitionName string) map[string]float64 { var result = make(map[string]float64) // calculate rates for each 
lookback seconds for n, i := range r.buildLookbackSecondsMap(vertexName) { - r := CalculatePodRate(r.timestampedPodCounts[vertexName], i, podName) + r := CalculateRate(r.timestampedPodCounts[vertexName], i, partitionName) result[n] = r } return result diff --git a/pkg/daemon/server/service/rater/rater_test.go b/pkg/daemon/server/service/rater/rater_test.go index f76018633b..40e45a0e9c 100644 --- a/pkg/daemon/server/service/rater/rater_test.go +++ b/pkg/daemon/server/service/rater/rater_test.go @@ -119,7 +119,7 @@ func TestRater_Start(t *testing.T) { }() go func() { for { - if r.GetRates("v")["1m"] <= 0 || r.GetPodRates("v", 0)["1m"] <= 0 || r.GetPodRates("v", 1)["1m"] <= 0 { + if r.GetRates("v", "v0")["1m"] <= 0 || r.GetRates("v", "v1")["1m"] <= 0 { time.Sleep(time.Second) } else { succeedChan <- struct{}{} diff --git a/pkg/daemon/server/service/rater/timestamped_counts.go b/pkg/daemon/server/service/rater/timestamped_counts.go index fb772ec98b..476df10cef 100644 --- a/pkg/daemon/server/service/rater/timestamped_counts.go +++ b/pkg/daemon/server/service/rater/timestamped_counts.go @@ -29,54 +29,69 @@ const CountNotAvailable = -1 type TimestampedCounts struct { // timestamp in seconds, is the time when the count is recorded timestamp int64 - // podName to count mapping - podCounts map[string]float64 + // pod to partitionCount mapping + podPartitionCount map[string]map[string]float64 + // pod to partition delta mapping + podPartitionDelta map[string]map[string]float64 // isWindowClosed indicates whether we have finished collecting pod counts for this timestamp isWindowClosed bool - // delta is the total count change from the previous window, it's valid only when isWindowClosed is true - delta float64 - lock *sync.RWMutex + lock *sync.RWMutex } func NewTimestampedCounts(t int64) *TimestampedCounts { return &TimestampedCounts{ - timestamp: t, - podCounts: make(map[string]float64), - isWindowClosed: false, - delta: 0, - lock: new(sync.RWMutex), + timestamp: t, + podPartitionCount: make(map[string]map[string]float64), + podPartitionDelta: make(map[string]map[string]float64), + isWindowClosed: false, + lock: new(sync.RWMutex), } } // Update updates the count for a pod if the current window is not closed -func (tc *TimestampedCounts) Update(podName string, count float64) { +func (tc *TimestampedCounts) Update(podReadCount *PodReadCount) { tc.lock.Lock() defer tc.lock.Unlock() - if count == CountNotAvailable { - // we choose to skip updating when count is not available for the pod, instead of removing the pod from the map. - // imagine if the getTotalCount call fails to scrape the count metric, and it's NOT because the pod is down. - // in this case getTotalCount returns CountNotAvailable. - // if we remove the pod from the map and then the next scrape successfully gets the count, we can reach a state that in the timestamped counts, - // for this single pod, at t1, count is 123456, at t2, the map doesn't contain this pod and t3, count is 123457. + if podReadCount == nil { + // we choose to skip updating when podReadCount is nil, instead of removing the pod from the map. + // imagine if the getPodReadCounts call fails to scrape the partitionReadCounts metric, and it's NOT because the pod is down. + // in this case getPodReadCounts returns nil. 
+ // if we remove the pod from the map and then the next scrape successfully gets the partitionReadCounts, we can reach a state that in the timestamped counts, + // for this single pod, at t1, partitionReadCounts is 123456, at t2, the map doesn't contain this pod and t3, partitionReadCounts is 123457. // when calculating the rate, as we sum up deltas among timestamps, we will get 123457 total delta instead of the real delta 1. // one occurrence of such case can lead to extremely high rate and mess up the autoscaling. - // hence we'd rather keep the count as it is to avoid wrong rate calculation. + // hence we'd rather keep the partitionReadCounts as it is to avoid wrong rate calculation. return } if tc.isWindowClosed { // we skip updating if the window is already closed. return } - tc.podCounts[podName] = count + + // since the pod can read from multiple partitions, we overwrite the previous partitionReadCounts for this pod + // with the new partitionReadCounts map, since it is a counter metric, the new value is always greater than the previous one. + tc.podPartitionCount[podReadCount.Name()] = podReadCount.PartitionReadCounts() +} + +// PodReadCountSnapshot returns a copy of the podName to partitionCount mapping +// it's used to ensure the returned map is not modified by other goroutines +func (tc *TimestampedCounts) PodReadCountSnapshot() map[string]map[string]float64 { + tc.lock.RLock() + defer tc.lock.RUnlock() + counts := make(map[string]map[string]float64) + for k, v := range tc.podPartitionCount { + counts[k] = v + } + return counts } -// Snapshot returns a copy of the podName to count mapping +// PodDeltaCountSnapshot returns a copy of the podName to partition delta mapping // it's used to ensure the returned map is not modified by other goroutines -func (tc *TimestampedCounts) Snapshot() map[string]float64 { +func (tc *TimestampedCounts) PodDeltaCountSnapshot() map[string]map[string]float64 { tc.lock.RLock() defer tc.lock.RUnlock() - counts := make(map[string]float64) - for k, v := range tc.podCounts { + counts := make(map[string]map[string]float64) + for k, v := range tc.podPartitionDelta { counts[k] = v } return counts @@ -91,25 +106,28 @@ func (tc *TimestampedCounts) IsWindowClosed() bool { // CloseWindow closes the window and calculates the delta by comparing the current pod counts with the previous window func (tc *TimestampedCounts) CloseWindow(prev *TimestampedCounts) { - // prepare pod counts for both current and previous window for delta calculation - var prevPodCounts map[string]float64 + podReadCount := tc.PodReadCountSnapshot() + var prevPodReadCount map[string]map[string]float64 if prev == nil { - prevPodCounts = make(map[string]float64) + prevPodReadCount = make(map[string]map[string]float64) } else { - prevPodCounts = prev.Snapshot() + prevPodReadCount = prev.PodReadCountSnapshot() } - currPodCounts := tc.Snapshot() - - // calculate the delta by comparing the current pod counts with the previous window - delta := 0.0 - for key, currCount := range currPodCounts { - prevCount := prevPodCounts[key] // if key doesn't exist in prevPodCounts, prevCount is 0 - if currCount < prevCount { - // this can happen when a pod is restarted during the window - // we count the new count as the delta - delta += currCount - } else { - delta += currCount - prevCount + podPartitionDelta := make(map[string]map[string]float64) + + for podName, partitionReadCounts := range podReadCount { + prevPartitionReadCounts := prevPodReadCount[podName] + for partitionName, count := range 
partitionReadCounts { + prevCount := prevPartitionReadCounts[partitionName] + // delta will be equal to count in case of restart + delta := count + if count >= prevCount { + delta = count - prevCount + } + if _, ok := podPartitionDelta[podName]; !ok { + podPartitionDelta[podName] = make(map[string]float64) + } + podPartitionDelta[podName][partitionName] = delta } } @@ -117,7 +135,7 @@ func (tc *TimestampedCounts) CloseWindow(prev *TimestampedCounts) { tc.lock.Lock() defer tc.lock.Unlock() tc.isWindowClosed = true - tc.delta = delta + tc.podPartitionDelta = podPartitionDelta } // ToString returns a string representation of the TimestampedCounts @@ -125,6 +143,6 @@ func (tc *TimestampedCounts) CloseWindow(prev *TimestampedCounts) { func (tc *TimestampedCounts) ToString() string { tc.lock.RLock() defer tc.lock.RUnlock() - res := fmt.Sprintf("{timestamp: %d, podCount: %v}", tc.timestamp, tc.podCounts) + res := fmt.Sprintf("{timestamp: %d, partitionCount: %v}", tc.timestamp, tc.podPartitionCount) return res } diff --git a/pkg/daemon/server/service/rater/timestamped_counts_test.go b/pkg/daemon/server/service/rater/timestamped_counts_test.go index 514f4b285c..f89a45ccd0 100644 --- a/pkg/daemon/server/service/rater/timestamped_counts_test.go +++ b/pkg/daemon/server/service/rater/timestamped_counts_test.go @@ -23,54 +23,77 @@ import ( ) func TestNewTimestampedCounts(t *testing.T) { - tc := NewTimestampedCounts(1620000000) - assert.Equal(t, int64(1620000000), tc.timestamp) - assert.Equal(t, 0, len(tc.podCounts)) + tc := NewTimestampedCounts(TestTime) + assert.Equal(t, int64(TestTime), tc.timestamp) + assert.Equal(t, 0, len(tc.podPartitionDelta)) + assert.Equal(t, 0, len(tc.podPartitionCount)) assert.Equal(t, false, tc.isWindowClosed) - assert.Equal(t, 0.0, tc.delta) } func TestTimestampedCounts_Update(t *testing.T) { - tc := NewTimestampedCounts(1620000000) - tc.Update("pod1", 10.0) - assert.Equal(t, 10.0, tc.podCounts["pod1"]) - tc.Update("pod1", 20.0) - assert.Equal(t, 20.0, tc.podCounts["pod1"]) - tc.Update("pod2", 30.0) - assert.Equal(t, 30.0, tc.podCounts["pod2"]) - assert.Equal(t, 2, len(tc.podCounts)) - tc.Update("pod1", CountNotAvailable) - assert.Equal(t, 2, len(tc.podCounts)) - assert.Equal(t, 20, int(tc.podCounts["pod1"])) - assert.Equal(t, 30, int(tc.podCounts["pod2"])) + tc := NewTimestampedCounts(TestTime) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + assert.Equal(t, 10.0, tc.podPartitionCount["pod1"]["partition1"]) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) + assert.Equal(t, 20.0, tc.podPartitionCount["pod1"]["partition1"]) + tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 30.0}}) + assert.Equal(t, 30.0, tc.podPartitionCount["pod2"]["partition1"]) + assert.Equal(t, 2, len(tc.podPartitionCount)) + tc.Update(nil) + assert.Equal(t, 2, len(tc.podPartitionCount)) + assert.Equal(t, 20, int(tc.podPartitionCount["pod1"]["partition1"])) + assert.Equal(t, 30, int(tc.podPartitionCount["pod2"]["partition1"])) assert.Equal(t, false, tc.isWindowClosed) - assert.Equal(t, 0.0, tc.delta) tc.CloseWindow(nil) assert.Equal(t, true, tc.isWindowClosed) - // (20-0) + (30-0) = 50 - assert.Equal(t, 50.0, tc.delta) - // verify that updating pod counts doesn't take effect if the window is already closed - tc.Update("pod1", 10.0) - assert.Equal(t, 20, int(tc.podCounts["pod1"])) - tc.Update("pod2", 20.0) - assert.Equal(t, 30, int(tc.podCounts["pod2"])) - - tc2 := NewTimestampedCounts(1620000001) - tc2.Update("pod1", 40.0) - 
assert.Equal(t, 40.0, tc2.podCounts["pod1"]) - tc2.Update("pod2", 10.0) - assert.Equal(t, 10.0, tc2.podCounts["pod2"]) + // verify that updating partition counts doesn't take effect if the window is already closed + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + assert.Equal(t, 20, int(tc.podPartitionCount["pod1"]["partition1"])) + tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 20.0}}) + assert.Equal(t, 30, int(tc.podPartitionCount["pod2"]["partition1"])) + + tc2 := NewTimestampedCounts(TestTime + 1) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 40.0}}) + assert.Equal(t, 40.0, tc2.podPartitionCount["pod1"]["partition1"]) + tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 10.0}}) + assert.Equal(t, 10.0, tc2.podPartitionCount["pod2"]["partition1"]) tc2.CloseWindow(tc) assert.Equal(t, true, tc2.isWindowClosed) - // (40-20) + 10 = 30 - assert.Equal(t, 30.0, tc2.delta) } -func TestTimestampedCounts_Snapshot(t *testing.T) { - tc := NewTimestampedCounts(1620000000) - tc.Update("pod1", 10.0) - tc.Update("pod2", 20.0) - tc.Update("pod3", 30.0) - assert.Equal(t, map[string]float64{"pod1": 10.0, "pod2": 20.0, "pod3": 30.0}, tc.Snapshot()) +func TestTimestampedPodCounts_Snapshot(t *testing.T) { + tc := NewTimestampedCounts(TestTime) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 20.0}}) + assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 10.0}, "pod2": {"partition1": 20.0}}, tc.PodReadCountSnapshot()) +} + +func TestTimestampedPodDeltas_Snapshot(t *testing.T) { + tc := NewTimestampedCounts(TestTime) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 20.0}}) + tc.CloseWindow(nil) + + tc1 := NewTimestampedCounts(TestTime + 1) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) + tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 30.0}}) + tc1.CloseWindow(tc) + + assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 10.0}, "pod2": {"partition1": 10.0}}, tc1.PodDeltaCountSnapshot()) +} + +func TestTimestamped_CloseWindow(t *testing.T) { + tc := NewTimestampedCounts(TestTime) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 20.0}}) + tc.CloseWindow(nil) + + // verify that pod1 restart should give the new count instead of the difference + tc1 := NewTimestampedCounts(TestTime + 1) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) + tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 30.0}}) + tc1.CloseWindow(tc) + + assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 5.0}, "pod2": {"partition1": 10.0}}, tc1.PodDeltaCountSnapshot()) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 3afcb8a35d..03d1c82c50 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -52,7 +52,7 @@ var ( pending = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: VertexPendingMessages, Help: "Average pending messages in the last period of seconds. 
It is the pending messages of a vertex, not a pod.", - }, []string{LabelPipeline, LabelVertex, LabelPeriod}) + }, []string{LabelPipeline, LabelVertex, LabelPeriod, LabelPartitionName}) // fixedLookbackSeconds Always expose metrics of following lookback seconds (1m, 5m, 15m) fixedLookbackSeconds = map[string]int64{"1m": 60, "5m": 300, "15m": 900} @@ -70,13 +70,13 @@ type timestampedPending struct { // 2. Serve an endpoint to execute health checks type metricsServer struct { vertex *dfv1.Vertex - lagReaders []isb.LagReader + lagReaders map[string]isb.LagReader // lookbackSeconds is the look back seconds for pending calculation used for autoscaling lookbackSeconds int64 lagCheckingInterval time.Duration refreshInterval time.Duration - // pendingInfo stores a list of pending/timestamp(seconds) information - pendingInfo *sharedqueue.OverflowQueue[timestampedPending] + // partitionPendingInfo stores a list of pending/timestamp(seconds) information for each partition + partitionPendingInfo map[string]*sharedqueue.OverflowQueue[timestampedPending] // Functions that health check executes healthCheckExecutors []func() error } @@ -84,7 +84,7 @@ type metricsServer struct { type Option func(*metricsServer) // WithLagReaders sets the lag readers -func WithLagReaders(r []isb.LagReader) Option { +func WithLagReaders(r map[string]isb.LagReader) Option { return func(m *metricsServer) { m.lagReaders = r } @@ -125,10 +125,10 @@ func NewMetricsOptions(ctx context.Context, vertex *dfv1.Vertex, serverHandler H })) } } - var lagReaders []isb.LagReader + lagReaders := make(map[string]isb.LagReader) for _, reader := range readers { if x, ok := reader.(isb.LagReader); ok { - lagReaders = append(lagReaders, x) + lagReaders[reader.GetName()] = x } } if len(lagReaders) > 0 { @@ -141,6 +141,7 @@ func NewMetricsOptions(ctx context.Context, vertex *dfv1.Vertex, serverHandler H func NewMetricsServer(vertex *dfv1.Vertex, opts ...Option) *metricsServer { m := new(metricsServer) m.vertex = vertex + m.partitionPendingInfo = make(map[string]*sharedqueue.OverflowQueue[timestampedPending]) m.refreshInterval = 5 * time.Second // Default refresh interval m.lagCheckingInterval = 3 * time.Second // Default lag checking interval m.lookbackSeconds = dfv1.DefaultLookbackSeconds // Default @@ -150,7 +151,10 @@ func NewMetricsServer(vertex *dfv1.Vertex, opts ...Option) *metricsServer { } } if m.lagReaders != nil { - m.pendingInfo = sharedqueue.New[timestampedPending](1800) + for partitionName := range m.lagReaders { + m.partitionPendingInfo[partitionName] = sharedqueue.New[timestampedPending](1800) + + } } return m } @@ -168,22 +172,16 @@ func (ms *metricsServer) buildupPendingInfo(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - totalPending := int64(0) - skipPending := false - for _, pendingLag := range ms.lagReaders { - if p, err := pendingLag.Pending(ctx); err != nil || p == isb.PendingNotAvailable { + for partitionName, lagReader := range ms.lagReaders { + if pending, err := lagReader.Pending(ctx); err != nil { log.Errorw("Failed to get pending messages", zap.Error(err)) - skipPending = true - break } else { - totalPending += p + if pending != isb.PendingNotAvailable { + ts := timestampedPending{pending: pending, timestamp: time.Now().Unix()} + ms.partitionPendingInfo[partitionName].Append(ts) + } } } - // Skip pending information if any pending is not available - if !skipPending { - ts := timestampedPending{pending: totalPending, timestamp: time.Now().Unix()} - ms.pendingInfo.Append(ts) - } } } } @@ -203,9 +201,11 
@@ func (ms *metricsServer) exposePendingMetrics(ctx context.Context) { select { case <-ticker.C: if ms.lagReaders != nil { - for n, i := range lookbackSecondsMap { - if p := ms.calculatePending(i); p != isb.PendingNotAvailable { - pending.WithLabelValues(ms.vertex.Spec.PipelineName, ms.vertex.Spec.Name, n).Set(float64(p)) + for partitionName := range ms.lagReaders { + for n, i := range lookbackSecondsMap { + if p := ms.calculatePending(i, partitionName); p != isb.PendingNotAvailable { + pending.WithLabelValues(ms.vertex.Spec.PipelineName, ms.vertex.Spec.Name, n, partitionName).Set(float64(p)) + } } } } @@ -216,9 +216,9 @@ func (ms *metricsServer) exposePendingMetrics(ctx context.Context) { } // Calculate the avg pending of last seconds -func (ms *metricsServer) calculatePending(seconds int64) int64 { +func (ms *metricsServer) calculatePending(seconds int64, partitionName string) int64 { result := isb.PendingNotAvailable - items := ms.pendingInfo.Items() + items := ms.partitionPendingInfo[partitionName].Items() total := int64(0) num := int64(0) now := time.Now().Unix() diff --git a/pkg/sources/nats/nats_test.go b/pkg/sources/nats/nats_test.go index 6468d45799..0440563d5d 100644 --- a/pkg/sources/nats/nats_test.go +++ b/pkg/sources/nats/nats_test.go @@ -106,8 +106,10 @@ func Test_Single(t *testing.T) { _ = nc.Publish(testSubject, []byte("3")) msgs, err := ns.Read(context.Background(), 5) + readMessagesCount := len(msgs) assert.NoError(t, err) - for len(msgs) != 3 { +loop: + for { select { case <-ctx.Done(): t.Fatal("timeout waiting for messages") @@ -115,6 +117,10 @@ func Test_Single(t *testing.T) { default: msgs, err = ns.Read(context.Background(), 5) assert.NoError(t, err) + readMessagesCount += len(msgs) + if readMessagesCount == 3 { + break loop + } time.Sleep(10 * time.Millisecond) } }
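Two pieces of arithmetic carry most of the behavior changed above and are easy to get wrong: (1) the per-pod, per-partition delta computed when a counts window is closed, where a current count lower than the previous one means the pod restarted and the current count itself becomes the delta; and (2) the per-partition pending average, which now averages only the samples recorded for one partition within the lookback window instead of a single vertex-wide total. What follows is a minimal, self-contained Go sketch of both rules, not the patch's actual code: computeDeltas, averagePending, pendingSample, and the plain map types are illustrative stand-ins for TimestampedCounts/CloseWindow, calculatePending, timestampedPending, and isb.PendingNotAvailable.

package main

import (
	"fmt"
	"time"
)

// computeDeltas returns per-pod, per-partition read-count deltas between two snapshots.
// Both maps are podName -> partitionName -> cumulative read count.
func computeDeltas(prev, curr map[string]map[string]float64) map[string]map[string]float64 {
	deltas := make(map[string]map[string]float64)
	for podName, partitionCounts := range curr {
		for partitionName, count := range partitionCounts {
			prevCount := prev[podName][partitionName] // zero if the pod or partition is new
			delta := count                            // counter reset (pod restart): take the full count
			if count >= prevCount {
				delta = count - prevCount // normal case: the counter only grew
			}
			if _, ok := deltas[podName]; !ok {
				deltas[podName] = make(map[string]float64)
			}
			deltas[podName][partitionName] = delta
		}
	}
	return deltas
}

// pendingSample is a simplified stand-in for a timestamped pending reading.
type pendingSample struct {
	pending   int64
	timestamp int64 // Unix seconds
}

// averagePending averages the samples of one partition that fall inside the last
// `seconds` seconds, returning -1 (standing in for "pending not available") when
// no sample qualifies.
func averagePending(samples []pendingSample, seconds int64) int64 {
	now := time.Now().Unix()
	var total, num int64
	for _, s := range samples {
		if now-s.timestamp < seconds {
			total += s.pending
			num++
		}
	}
	if num == 0 {
		return -1
	}
	return total / num
}

func main() {
	prev := map[string]map[string]float64{
		"pod1": {"partition1": 10},
		"pod2": {"partition1": 20},
	}
	// pod1 restarted (its counter dropped from 10 to 5); pod2 kept counting.
	curr := map[string]map[string]float64{
		"pod1": {"partition1": 5},
		"pod2": {"partition1": 30},
	}
	fmt.Println(computeDeltas(prev, curr))
	// map[pod1:map[partition1:5] pod2:map[partition1:10]]
	// which is the same shape of result TestTimestamped_CloseWindow above expects.

	now := time.Now().Unix()
	samples := []pendingSample{
		{pending: 100, timestamp: now - 70}, // outside a 60s lookback, ignored
		{pending: 40, timestamp: now - 30},
		{pending: 20, timestamp: now - 10},
	}
	fmt.Println(averagePending(samples, 60)) // 30
}

Because the restart comparison is made per (pod, partition) pair, a restarted pod only affects its own deltas; other pods' contributions to the same partition remain plain differences, so the windowed per-partition rate degrades gracefully on a single-pod restart.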