Skip to content

Commit

Permalink
fixing testcases
Browse files Browse the repository at this point in the history
Signed-off-by: veds-g <[email protected]>
  • Loading branch information
veds-g committed Feb 2, 2023
1 parent 977b495 commit b62fda8
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 279 deletions.
230 changes: 88 additions & 142 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ message GetVertexMetricsResponse {
message VertexWatermark {
required string pipeline = 1;
required string vertex = 2;
repeated int64 watermarks = 3;
required int64 watermark = 3;
required bool isWatermarkEnabled = 4;
}

Expand Down
79 changes: 14 additions & 65 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ package service
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
)

// TODO - Write Unit Tests for this file
Expand All @@ -44,17 +40,13 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline
}

for _, vertex := range pipeline.Spec.Vertices {
// key for fetcher map ~ vertexName/replicas
replicas := int64(1)
if vertex.Sink != nil {
toBufferName := v1alpha1.GenerateSinkBufferName(pipeline.Namespace, pipeline.Name, vertex.Name)
wmFetcher, err := isbSvcClient.CreateWatermarkFetcher(ctx, toBufferName)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher, %w", err)
}
// for now only reduce has parallelism so not updating replicas for sink for now as done below
fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10)
wmFetchers[fetchersKey] = []fetch.Fetcher{wmFetcher}
wmFetchers[vertex.Name] = []fetch.Fetcher{wmFetcher}
} else {
// If the vertex is not a sink, to fetch the watermark, we consult all out edges and grab the latest watermark among them.
var wmFetcherList []fetch.Fetcher
Expand All @@ -68,16 +60,7 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline
wmFetcherList = append(wmFetcherList, fetchWatermark)
}
}
// for now only reduce has parallelism might have to modify later
// checking parallelism for a vertex to identify reduce vertex
// replicas will have parallelism for reduce vertex else will be nil
// parallelism indicates replica count ~ multiple pods for a vertex here
obj := pipeline.GetFromEdges(vertex.Name)
if len(obj) > 0 && obj[0].Parallelism != nil {
replicas = int64(*obj[0].Parallelism)
}
fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10)
wmFetchers[fetchersKey] = wmFetcherList
wmFetchers[vertex.Name] = wmFetcherList
}
}
return wmFetchers, nil
Expand All @@ -89,36 +72,22 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
resp := new(daemon.GetVertexWatermarkResponse)
vertexName := request.GetVertex()
isWatermarkEnabled := !ps.pipeline.Spec.Watermark.Disabled
// for now only reduce has parallelism might have to modify later
// checking parallelism for a vertex to identify reduce vertex
// parallelism is only supported by reduce vertex for now else will be nil
// parallelism indicates replica count ~ multiple pods for a vertex here
replicas := int64(1)
obj := ps.pipeline.GetFromEdges(vertexName)
if len(obj) > 0 && obj[0].Parallelism != nil {
replicas = int64(*obj[0].Parallelism)
}

// If watermark is not enabled, return time zero
if ps.pipeline.Spec.Watermark.Disabled {
timeZero := time.Unix(0, 0).UnixMilli()
watermarks := make([]int64, replicas)
for idx := range watermarks {
watermarks[idx] = timeZero
}
v := &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
Watermarks: watermarks,
Watermark: &timeZero,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
return resp, nil
}

// Watermark is enabled
fetchersKey := vertexName + "/" + strconv.FormatInt(replicas, 10)
vertexFetchers, ok := ps.watermarkFetchers[fetchersKey]
vertexFetchers, ok := ps.watermarkFetchers[vertexName]

// Vertex not found
if !ok {
Expand All @@ -127,24 +96,17 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
}

var latestWatermark = int64(-1)
var latestWatermarks []processor.Watermark
for _, fetcher := range vertexFetchers {
watermarks := fetcher.GetHeadWatermarks()
sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() })
watermark := watermarks[0].UnixMilli()
if watermark >= latestWatermark {
watermark := fetcher.GetHeadWatermark().UnixMilli()
if watermark > latestWatermark {
latestWatermark = watermark
latestWatermarks = watermarks
}
}
var watermarks []int64
for _, v := range latestWatermarks {
watermarks = append(watermarks, v.UnixMilli())
}

v := &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
Watermarks: watermarks,
Watermark: &latestWatermark,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
Expand All @@ -162,16 +124,11 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
watermarkArr := make([]*daemon.VertexWatermark, len(ps.watermarkFetchers))
i := 0
for k := range ps.watermarkFetchers {
vertexName := strings.Split(k, "/")[0]
replicas, _ := strconv.ParseInt(strings.Split(k, "/")[1], 10, 64)
watermarks := make([]int64, replicas)
for idx := range watermarks {
watermarks[idx] = timeZero
}
vertexName := k
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
Watermarks: watermarks,
Watermark: &timeZero,
IsWatermarkEnabled: &isWatermarkEnabled,
}
i++
Expand All @@ -185,25 +142,17 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
i := 0
for k, vertexFetchers := range ps.watermarkFetchers {
var latestWatermark = int64(-1)
var latestWatermarks []processor.Watermark
for _, fetcher := range vertexFetchers {
watermarks := fetcher.GetHeadWatermarks()
sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() })
watermark := watermarks[0].UnixMilli()
if watermark >= latestWatermark {
watermark := fetcher.GetHeadWatermark().UnixMilli()
if watermark > latestWatermark {
latestWatermark = watermark
latestWatermarks = watermarks
}
}
vertexName := strings.Split(k, "/")[0]
var watermarks []int64
for _, v := range latestWatermarks {
watermarks = append(watermarks, v.UnixMilli())
}
vertexName := k
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
Watermarks: watermarks,
Watermark: &latestWatermark,
IsWatermarkEnabled: &isWatermarkEnabled,
}
i++
Expand Down
4 changes: 2 additions & 2 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (e *EventTypeWMProgressor) GetWatermark(offset isb.Offset) processor.Waterm
return e.watermarks[offset.String()]
}

func (e *EventTypeWMProgressor) GetHeadWatermarks() []processor.Watermark {
return []processor.Watermark{}
func (e *EventTypeWMProgressor) GetHeadWatermark() processor.Watermark {
return processor.Watermark{}
}

// PayloadForTest is a dummy payload for testing.
Expand Down
24 changes: 12 additions & 12 deletions pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,37 +53,37 @@ func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.W
}
}

// GetHeadWatermarks returns the watermark using the HeadOffset (the latest offset among all processors). This
// GetHeadWatermark returns the watermark using the HeadOffset (the latest offset among all processors). This
// can be used in showing the watermark progression for a vertex when not consuming the messages
// directly (eg. UX, tests)
// NOTE
// - We don't use this function in the regular pods in the vertex.
// - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in the GetWatermark never happens.
// Meaning, in the UX (daemon service) we never delete any processor.
func (e *edgeFetcher) GetHeadWatermarks() []processor.Watermark {
func (e *edgeFetcher) GetHeadWatermark() processor.Watermark {
var debugString strings.Builder
var headWatermarks []processor.Watermark
var headOffset int64 = math.MinInt64
var epoch int64 = math.MaxInt64
var allProcessors = e.processorManager.GetAllProcessors()
// get the head offset of each processor
for _, p := range allProcessors {
if !p.IsActive() {
continue
}
var o = p.offsetTimeline.GetHeadOffset()
var epoch int64 = math.MaxInt64
e.log.Debugf("Processor: %v (headoffset:%d)", p, o)
debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, o))
if o != -1 {
if o != -1 && o > headOffset {
headOffset = o
epoch = p.offsetTimeline.GetEventTimeFromInt64(o)
}
if epoch == math.MaxInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1)))
} else {
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch)))
}
}
return headWatermarks
e.log.Debugf("GetHeadWatermark: %s", debugString.String())
if epoch == math.MaxInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
return processor.Watermark(time.UnixMilli(-1))
}
return processor.Watermark(time.UnixMilli(epoch))
}

// GetWatermark gets the smallest timestamp for the given offset
Expand Down
2 changes: 1 addition & 1 deletion pkg/watermark/fetch/edge_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestBuffer_GetWatermark(t *testing.T) {
t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.UnixMilli(tt.want)))
}
// this will always be 14 because the timeline has been populated ahead of time
assert.Equal(t, time.Time(b.GetHeadWatermarks()[len(b.GetHeadWatermarks())-1]).In(location), time.UnixMilli(14).In(location))
assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.UnixMilli(14).In(location))
})
}
}
4 changes: 2 additions & 2 deletions pkg/watermark/fetch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ type Fetcher interface {
io.Closer
// GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1.
GetWatermark(offset isb.Offset) processor.Watermark
// GetHeadWatermarks returns the latest watermark based on the head offset
GetHeadWatermarks() []processor.Watermark
// GetHeadWatermark returns the latest watermark based on the head offset
GetHeadWatermark() processor.Watermark
}
21 changes: 9 additions & 12 deletions pkg/watermark/fetch/source_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,22 @@ func NewSourceFetcher(ctx context.Context, sourceBufferName string, storeWatcher
}
}

// GetHeadWatermarks returns the latest watermark of all the processors.
func (e *sourceFetcher) GetHeadWatermarks() []processor.Watermark {
var headWatermarks []processor.Watermark
// GetHeadWatermark returns the latest watermark of all the processors.
func (e *sourceFetcher) GetHeadWatermark() processor.Watermark {
var epoch int64 = math.MinInt64
for _, p := range e.processorManager.GetAllProcessors() {
if !p.IsActive() {
continue
}
var epoch int64 = math.MinInt64
if p.offsetTimeline.GetHeadWatermark() != -1 {
if p.offsetTimeline.GetHeadWatermark() > epoch {
epoch = p.offsetTimeline.GetHeadWatermark()
}
if epoch == math.MinInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1)))
} else {
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch)))
}
}
return headWatermarks
if epoch == math.MinInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
return processor.Watermark(time.UnixMilli(-1))
}
return processor.Watermark(time.UnixMilli(epoch))
}

// GetWatermark returns the lowest of the latest watermark of all the processors,
Expand Down
6 changes: 3 additions & 3 deletions pkg/watermark/generic/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func (n NoOpWMProgressor) GetLatestWatermark() processor.Watermark {
return processor.Watermark{}
}

// GetHeadWatermarks returns the default head watermark.
func (n NoOpWMProgressor) GetHeadWatermarks() []processor.Watermark {
return []processor.Watermark{}
// GetHeadWatermark returns the default head watermark.
func (n NoOpWMProgressor) GetHeadWatermark() processor.Watermark {
return processor.Watermark{}
}

// Close stops the no-op progressor.
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClie
pipelineWatermarks := make([]int64, len(vertexList))
idx := 0
for _, v := range wm {
pipelineWatermarks[idx] = v.Watermarks[0]
pipelineWatermarks[idx] = *v.Watermark
idx++
}
currentWatermark = pipelineWatermarks
Expand Down
3 changes: 1 addition & 2 deletions ui/src/components/pipeline/Pipeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,10 @@ export function Pipeline() {
json.map((vertex) => {
const vertexWatermark = {} as VertexWatermark;
vertexWatermark.isWaterMarkEnabled = vertex["isWatermarkEnabled"];
vertexWatermark.watermark = vertex["watermarks"][0];
vertexWatermark.watermark = vertex["watermark"];
vertexWatermark.watermarkLocalTime = new Date(
vertexWatermark.watermark
).toISOString();
vertexWatermark.podWatermarks = vertex["watermarks"];
vertexToWatermarkMap.set(vertex.vertex, vertexWatermark);
})
})
Expand Down
36 changes: 1 addition & 35 deletions ui/src/components/pipeline/nodeinfo/NodeInfo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,12 @@ export default function NodeInfo(props: NodeInfoProps) {
{...a11yProps(1)}
/>
)}
{node?.data?.vertexWatermark && (
<Tab
data-testid="watermarks"
style={{ fontWeight: "bold" }}
label="Watermarks"
{...a11yProps(2)}
/>
)}
{node?.data?.vertexMetrics && (
<Tab
data-testid="processing-rates"
style={{ fontWeight: "bold"}}
label="Processing Rates"
{...a11yProps(3)}
{...a11yProps(2)}
/>
)}
</Tabs>
Expand Down Expand Up @@ -124,32 +116,6 @@ export default function NodeInfo(props: NodeInfoProps) {
)}
</TabPanel>
<TabPanel value={value} index={2}>
{node?.data?.vertexWatermark && (
<TableContainer
component={Paper}
sx={{ borderBottom: 1, borderColor: "divider", width: 200 }}
>
<Table aria-label="pod-watermark">
<TableHead>
<TableRow>
<TableCell>Pod</TableCell>
<TableCell >Watermark</TableCell>
</TableRow>
</TableHead>
<TableBody>
{node?.data?.vertexWatermark?.podWatermarks &&
node.data.vertexWatermark.podWatermarks.map((podWatermark, idx) => (
<TableRow>
<TableCell >Pod - {idx}</TableCell>
<TableCell >{podWatermark}</TableCell>
</TableRow>
))}
</TableBody>
</Table>
</TableContainer>
)}
</TabPanel>
<TabPanel value={value} index={3}>
{node?.data?.vertexMetrics && (
<TableContainer
component={Paper}
Expand Down
1 change: 0 additions & 1 deletion ui/src/utils/models/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ export interface VertexWatermark {
watermark: number;
watermarkLocalTime: string;
isWaterMarkEnabled: boolean;
podWatermarks: number[];
}

export interface EdgeInfo {
Expand Down

0 comments on commit b62fda8

Please sign in to comment.