Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add API for pipeline status check. Fixes #407. #599

Merged
merged 13 commits into from
Mar 15, 2023
810 changes: 739 additions & 71 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions pkg/apis/proto/daemon/daemon.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ message VertexMetrics {
map<string, int64> pendings = 4;
}

// PipelineStatus
message PipelineStatus {
required bool health = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should go with a string type status, which has more meanings other than only true/false. For example, OK/Unknown/Error/...

required string message = 2;
}

message ListBuffersRequest {
required string pipeline = 1;
}
Expand All @@ -64,6 +70,14 @@ message GetBufferResponse {
required BufferInfo buffer = 1;
}

message GetPipelineStatusRequest {
required string pipeline = 1;
}

message GetPipelineStatusResponse {
required PipelineStatus status = 1;
}

message GetVertexMetricsRequest {
required string pipeline = 2;
required string vertex = 3;
Expand Down Expand Up @@ -110,4 +124,9 @@ service DaemonService {
rpc GetPipelineWatermarks (GetPipelineWatermarksRequest) returns (GetPipelineWatermarksResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/watermarks";
};

// GetPipelineStatus () returns (PipelineStatus) // status: bool and string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is not up to date.

rpc GetPipelineStatus (GetPipelineStatusRequest) returns (GetPipelineStatusResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/status";
};
}
10 changes: 10 additions & 0 deletions pkg/daemon/client/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,13 @@ func (dc *DaemonClient) GetPipelineWatermarks(ctx context.Context, pipeline stri
return rspn.PipelineWatermarks, nil
}
}

func (dc *DaemonClient) GetPipelineStatus(ctx context.Context, pipeline string) (*daemon.PipelineStatus, error) {
if rspn, err := dc.client.GetPipelineStatus(ctx, &daemon.GetPipelineStatusRequest{
Pipeline: &pipeline,
}); err != nil {
return nil, err
} else {
return rspn.Status, nil
}
}
34 changes: 34 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,40 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
return resp, nil
}

func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *daemon.GetPipelineStatusRequest) (*daemon.GetPipelineStatusResponse, error) {

resp := new(daemon.GetPipelineStatusResponse)

// get all vertices of pipeline
vertices := ps.pipeline.Spec.Vertices

// loop over vertices and get metrics to check pending messages vs processing rate
for _, vertex := range vertices {
vertexReq := new(daemon.GetVertexMetricsRequest)
vertexReq.Pipeline = req.Pipeline
vertexReq.Vertex = &vertex.Name
vertexResp, err := ps.GetVertexMetrics(ctx, vertexReq)
if err != nil {
return nil, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If err is not nil, should not return an error, it probably just because autoscaling down to 0 and then metrics are not available. In this case, we could just return the status as Unknown.

}
// check default pending msg and processing rates
if vertexResp.VertexMetrics[0].Pendings["default"] > 0 && vertexResp.VertexMetrics[0].ProcessingRates["default"] == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not only check [0], reduce vertex has more items in the array.

Based on the GetVertexMetrics() implementation, default is not always available in the map, so also need to check if it's existing.

resp.Status = &daemon.PipelineStatus{
Health: pointer.Bool(false),
Message: pointer.String("Pipeline may have issue."),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More information in the message, such as which vertex has what issue.

}
return resp, nil
}
}

resp.Status = &daemon.PipelineStatus{
Health: pointer.Bool(true),
Message: pointer.String("Pipeline has no issue."),
}
return resp, nil

}

func getBufferLimits(pl *v1alpha1.Pipeline, edge v1alpha1.Edge) (bufferLength int64, bufferUsageLimit float64) {
plLimits := pl.GetPipelineLimits()
bufferLength = int64(*plLimits.BufferMaxLength)
Expand Down
44 changes: 44 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
Expand Down Expand Up @@ -203,3 +204,46 @@ func TestListBuffers(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(resp.Buffers), 2)
}

func TestGetPipelineStatus(t *testing.T) {
pipelineName := "simple-pipeline"
pipeline := &v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: pipelineName},
}
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
pipelineMetricsQueryService, err := NewPipelineMetadataQuery(client, pipeline, nil)
assert.NoError(t, err)

metricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod.
# TYPE vertex_processing_rate gauge
vertex_processing_rate{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263
vertex_processing_rate{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.084745762711864
vertex_processing_rate{period="5m",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263
vertex_processing_rate{period="default",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263

# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
ioReader := io.NopCloser(bytes.NewReader([]byte(metricsResponse)))

pipelineMetricsQueryService.httpClient = &mockHttpClient{
MockGet: func(url string) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
Body: ioReader,
}, nil
},
}

healthyPipelineResponse := daemon.PipelineStatus{Health: pointer.Bool(true), Message: pointer.String("Pipeline has no issue.")}

req := &daemon.GetPipelineStatusRequest{Pipeline: &pipelineName}

resp, err := pipelineMetricsQueryService.GetPipelineStatus(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, &healthyPipelineResponse, resp.Status)
}
20 changes: 20 additions & 0 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,26 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) {
c.JSON(http.StatusOK, l)
}

// GetPipelineStatus is used to provide status check for a given pipeline
func (h *handler) GetPipelineStatus(c *gin.Context) {
ns := c.Param("namespace")
pipeline := c.Param("pipeline")
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
defer func() {
_ = client.Close()
}()
l, err := client.GetPipelineStatus(context.Background(), pipeline)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, l)
}

func daemonSvcAddress(ns, pipeline string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort)
}
1 change: 1 addition & 0 deletions server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ func v1Routes(r gin.IRouter) {
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges/:edge", handler.GetPipelineEdge)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/metrics", handler.GetVertexMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/watermarks", handler.GetPipelineWatermarks)
r.GET("/namespaces/:namespace/pipelines/:pipeline/status", handler.GetPipelineStatus)
}