Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: veds-g <[email protected]>
  • Loading branch information
veds-g committed Jan 25, 2023
1 parent a7ea0a5 commit 848e256
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 293 deletions.
251 changes: 70 additions & 181 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

24 changes: 1 addition & 23 deletions pkg/apis/proto/daemon/daemon.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ message GetBufferResponse {
message GetVertexMetricsRequest {
required string pipeline = 2;
required string vertex = 3;
required int64 pods = 4;
}

message GetVertexMetricsResponse {
required VertexMetrics vertex = 1;
repeated VertexMetrics podMetrics = 2;
repeated VertexMetrics vertexMetrics = 1;
}

/* Watermark */
Expand Down Expand Up @@ -116,7 +114,7 @@ service DaemonService {
};

rpc GetVertexMetrics (GetVertexMetricsRequest) returns (GetVertexMetricsResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/vertices/{vertex}/pods/{pods}/metrics";
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/vertices/{vertex}/metrics";
};

// GetVertexWatermark returns the watermark of the given vertex.
Expand Down
5 changes: 2 additions & 3 deletions pkg/daemon/client/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,14 @@ func (dc *DaemonClient) GetPipelineBuffer(ctx context.Context, pipeline, buffer
}
}

func (dc *DaemonClient) GetVertexMetrics(ctx context.Context, pipeline, vertex string, pods int64) (*daemon.GetVertexMetricsResponse, error) {
func (dc *DaemonClient) GetVertexMetrics(ctx context.Context, pipeline, vertex string) ([]*daemon.VertexMetrics, error) {
if rspn, err := dc.client.GetVertexMetrics(ctx, &daemon.GetVertexMetricsRequest{
Pipeline: &pipeline,
Vertex: &vertex,
Pods: &pods,
}); err != nil {
return nil, err
} else {
return rspn, nil
return rspn.VertexMetrics, nil
}
}

Expand Down
34 changes: 7 additions & 27 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,12 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
Name: vertexName,
},
}
podNum := req.GetPods()

processingRatesAvg := make(map[string]float64, 0)
pendingsAvg := make(map[string]int64, 0)
podNum := int64(1)
// For now only reduce has parallelism; this might have to be modified later.
obj := ps.pipeline.GetFromEdges(req.GetVertex())
if len(obj) > 0 && obj[0].Parallelism != nil {
podNum = int64(*obj[0].Parallelism)
}

// Get the headless service name
headlessServiceName := vertex.GetHeadlessServiceName()
Expand Down Expand Up @@ -217,32 +219,10 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
ProcessingRates: processingRates,
Pendings: pendings,
}

for k, v := range processingRates {
processingRatesAvg[k] += v
}

for k, v := range pendings {
pendingsAvg[k] += v
}
}
}

for k, v := range processingRatesAvg {
processingRatesAvg[k] = v / float64(podNum)
}
for k, v := range pendingsAvg {
pendingsAvg[k] = v / podNum
}

v := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
ProcessingRates: processingRatesAvg,
Pendings: pendingsAvg,
}
resp.Vertex = v
resp.PodMetrics = metricsArr
resp.VertexMetrics = metricsArr
return resp, nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"
}

vertex := "cat"
pods := int64(1)

req := &daemon.GetVertexMetricsRequest{Vertex: &vertex, Pods: &pods}
req := &daemon.GetVertexMetricsRequest{Vertex: &vertex}

resp, err := pipelineMetricsQueryService.GetVertexMetrics(context.Background(), req)
assert.NoError(t, err)
Expand All @@ -119,14 +118,14 @@ vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"
processingRates["1m"] = 5.084745762711864
processingRates["5m"] = 4.894736842105263
processingRates["default"] = 4.894736842105263
assert.Equal(t, resp.Vertex.GetProcessingRates(), processingRates)
assert.Equal(t, resp.VertexMetrics[0].GetProcessingRates(), processingRates)

pendings := make(map[string]int64)
pendings["15m"] = 4
pendings["1m"] = 5
pendings["5m"] = 6
pendings["default"] = 7
assert.Equal(t, resp.Vertex.GetPendings(), pendings)
assert.Equal(t, resp.VertexMetrics[0].GetPendings(), pendings)
}

func TestGetBuffer(t *testing.T) {
Expand Down
15 changes: 8 additions & 7 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
)

// TODO - Write Unit Tests for this file
Expand Down Expand Up @@ -123,15 +124,14 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
timeZero := time.Unix(0, 0).UnixMilli()
watermarkArr := make([]*daemon.VertexWatermark, len(ps.watermarkFetchers))
i := 0
var podWatermarks []int64
for k := range ps.watermarkFetchers {
vertexName := k
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
Watermark: &timeZero,
IsWatermarkEnabled: &isWatermarkEnabled,
PodWatermarks: podWatermarks,
PodWatermarks: nil,
}
i++
}
Expand All @@ -144,19 +144,20 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
i := 0
for k, vertexFetchers := range ps.watermarkFetchers {
var latestWatermark = int64(-1)
var podWatermarks []int64
var latestpodWatermarks []processor.Watermark
for _, fetcher := range vertexFetchers {
watermark := fetcher.GetHeadWatermark().UnixMilli()
podWatermark := fetcher.GetPodWatermarks()
if watermark > latestWatermark {
latestWatermark = watermark
podWatermarks = nil
for _, v := range podWatermark {
podWatermarks = append(podWatermarks, v.UnixMilli())
}
latestpodWatermarks = podWatermark
}
}
vertexName := k
var podWatermarks []int64
for _, v := range latestpodWatermarks {
podWatermarks = append(podWatermarks, v.UnixMilli())
}
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
Expand Down
29 changes: 23 additions & 6 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -141,12 +140,11 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
log := logging.FromContext(ctx).With("worker", fmt.Sprint(worker)).With("vertexKey", key)
log.Debugf("Working on key: %s", key)
strs := strings.Split(key, "/")
if len(strs) != 3 {
if len(strs) != 2 {
return fmt.Errorf("invalid key %q", key)
}
namespace := strs[0]
vertexFullName := strs[1]
pods, _ := strconv.ParseInt(strs[2], 10, 64)
vertex := &dfv1.Vertex{}
if err := s.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: vertexFullName}, vertex); err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -212,17 +210,36 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
defer func() {
_ = dClient.Close()
}()
vMetrics, err := dClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name, pods)
vMetrics, err := dClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
if err != nil {
return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
}
// Avg rate and pending for autoscaling are both in the map with key "default", see "pkg/metrics/metrics.go".
rate, existing := vMetrics.Vertex.ProcessingRates["default"]
var rate float64
existing := true
// Sum the "default" rate values across all pods; the rate is treated as
// non-existent if it is missing from any one pod.
for _, v := range vMetrics {
val, exist := v.ProcessingRates["default"]
if !exist {
existing = false
}
rate += val
}
if !existing || rate < 0 || rate == isb.RateNotAvailable { // Rate not available
log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
return nil
}
pending, existing := vMetrics.Vertex.Pendings["default"]

var pending int64
existing = true
for _, v := range vMetrics {
val, exist := v.Pendings["default"]
if !exist {
existing = false
}
pending += val
}
if !existing || pending < 0 || pending == isb.PendingNotAvailable {
// Pending not available, we don't do anything
log.Debugf("Vertex %s has no pending messages information, skip scaling", vertex.Name)
Expand Down
3 changes: 1 addition & 2 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func (h *handler) GetVertexMetrics(c *gin.Context) {
ns := c.Param("namespace")
pipeline := c.Param("pipeline")
vertex := c.Param("vertex")
pods, _ := strconv.ParseInt(c.Param("podnum"), 10, 64)
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
Expand All @@ -294,7 +293,7 @@ func (h *handler) GetVertexMetrics(c *gin.Context) {
defer func() {
_ = client.Close()
}()
l, err := client.GetVertexMetrics(context.Background(), pipeline, vertex, pods)
l, err := client.GetVertexMetrics(context.Background(), pipeline, vertex)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func v1Routes(r gin.IRouter) {
r.GET("/metrics/namespaces/:namespace/pods/:pod", handler.GetPodMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges", handler.ListPipelineEdges)
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges/:edge", handler.GetPipelineEdge)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/pods/:podnum/metrics", handler.GetVertexMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/metrics", handler.GetVertexMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/watermark", handler.GetVertexWatermark)
r.GET("/namespaces/:namespace/pipelines/:pipeline/watermarks", handler.GetPipelineWatermarks)
}
6 changes: 3 additions & 3 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *FunctionalSuite) TestCreateSimplePipeline() {
Status(200).Body().Contains("pipeline")

HTTPExpect(s.T(), "https://localhost:1234").
GET(fmt.Sprintf("/api/v1/pipelines/%s/vertices/%s/pods/%v/metrics", pipelineName, "p1", 1)).
GET(fmt.Sprintf("/api/v1/pipelines/%s/vertices/%s/metrics", pipelineName, "p1")).
Expect().
Status(200).Body().Contains("pipeline")

Expand All @@ -97,9 +97,9 @@ func (s *FunctionalSuite) TestCreateSimplePipeline() {
bufferInfo, err := client.GetPipelineBuffer(context.Background(), pipelineName, dfv1.GenerateEdgeBufferNames(Namespace, pipelineName, dfv1.Edge{From: "input", To: "p1"})[0])
assert.NoError(s.T(), err)
assert.Equal(s.T(), "input", *bufferInfo.FromVertex)
m, err := client.GetVertexMetrics(context.Background(), pipelineName, "p1", 1)
m, err := client.GetVertexMetrics(context.Background(), pipelineName, "p1")
assert.NoError(s.T(), err)
assert.Equal(s.T(), pipelineName, *m.Vertex.Pipeline)
assert.Equal(s.T(), pipelineName, *m[0].Pipeline)
}

func (s *FunctionalSuite) TestFiltering() {
Expand Down
40 changes: 22 additions & 18 deletions ui/src/components/pipeline/Pipeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -77,38 +77,42 @@ export function Pipeline() {
const getMetrics = useCallback(() => {
const vertexToMetricsMap = new Map();

if (pipeline?.spec?.vertices && vertexPods) {
if (pipeline?.spec?.vertices) {
Promise.all(
pipeline?.spec?.vertices.map((vertex) => {
return fetch(
`/api/v1/namespaces/${namespaceId}/pipelines/${pipelineId}/vertices/${vertex.name}/pods/${vertexPods.get(vertex.name)}/metrics`
`/api/v1/namespaces/${namespaceId}/pipelines/${pipelineId}/vertices/${vertex.name}/metrics`
)
.then((response) => response.json())
.then((json) => {
const vertexMetrics = {ratePerMin: 0, ratePerFiveMin: 0, ratePerFifteenMin: 0, podMetrics: null} as VertexMetrics;
if ("processingRates" in json.vertex) {
if ("1m" in json.vertex["processingRates"]) {
vertexMetrics.ratePerMin =
json.vertex["processingRates"]["1m"].toFixed(2);
}
if ("5m" in json.vertex["processingRates"]) {
vertexMetrics.ratePerFiveMin =
json.vertex["processingRates"]["5m"].toFixed(2);
const vertexMetrics = {ratePerMin: "0.00", ratePerFiveMin: "0.00", ratePerFifteenMin: "0.00", podMetrics: null} as VertexMetrics;
let ratePerMin = 0.0, ratePerFiveMin = 0.0, ratePerFifteenMin = 0.0;
// keeping processing rates as summation of pod values
json.map((pod) => {
if ("processingRates" in pod) {
if ("1m" in pod["processingRates"]) {
ratePerMin += pod["processingRates"]["1m"];
}
if ("5m" in pod["processingRates"]) {
ratePerFiveMin += pod["processingRates"]["5m"];
}
if ("15m" in pod["processingRates"]) {
ratePerFifteenMin += pod["processingRates"]["15m"];
}
}
if ("15m" in json.vertex["processingRates"]) {
vertexMetrics.ratePerFifteenMin =
json.vertex["processingRates"]["15m"].toFixed(2);
}
}
vertexMetrics.podMetrics = json.podMetrics
})
vertexMetrics.ratePerMin = ratePerMin.toFixed(2);
vertexMetrics.ratePerFiveMin = ratePerFiveMin.toFixed(2);
vertexMetrics.ratePerFifteenMin = ratePerFifteenMin.toFixed(2);
vertexMetrics.podMetrics = json;
vertexToMetricsMap.set(vertex.name, vertexMetrics);
});
})
)
.then(() => setVertexMetrics(vertexToMetricsMap))
.catch(console.error);
}
}, [pipeline, vertexPods]);
}, [pipeline]);

// This useEffect is used to obtain metrics for a given vertex in a pipeline and refreshes every 5 minutes
useEffect(() => {
Expand Down
2 changes: 1 addition & 1 deletion ui/src/components/pipeline/edgeinfo/EdgeInfo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export default function EdgeInfo(props: EdgeInfoProps) {
<Table aria-label="edge-info">
<TableHead>
<TableRow>
<TableCell>Edge</TableCell>
<TableCell >Edge</TableCell>
<TableCell >isFull</TableCell>
<TableCell >AckPending</TableCell>
<TableCell >Pending</TableCell>
Expand Down
Loading

0 comments on commit 848e256

Please sign in to comment.