Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Reduce UI Support #500

Merged
merged 24 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9c373bf
adding GetPipelineWatermark to fetch watermark for all vertices of a …
veds-g Dec 8, 2022
8f956e5
adding GetPipelineWatermark to fetch watermark for all vertices of a …
veds-g Dec 8, 2022
2381690
addressing review comments
veds-g Dec 9, 2022
4f78595
Merge branch 'numaproj:main' into pipeline-watermark
veds-g Dec 9, 2022
ff6f8f4
fixing indentation
veds-g Dec 9, 2022
0d739b7
Addressing review comments
veds-g Dec 13, 2022
079d036
Merge branch 'main' into pipeline-watermark
veds-g Dec 13, 2022
c0ae198
Updating test for GetPipelineWatermarks
veds-g Dec 13, 2022
35d900b
adding previous api for fetching watermarks
veds-g Dec 16, 2022
28653fb
Merge branch 'main' into pipeline-watermark
veds-g Dec 16, 2022
187968f
minor URL fix
veds-g Dec 19, 2022
11693d9
Merge branch 'main' into pipeline-watermark
veds-g Dec 19, 2022
6067ef8
Merge branch 'main' into pipeline-watermark
veds-g Dec 20, 2022
78ee93b
Merge branch 'numaproj:main' into pipeline-watermark
veds-g Jan 11, 2023
45bdcae
Reduce UI Support
veds-g Jan 23, 2023
a7ea0a5
Merge branch 'main' into pipeline-watermark
veds-g Jan 23, 2023
5cecfb7
Addressing review comments
veds-g Jan 25, 2023
53cbad0
Merge branch 'main' into pipeline-watermark
veds-g Jan 25, 2023
c5aee7c
fixing test case
veds-g Jan 25, 2023
b82730a
Merge branch 'main' into pipeline-watermark
veds-g Jan 30, 2023
926e803
modifying Watermark object in proto file
veds-g Feb 1, 2023
977b495
Merge branch 'main' into pipeline-watermark
veds-g Feb 1, 2023
b62fda8
fixing testcases
veds-g Feb 1, 2023
386ab1b
Merge branch 'main' into pipeline-watermark
veds-g Feb 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 264 additions & 61 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion pkg/apis/proto/daemon/daemon.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ message GetBufferResponse {
message GetVertexMetricsRequest {
required string pipeline = 2;
required string vertex = 3;

required int64 pods = 4;
}

message GetVertexMetricsResponse {
required VertexMetrics vertex = 1;
repeated VertexMetrics podMetrics = 2;
}

/* Watermark */
Expand All @@ -81,6 +82,7 @@ message VertexWatermark {
required string vertex = 2;
required int64 watermark = 3;
required bool isWatermarkEnabled = 4;
repeated int64 podWatermarks = 5;
}

message GetVertexWatermarkResponse {
Expand Down Expand Up @@ -114,7 +116,7 @@ service DaemonService {
};

rpc GetVertexMetrics (GetVertexMetricsRequest) returns (GetVertexMetricsResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/vertices/{vertex}/metrics";
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/vertices/{vertex}/pods/{pods}/metrics";
whynowy marked this conversation as resolved.
Show resolved Hide resolved
};

// GetVertexWatermark return the watermark of the given vertex.
Expand Down
5 changes: 3 additions & 2 deletions pkg/daemon/client/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ func (dc *DaemonClient) GetPipelineBuffer(ctx context.Context, pipeline, buffer
}
}

func (dc *DaemonClient) GetVertexMetrics(ctx context.Context, pipeline, vertex string) (*daemon.VertexMetrics, error) {
func (dc *DaemonClient) GetVertexMetrics(ctx context.Context, pipeline, vertex string, pods int64) (*daemon.GetVertexMetricsResponse, error) {
veds-g marked this conversation as resolved.
Show resolved Hide resolved
if rspn, err := dc.client.GetVertexMetrics(ctx, &daemon.GetVertexMetricsRequest{
Pipeline: &pipeline,
Vertex: &vertex,
Pods: &pods,
}); err != nil {
return nil, err
} else {
return rspn.Vertex, nil
return rspn, nil
}
}

Expand Down
102 changes: 67 additions & 35 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,61 +156,93 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
Name: vertexName,
},
}
podNum := req.GetPods()

processingRates := make(map[string]float64, 0)
pendings := make(map[string]int64, 0)
processingRatesAvg := make(map[string]float64, 0)
pendingsAvg := make(map[string]int64, 0)

// Get the headless service name
headlessServiceName := vertex.GetHeadlessServiceName()
// We can query the metrics endpoint of the 0th pod to obtain this value.
// example: https://simple-pipeline-in-0.simple-pipeline-in-headless.svc.cluster.local:2469/metrics
url := fmt.Sprintf("https://%s-0.%s.%s.svc.cluster.local:%v/metrics", vertexName, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := ps.httpClient.Get(url); err != nil {
log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error())
} else {
// expfmt Parser from prometheus to parse the metrics
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
log.Errorw("Error in parsing to prometheus metric families", zap.Error(err))
return nil, err
}

// Check if the resultant metrics list contains the processingRate, if it does look for the period label
if value, ok := result[metrics.VertexProcessingRate]; ok {
metricsList := value.GetMetric()
for _, metric := range metricsList {
labels := metric.GetLabel()
for _, label := range labels {
if label.GetName() == metrics.LabelPeriod {
lookback := label.GetValue()
processingRates[lookback] = metric.Gauge.GetValue()
metricsArr := make([]*daemon.VertexMetrics, podNum)
for i := int64(0); i < podNum; i++ {

processingRates := make(map[string]float64, 0)
pendings := make(map[string]int64, 0)

// We can query the metrics endpoint of the (i)th pod to obtain this value.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.svc.cluster.local:2469/metrics
url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := ps.httpClient.Get(url); err != nil {
log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error())
} else {
// expfmt Parser from prometheus to parse the metrics
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
log.Errorw("Error in parsing to prometheus metric families", zap.Error(err))
return nil, err
}

// Check if the resultant metrics list contains the processingRate, if it does look for the period label
if value, ok := result[metrics.VertexProcessingRate]; ok {
metricsList := value.GetMetric()
for _, metric := range metricsList {
labels := metric.GetLabel()
for _, label := range labels {
if label.GetName() == metrics.LabelPeriod {
lookback := label.GetValue()
processingRates[lookback] = metric.Gauge.GetValue()
}
}
}
}
}

if value, ok := result[metrics.VertexPendingMessages]; ok {
metricsList := value.GetMetric()
for _, metric := range metricsList {
labels := metric.GetLabel()
for _, label := range labels {
if label.GetName() == metrics.LabelPeriod {
lookback := label.GetValue()
pendings[lookback] = int64(metric.Gauge.GetValue())
if value, ok := result[metrics.VertexPendingMessages]; ok {
metricsList := value.GetMetric()
for _, metric := range metricsList {
labels := metric.GetLabel()
for _, label := range labels {
if label.GetName() == metrics.LabelPeriod {
lookback := label.GetValue()
pendings[lookback] = int64(metric.Gauge.GetValue())
}
}
}
}

metricsArr[i] = &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
ProcessingRates: processingRates,
Pendings: pendings,
}

for k, v := range processingRates {
processingRatesAvg[k] += v
}

for k, v := range pendings {
pendingsAvg[k] += v
}
}
}

for k, v := range processingRatesAvg {
processingRatesAvg[k] = v / float64(podNum)
}
for k, v := range pendingsAvg {
pendingsAvg[k] = v / podNum
}

v := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
ProcessingRates: processingRates,
Pendings: pendings,
ProcessingRates: processingRatesAvg,
Pendings: pendingsAvg,
}
resp.Vertex = v
resp.PodMetrics = metricsArr
return resp, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"
}

vertex := "cat"
pods := int64(1)

req := &daemon.GetVertexMetricsRequest{Vertex: &vertex}
req := &daemon.GetVertexMetricsRequest{Vertex: &vertex, Pods: &pods}

resp, err := pipelineMetricsQueryService.GetVertexMetrics(context.Background(), req)
assert.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
timeZero := time.Unix(0, 0).UnixMilli()
watermarkArr := make([]*daemon.VertexWatermark, len(ps.watermarkFetchers))
i := 0
var podWatermarks []int64
for k := range ps.watermarkFetchers {
vertexName := k
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
Watermark: &timeZero,
IsWatermarkEnabled: &isWatermarkEnabled,
PodWatermarks: podWatermarks,
}
i++
}
Expand All @@ -142,10 +144,16 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
i := 0
for k, vertexFetchers := range ps.watermarkFetchers {
var latestWatermark = int64(-1)
var podWatermarks []int64
for _, fetcher := range vertexFetchers {
watermark := fetcher.GetHeadWatermark().UnixMilli()
podWatermark := fetcher.GetPodWatermarks()
veds-g marked this conversation as resolved.
Show resolved Hide resolved
if watermark > latestWatermark {
latestWatermark = watermark
podWatermarks = nil
veds-g marked this conversation as resolved.
Show resolved Hide resolved
for _, v := range podWatermark {
podWatermarks = append(podWatermarks, v.UnixMilli())
}
}
}
vertexName := k
Expand All @@ -154,6 +162,7 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
Vertex: &vertexName,
Watermark: &latestWatermark,
IsWatermarkEnabled: &isWatermarkEnabled,
PodWatermarks: podWatermarks,
}
i++
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -140,11 +141,12 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
log := logging.FromContext(ctx).With("worker", fmt.Sprint(worker)).With("vertexKey", key)
log.Debugf("Working on key: %s", key)
strs := strings.Split(key, "/")
if len(strs) != 2 {
if len(strs) != 3 {
veds-g marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("invalid key %q", key)
}
namespace := strs[0]
vertexFullName := strs[1]
pods, _ := strconv.ParseInt(strs[2], 10, 64)
vertex := &dfv1.Vertex{}
if err := s.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: vertexFullName}, vertex); err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -210,17 +212,17 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
defer func() {
_ = dClient.Close()
}()
vMetrics, err := dClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
vMetrics, err := dClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name, pods)
if err != nil {
return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
}
// Avg rate and pending for autoscaling are both in the map with key "default", see "pkg/metrics/metrics.go".
rate, existing := vMetrics.ProcessingRates["default"]
rate, existing := vMetrics.Vertex.ProcessingRates["default"]
if !existing || rate < 0 || rate == isb.RateNotAvailable { // Rate not available
log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
return nil
}
pending, existing := vMetrics.Pendings["default"]
pending, existing := vMetrics.Vertex.Pendings["default"]
if !existing || pending < 0 || pending == isb.PendingNotAvailable {
// Pending not available, we don't do anything
log.Debugf("Vertex %s has no pending messages information, skip scaling", vertex.Name)
Expand Down
4 changes: 4 additions & 0 deletions pkg/reduce/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (e *EventTypeWMProgressor) GetHeadWatermark() processor.Watermark {
return processor.Watermark{}
}

// GetPodWatermarks satisfies the watermark fetcher interface for this test
// progressor; it carries no per-pod state, so it returns an empty slice.
func (e *EventTypeWMProgressor) GetPodWatermarks() []processor.Watermark {
	return make([]processor.Watermark, 0)
}

// PayloadForTest is a dummy payload for testing.
type PayloadForTest struct {
Key string
Expand Down
26 changes: 26 additions & 0 deletions pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,32 @@ func (e *edgeFetcher) GetHeadWatermark() processor.Watermark {
return processor.Watermark(time.UnixMilli(epoch))
}

// GetPodWatermarks gets the watermarks for all pods of a vertex
veds-g marked this conversation as resolved.
Show resolved Hide resolved
func (e *edgeFetcher) GetPodWatermarks() []processor.Watermark {
var debugString strings.Builder
var allProcessors = e.processorManager.GetAllProcessors()
var podWatermarks []processor.Watermark
for _, p := range allProcessors {
if !p.IsActive() {
continue
}
var o = p.offsetTimeline.GetHeadOffset()
var epoch int64 = math.MaxInt64
e.log.Debugf("Processor: %v (headoffset:%d)", p, o)
debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, o))
if o != -1 {
epoch = p.offsetTimeline.GetEventTimeFromInt64(o)
}
if epoch == math.MaxInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(-1)))
} else {
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(epoch)))
}
}
return podWatermarks
}

// GetWatermark gets the smallest timestamp for the given offset
func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) processor.Watermark {
var offset, err = inputOffset.Sequence()
Expand Down
2 changes: 2 additions & 0 deletions pkg/watermark/fetch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ type Fetcher interface {
GetWatermark(offset isb.Offset) processor.Watermark
// GetHeadWatermark returns the latest watermark based on the head offset
GetHeadWatermark() processor.Watermark
// GetPodWatermarks returns the watermarks for all the pods in a vertex
GetPodWatermarks() []processor.Watermark
veds-g marked this conversation as resolved.
Show resolved Hide resolved
}
21 changes: 21 additions & 0 deletions pkg/watermark/fetch/source_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ func (e *sourceFetcher) GetHeadWatermark() processor.Watermark {
return processor.Watermark(time.UnixMilli(epoch))
}

// GetPodWatermarks returns the list of watermarks of all pods in a vertex
veds-g marked this conversation as resolved.
Show resolved Hide resolved
func (e *sourceFetcher) GetPodWatermarks() []processor.Watermark {
var podWatermarks []processor.Watermark
for _, p := range e.processorManager.GetAllProcessors() {
if !p.IsActive() {
continue
}
var epoch int64 = math.MinInt64
if p.offsetTimeline.GetHeadWatermark() != -1 {
epoch = p.offsetTimeline.GetHeadWatermark()
}
if epoch == math.MinInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(-1)))
} else {
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(epoch)))
}
}
return podWatermarks
}

// GetWatermark returns the lowest of the latest watermark of all the processors,
// it ignores the input offset.
func (e *sourceFetcher) GetWatermark(_ isb.Offset) processor.Watermark {
Expand Down
Loading