
feat: add API for pipeline status check. Fixes #407. #599

Merged: 13 commits, Mar 15, 2023
823 changes: 749 additions & 74 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions pkg/apis/proto/daemon/daemon.pb.gw.go

Some generated files are not rendered by default.

18 changes: 18 additions & 0 deletions pkg/apis/proto/daemon/daemon.proto
@@ -47,6 +47,12 @@ message VertexMetrics {
map<string, int64> pendings = 4;
}

// PipelineStatus
message PipelineStatus {
required string status = 1;
required string message = 2;
}

message ListBuffersRequest {
required string pipeline = 1;
}
@@ -64,6 +70,14 @@ message GetBufferResponse {
required BufferInfo buffer = 1;
}

message GetPipelineStatusRequest {
required string pipeline = 1;
}

message GetPipelineStatusResponse {
required PipelineStatus status = 1;
}

message GetVertexMetricsRequest {
required string pipeline = 2;
required string vertex = 3;
@@ -110,4 +124,8 @@ service DaemonService {
rpc GetPipelineWatermarks (GetPipelineWatermarksRequest) returns (GetPipelineWatermarksResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/watermarks";
};

rpc GetPipelineStatus (GetPipelineStatusRequest) returns (GetPipelineStatusResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/status";
};
}
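
Because the new rpc carries a google.api.http get option (and the generated daemon.pb.gw.go above), the daemon should also serve this call over plain HTTP via grpc-gateway. A minimal sketch of querying that endpoint from Go; the daemon address, port, scheme, and the exact JSON envelope produced by the gateway are assumptions, not taken from this PR:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Assumed shape of the gateway's JSON rendering of GetPipelineStatusResponse:
// the PipelineStatus message nested under a "status" key.
type pipelineStatus struct {
	Status  string `json:"status"`
	Message string `json:"message"`
}

type getPipelineStatusResponse struct {
	Status pipelineStatus `json:"status"`
}

func main() {
	// Hypothetical daemon address; in-cluster it would typically be the pipeline's
	// daemon service, e.g. <pipeline>-daemon-svc.<namespace>.svc:<port>.
	resp, err := http.Get("http://localhost:4327/api/v1/pipelines/simple-pipeline/status")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var out getPipelineStatusResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("status=%s message=%q\n", out.Status.Status, out.Status.Message)
}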
10 changes: 10 additions & 0 deletions pkg/daemon/client/daemon_client.go
@@ -108,3 +108,13 @@ func (dc *DaemonClient) GetPipelineWatermarks(ctx context.Context, pipeline stri
return rspn.PipelineWatermarks, nil
}
}

func (dc *DaemonClient) GetPipelineStatus(ctx context.Context, pipeline string) (*daemon.PipelineStatus, error) {
if rspn, err := dc.client.GetPipelineStatus(ctx, &daemon.GetPipelineStatusRequest{
Pipeline: &pipeline,
}); err != nil {
return nil, err
} else {
return rspn.Status, nil
}
}
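
A hedged usage sketch of the new DaemonClient method. The constructor and Close follow the handler change later in this PR, and the address mirrors the daemonSvcAddress() pattern there; the namespace, pipeline name, and port are illustrative, and the GetStatus/GetMessage getters are assumed from the generated proto code's pointer fields:

package main

import (
	"context"
	"fmt"
	"log"

	daemonclient "github.com/numaproj/numaflow/pkg/daemon/client"
)

func main() {
	// Hypothetical address following the <pipeline>-daemon-svc.<namespace>... pattern;
	// the port is assumed.
	client, err := daemonclient.NewDaemonServiceClient("simple-pipeline-daemon-svc.default.svc.cluster.local:4327")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Close() }()

	status, err := client.GetPipelineStatus(context.Background(), "simple-pipeline")
	if err != nil {
		log.Fatal(err)
	}
	// Status and Message are pointer fields in the generated type, so use the getters.
	fmt.Println(status.GetStatus(), status.GetMessage())
}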
57 changes: 57 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query.go
@@ -51,6 +51,12 @@ type pipelineMetadataQuery struct {
watermarkFetchers map[string][]fetch.Fetcher
}

const (
PipelineStatusOK = "OK"
PipelineStatusError = "Error"
PipelineStatusUnknown = "Unknown"
)

// NewPipelineMetadataQuery returns a new instance of pipelineMetadataQuery
func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline, wmFetchers map[string][]fetch.Fetcher) (*pipelineMetadataQuery, error) {
var err error
@@ -233,6 +239,57 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
return resp, nil
}

func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *daemon.GetPipelineStatusRequest) (*daemon.GetPipelineStatusResponse, error) {

resp := new(daemon.GetPipelineStatusResponse)

// get all vertices of pipeline
vertices := ps.pipeline.Spec.Vertices

// loop over vertices and get metrics to check pending messages vs processing rate
for _, vertex := range vertices {
vertexReq := new(daemon.GetVertexMetricsRequest)
vertexReq.Vertex = &vertex.Name
vertexResp, err := ps.GetVertexMetrics(ctx, vertexReq)
// if err is not nil, more than likely autoscaling is down to 0 and metrics are not available
if err != nil {
resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusUnknown),
Message: pointer.String("Pipeline status is unknown."),
}
return resp, nil
}

// may need to revisit later, another concern could be that the processing rate is too slow instead of just 0
for _, vertexMetrics := range vertexResp.VertexMetrics {
if pending, ok := vertexMetrics.GetPendings()["default"]; ok {
if processingRate, ok := vertexMetrics.GetProcessingRates()["default"]; ok {
if pending > 0 && processingRate == 0 {
Review comment (Member): Can you add a comment that we need to revisit this later? It might also be caused by processing being very slow, so that the rate reads as 0.

resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusError),
Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)),
}
return resp, nil
}
}
} else {
Review comment (Member): We don't need this else. It is expected that sometimes there is no pending information:

  1. A source like HTTP does not have pending at all;
  2. Based on the GetVertexMetrics implementation, when a vertex is scaled down to 0 replicas, the pending is also not available.

For these cases, we should consider the pipeline OK.

(A sketch of this suggested revision follows the function below.)

resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusError),
Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)),
}
return resp, nil
}
}
}

resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusOK),
Message: pointer.String("Pipeline has no issue."),
}

return resp, nil
}
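
Taking the two review comments above together (the rate-zero heuristic needs a revisit, and missing pending information should count as healthy rather than falling into the else branch), the per-vertex check could be reduced to something like the following. This is a simplified sketch using plain maps, not the merged implementation:

// Simplified sketch of the per-vertex health check after applying the review feedback.
package main

import "fmt"

// vertexKeepingUp reports whether a vertex is draining its pending messages.
func vertexKeepingUp(pendings map[string]int64, rates map[string]float64) bool {
	pending, ok := pendings["default"]
	if !ok {
		// No pending info (e.g. an HTTP source, or a vertex scaled down to 0
		// replicas): treat the vertex as OK rather than reporting an error.
		return true
	}
	rate, ok := rates["default"]
	if !ok {
		return true
	}
	// TODO: revisit later; rate == 0 may also mean processing is just very slow.
	return !(pending > 0 && rate == 0)
}

func main() {
	fmt.Println(vertexKeepingUp(map[string]int64{"default": 7}, map[string]float64{"default": 4.89})) // true
	fmt.Println(vertexKeepingUp(map[string]int64{"default": 7}, map[string]float64{"default": 0}))    // false
	fmt.Println(vertexKeepingUp(nil, nil))                                                            // true: no pending info
}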

func getBufferLimits(pl *v1alpha1.Pipeline, edge v1alpha1.Edge) (bufferLength int64, bufferUsageLimit float64) {
plLimits := pl.GetPipelineLimits()
bufferLength = int64(*plLimits.BufferMaxLength)
81 changes: 81 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
@@ -25,6 +25,7 @@ import (

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
@@ -203,3 +204,83 @@ func TestListBuffers(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(resp.Buffers), 2)
}

func TestGetPipelineStatus(t *testing.T) {
pipelineName := "simple-pipeline"
pipeline := &v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: pipelineName,
},
Spec: v1alpha1.PipelineSpec{
Vertices: []v1alpha1.AbstractVertex{
{Name: "cat"},
},
},
}
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
pipelineMetricsQueryService, err := NewPipelineMetadataQuery(client, pipeline, nil)
assert.NoError(t, err)

OKPipelineResponse := daemon.PipelineStatus{Status: pointer.String("OK"), Message: pointer.String("Pipeline has no issue.")}
ErrorPipelineResponse := daemon.PipelineStatus{Status: pointer.String("Error"), Message: pointer.String("Pipeline has an error. Vertex cat is not processing pending messages.")}

metricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod.
# TYPE vertex_processing_rate gauge
vertex_processing_rate{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263
vertex_processing_rate{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.084745762711864
vertex_processing_rate{period="5m",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263
vertex_processing_rate{period="default",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263

# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
ioReader := io.NopCloser(bytes.NewReader([]byte(metricsResponse)))

pipelineMetricsQueryService.httpClient = &mockHttpClient{
MockGet: func(url string) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
Body: ioReader,
}, nil
},
}

req := &daemon.GetPipelineStatusRequest{Pipeline: &pipelineName}

resp, err := pipelineMetricsQueryService.GetPipelineStatus(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, &OKPipelineResponse, resp.Status)

errorMetricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod.
# TYPE vertex_processing_rate gauge
vertex_processing_rate{period="15m",pipeline="simple-pipeline",vertex="cat"} 0
vertex_processing_rate{period="1m",pipeline="simple-pipeline",vertex="cat"} 0
vertex_processing_rate{period="5m",pipeline="simple-pipeline",vertex="cat"} 0
vertex_processing_rate{period="default",pipeline="simple-pipeline",vertex="cat"} 0

# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
ioReader = io.NopCloser(bytes.NewReader([]byte(errorMetricsResponse)))

pipelineMetricsQueryService.httpClient = &mockHttpClient{
MockGet: func(url string) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
Body: ioReader,
}, nil
},
}

resp, err = pipelineMetricsQueryService.GetPipelineStatus(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, &ErrorPipelineResponse, resp.Status)
}
20 changes: 20 additions & 0 deletions server/apis/v1/handler.go
@@ -302,6 +302,26 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) {
c.JSON(http.StatusOK, l)
}

// GetPipelineStatus is used to provide status check for a given pipeline
func (h *handler) GetPipelineStatus(c *gin.Context) {
ns := c.Param("namespace")
pipeline := c.Param("pipeline")
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
defer func() {
_ = client.Close()
}()
l, err := client.GetPipelineStatus(context.Background(), pipeline)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, l)
}

func daemonSvcAddress(ns, pipeline string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort)
}
1 change: 1 addition & 0 deletions server/routes/routes.go
@@ -50,4 +50,5 @@ func v1Routes(r gin.IRouter) {
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges/:edge", handler.GetPipelineEdge)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/metrics", handler.GetVertexMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/watermarks", handler.GetPipelineWatermarks)
r.GET("/namespaces/:namespace/pipelines/:pipeline/status", handler.GetPipelineStatus)
}
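
End to end, the UI server now exposes the status under its v1 route group. A sketch of calling it from Go and mapping the three status constants introduced in pipeline_metrics_query.go to a probe result; the base URL, route prefix, namespace, pipeline name, and JSON field names are assumptions:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

// The handler proxies the daemon's PipelineStatus straight through as JSON;
// these field names are assumed from the proto definition above.
type pipelineStatus struct {
	Status  string `json:"status"`
	Message string `json:"message"`
}

func main() {
	// Hypothetical UI-server base URL and route prefix.
	url := "http://localhost:8080/api/v1/namespaces/default/pipelines/simple-pipeline/status"
	resp, err := http.Get(url)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer resp.Body.Close()

	var ps pipelineStatus
	if err := json.NewDecoder(resp.Body).Decode(&ps); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Turn "OK" / "Unknown" / "Error" into exit codes, e.g. for a simple monitoring probe.
	switch ps.Status {
	case "OK":
		fmt.Println("healthy:", ps.Message)
	case "Unknown":
		fmt.Println("unknown:", ps.Message)
		os.Exit(2)
	default: // "Error"
		fmt.Println("unhealthy:", ps.Message)
		os.Exit(1)
	}
}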