Skip to content

Commit

Permalink
feat: add API for pipeline status check. Fixes #407. (#599)
Browse files Browse the repository at this point in the history
Signed-off-by: Dillen Padhiar <[email protected]>
  • Loading branch information
dpadhiar authored Mar 15, 2023
1 parent 51903be commit c981513
Show file tree
Hide file tree
Showing 8 changed files with 1,031 additions and 74 deletions.
823 changes: 749 additions & 74 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions pkg/apis/proto/daemon/daemon.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ message VertexMetrics {
map<string, int64> pendings = 4;
}

// PipelineStatus
message PipelineStatus {
required string status = 1;
required string message = 2;
}

message ListBuffersRequest {
required string pipeline = 1;
}
Expand All @@ -64,6 +70,14 @@ message GetBufferResponse {
required BufferInfo buffer = 1;
}

message GetPipelineStatusRequest {
required string pipeline = 1;
}

message GetPipelineStatusResponse {
required PipelineStatus status = 1;
}

message GetVertexMetricsRequest {
required string pipeline = 2;
required string vertex = 3;
Expand Down Expand Up @@ -110,4 +124,8 @@ service DaemonService {
rpc GetPipelineWatermarks (GetPipelineWatermarksRequest) returns (GetPipelineWatermarksResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/watermarks";
};

rpc GetPipelineStatus (GetPipelineStatusRequest) returns (GetPipelineStatusResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/status";
};
}
10 changes: 10 additions & 0 deletions pkg/daemon/client/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,13 @@ func (dc *DaemonClient) GetPipelineWatermarks(ctx context.Context, pipeline stri
return rspn.PipelineWatermarks, nil
}
}

func (dc *DaemonClient) GetPipelineStatus(ctx context.Context, pipeline string) (*daemon.PipelineStatus, error) {
if rspn, err := dc.client.GetPipelineStatus(ctx, &daemon.GetPipelineStatusRequest{
Pipeline: &pipeline,
}); err != nil {
return nil, err
} else {
return rspn.Status, nil
}
}
51 changes: 51 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type pipelineMetadataQuery struct {
watermarkFetchers map[string][]fetch.Fetcher
}

const (
PipelineStatusOK = "OK"
PipelineStatusError = "Error"
PipelineStatusUnknown = "Unknown"
)

// NewPipelineMetadataQuery returns a new instance of pipelineMetadataQuery
func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline, wmFetchers map[string][]fetch.Fetcher) (*pipelineMetadataQuery, error) {
var err error
Expand Down Expand Up @@ -233,6 +239,51 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
return resp, nil
}

func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *daemon.GetPipelineStatusRequest) (*daemon.GetPipelineStatusResponse, error) {

resp := new(daemon.GetPipelineStatusResponse)

// get all vertices of pipeline
vertices := ps.pipeline.Spec.Vertices

// loop over vertices and get metrics to check pending messages vs processing rate
for _, vertex := range vertices {
vertexReq := new(daemon.GetVertexMetricsRequest)
vertexReq.Vertex = &vertex.Name
vertexResp, err := ps.GetVertexMetrics(ctx, vertexReq)
// if err is not nil, more than likely autoscaling is down to 0 and metrics are not available
if err != nil {
resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusUnknown),
Message: pointer.String("Pipeline status is unknown."),
}
return resp, nil
}

// may need to revisit later, another concern could be that the processing rate is too slow instead of just 0
for _, vertexMetrics := range vertexResp.VertexMetrics {
if pending, ok := vertexMetrics.GetPendings()["default"]; ok {
if processingRate, ok := vertexMetrics.GetProcessingRates()["default"]; ok {
if pending > 0 && processingRate == 0 {
resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusError),
Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)),
}
return resp, nil
}
}
}
}
}

resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusOK),
Message: pointer.String("Pipeline has no issue."),
}

return resp, nil
}

func getBufferLimits(pl *v1alpha1.Pipeline, edge v1alpha1.Edge) (bufferLength int64, bufferUsageLimit float64) {
plLimits := pl.GetPipelineLimits()
bufferLength = int64(*plLimits.BufferMaxLength)
Expand Down
81 changes: 81 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
Expand Down Expand Up @@ -203,3 +204,83 @@ func TestListBuffers(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(resp.Buffers), 2)
}

func TestGetPipelineStatus(t *testing.T) {
pipelineName := "simple-pipeline"
pipeline := &v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: pipelineName,
},
Spec: v1alpha1.PipelineSpec{
Vertices: []v1alpha1.AbstractVertex{
{Name: "cat"},
},
},
}
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
pipelineMetricsQueryService, err := NewPipelineMetadataQuery(client, pipeline, nil)
assert.NoError(t, err)

OKPipelineResponse := daemon.PipelineStatus{Status: pointer.String("OK"), Message: pointer.String("Pipeline has no issue.")}
ErrorPipelineResponse := daemon.PipelineStatus{Status: pointer.String("Error"), Message: pointer.String("Pipeline has an error. Vertex cat is not processing pending messages.")}

metricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod.
# TYPE vertex_processing_rate gauge
vertex_processing_rate{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263
vertex_processing_rate{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.084745762711864
vertex_processing_rate{period="5m",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263
vertex_processing_rate{period="default",pipeline="simple-pipeline",vertex="cat"} 4.894736842105263
# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
ioReader := io.NopCloser(bytes.NewReader([]byte(metricsResponse)))

pipelineMetricsQueryService.httpClient = &mockHttpClient{
MockGet: func(url string) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
Body: ioReader,
}, nil
},
}

req := &daemon.GetPipelineStatusRequest{Pipeline: &pipelineName}

resp, err := pipelineMetricsQueryService.GetPipelineStatus(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, &OKPipelineResponse, resp.Status)

errorMetricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod.
# TYPE vertex_processing_rate gauge
vertex_processing_rate{period="15m",pipeline="simple-pipeline",vertex="cat"} 0
vertex_processing_rate{period="1m",pipeline="simple-pipeline",vertex="cat"} 0
vertex_processing_rate{period="5m",pipeline="simple-pipeline",vertex="cat"} 0
vertex_processing_rate{period="default",pipeline="simple-pipeline",vertex="cat"} 0
# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
ioReader = io.NopCloser(bytes.NewReader([]byte(errorMetricsResponse)))

pipelineMetricsQueryService.httpClient = &mockHttpClient{
MockGet: func(url string) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
Body: ioReader,
}, nil
},
}

resp, err = pipelineMetricsQueryService.GetPipelineStatus(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, &ErrorPipelineResponse, resp.Status)
}
20 changes: 20 additions & 0 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,26 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) {
c.JSON(http.StatusOK, l)
}

// GetPipelineStatus is used to provide status check for a given pipeline
func (h *handler) GetPipelineStatus(c *gin.Context) {
ns := c.Param("namespace")
pipeline := c.Param("pipeline")
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
defer func() {
_ = client.Close()
}()
l, err := client.GetPipelineStatus(context.Background(), pipeline)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, l)
}

func daemonSvcAddress(ns, pipeline string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort)
}
1 change: 1 addition & 0 deletions server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ func v1Routes(r gin.IRouter) {
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges/:edge", handler.GetPipelineEdge)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/metrics", handler.GetVertexMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/watermarks", handler.GetPipelineWatermarks)
r.GET("/namespaces/:namespace/pipelines/:pipeline/status", handler.GetPipelineStatus)
}

0 comments on commit c981513

Please sign in to comment.