Skip to content

Commit

Permalink
modifying Watermark object in proto file
Browse files Browse the repository at this point in the history
Signed-off-by: veds-g <[email protected]>
  • Loading branch information
veds-g committed Feb 1, 2023
1 parent b82730a commit c9e57d0
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 269 deletions.
241 changes: 99 additions & 142 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ message GetVertexMetricsResponse {
message VertexWatermark {
required string pipeline = 1;
required string vertex = 2;
required int64 watermark = 3;
repeated int64 watermarks = 3;
required bool isWatermarkEnabled = 4;
repeated int64 podWatermarks = 5;
}

message GetVertexWatermarkResponse {
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
}
podNum := int64(1)
// For now only reduce vertices have parallelism; this may need to be revisited later.
// A non-nil Parallelism on the vertex's incoming edge identifies a reduce vertex;
// for every other vertex it is nil.
// Parallelism is the replica count, i.e. the number of pods for the vertex.
obj := ps.pipeline.GetFromEdges(req.GetVertex())
if len(obj) > 0 && obj[0].Parallelism != nil {
podNum = int64(*obj[0].Parallelism)
Expand Down
85 changes: 63 additions & 22 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package service
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand All @@ -41,13 +44,17 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline
}

for _, vertex := range pipeline.Spec.Vertices {
// key for fetcher map ~ vertexName/replicas
replicas := int64(1)
if vertex.Sink != nil {
toBufferName := v1alpha1.GenerateSinkBufferName(pipeline.Namespace, pipeline.Name, vertex.Name)
wmFetcher, err := isbSvcClient.CreateWatermarkFetcher(ctx, toBufferName)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher, %w", err)
}
wmFetchers[vertex.Name] = []fetch.Fetcher{wmFetcher}
// for now only reduce has parallelism so not updating replicas for sink for now as done below
fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10)
wmFetchers[fetchersKey] = []fetch.Fetcher{wmFetcher}
} else {
// If the vertex is not a sink, to fetch the watermark, we consult all out edges and grab the latest watermark among them.
var wmFetcherList []fetch.Fetcher
Expand All @@ -61,7 +68,16 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline
wmFetcherList = append(wmFetcherList, fetchWatermark)
}
}
wmFetchers[vertex.Name] = wmFetcherList
// for now only reduce has parallelism might have to modify later
// checking parallelism for a vertex to identify reduce vertex
// replicas will have parallelism for reduce vertex else will be nil
// parallelism indicates replica count ~ multiple pods for a vertex here
obj := pipeline.GetFromEdges(vertex.Name)
if len(obj) > 0 && obj[0].Parallelism != nil {
replicas = int64(*obj[0].Parallelism)
}
fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10)
wmFetchers[fetchersKey] = wmFetcherList
}
}
return wmFetchers, nil
Expand All @@ -73,22 +89,36 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
resp := new(daemon.GetVertexWatermarkResponse)
vertexName := request.GetVertex()
isWatermarkEnabled := !ps.pipeline.Spec.Watermark.Disabled
// for now only reduce has parallelism might have to modify later
// checking parallelism for a vertex to identify reduce vertex
// parallelism is only supported by reduce vertex for now else will be nil
// parallelism indicates replica count ~ multiple pods for a vertex here
replicas := int64(1)
obj := ps.pipeline.GetFromEdges(vertexName)
if len(obj) > 0 && obj[0].Parallelism != nil {
replicas = int64(*obj[0].Parallelism)
}

// If watermark is not enabled, return time zero
if ps.pipeline.Spec.Watermark.Disabled {
timeZero := time.Unix(0, 0).UnixMilli()
watermarks := make([]int64, replicas)
for idx := range watermarks {
watermarks[idx] = timeZero
}
v := &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
Watermark: &timeZero,
Watermarks: watermarks,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
return resp, nil
}

// Watermark is enabled
vertexFetchers, ok := ps.watermarkFetchers[vertexName]
fetchersKey := vertexName + "/" + strconv.FormatInt(replicas, 10)
vertexFetchers, ok := ps.watermarkFetchers[fetchersKey]

// Vertex not found
if !ok {
Expand All @@ -97,17 +127,24 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
}

var latestWatermark = int64(-1)
var latestWatermarks []processor.Watermark
for _, fetcher := range vertexFetchers {
watermark := fetcher.GetHeadWatermark().UnixMilli()
if watermark > latestWatermark {
watermarks := fetcher.GetHeadWatermarks()
sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() })
watermark := watermarks[0].UnixMilli()
if watermark >= latestWatermark {
latestWatermark = watermark
latestWatermarks = watermarks
}
}

var watermarks []int64
for _, v := range latestWatermarks {
watermarks = append(watermarks, v.UnixMilli())
}
v := &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
Watermark: &latestWatermark,
Watermarks: watermarks,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
Expand All @@ -125,13 +162,17 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
watermarkArr := make([]*daemon.VertexWatermark, len(ps.watermarkFetchers))
i := 0
for k := range ps.watermarkFetchers {
vertexName := k
vertexName := strings.Split(k, "/")[0]
replicas, _ := strconv.ParseInt(strings.Split(k, "/")[1], 10, 64)
watermarks := make([]int64, replicas)
for idx := range watermarks {
watermarks[idx] = timeZero
}
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
Watermark: &timeZero,
Watermarks: watermarks,
IsWatermarkEnabled: &isWatermarkEnabled,
PodWatermarks: nil,
}
i++
}
Expand All @@ -144,26 +185,26 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
i := 0
for k, vertexFetchers := range ps.watermarkFetchers {
var latestWatermark = int64(-1)
var latestpodWatermarks []processor.Watermark
var latestWatermarks []processor.Watermark
for _, fetcher := range vertexFetchers {
watermark := fetcher.GetHeadWatermark().UnixMilli()
podWatermark := fetcher.GetPodWatermarks()
if watermark > latestWatermark {
watermarks := fetcher.GetHeadWatermarks()
sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() })
watermark := watermarks[0].UnixMilli()
if watermark >= latestWatermark {
latestWatermark = watermark
latestpodWatermarks = podWatermark
latestWatermarks = watermarks
}
}
vertexName := k
var podWatermarks []int64
for _, v := range latestpodWatermarks {
podWatermarks = append(podWatermarks, v.UnixMilli())
vertexName := strings.Split(k, "/")[0]
var watermarks []int64
for _, v := range latestWatermarks {
watermarks = append(watermarks, v.UnixMilli())
}
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
Watermark: &latestWatermark,
Watermarks: watermarks,
IsWatermarkEnabled: &isWatermarkEnabled,
PodWatermarks: podWatermarks,
}
i++
}
Expand Down
23 changes: 2 additions & 21 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,31 +215,12 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
}
// Avg rate and pending for autoscaling are both in the map with key "default", see "pkg/metrics/metrics.go".
var rate float64
existing := true
// looping over all pods to perform summation of rate values
// also marking as non-existent even when missing in any one pod
for _, v := range vMetrics {
val, exist := v.ProcessingRates["default"]
if !exist {
existing = false
}
rate += val
}
rate, existing := vMetrics[0].ProcessingRates["default"]
if !existing || rate < 0 || rate == isb.RateNotAvailable { // Rate not available
log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
return nil
}

var pending int64
existing = true
for _, v := range vMetrics {
val, exist := v.Pendings["default"]
if !exist {
existing = false
}
pending += val
}
pending, existing := vMetrics[0].Pendings["default"]
if !existing || pending < 0 || pending == isb.PendingNotAvailable {
// Pending not available, we don't do anything
log.Debugf("Vertex %s has no pending messages information, skip scaling", vertex.Name)
Expand Down
6 changes: 1 addition & 5 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ func (e *EventTypeWMProgressor) GetWatermark(offset isb.Offset) processor.Waterm
return e.watermarks[offset.String()]
}

func (e *EventTypeWMProgressor) GetHeadWatermark() processor.Watermark {
return processor.Watermark{}
}

func (e *EventTypeWMProgressor) GetPodWatermarks() []processor.Watermark {
func (e *EventTypeWMProgressor) GetHeadWatermarks() []processor.Watermark {
return []processor.Watermark{}
}

Expand Down
38 changes: 6 additions & 32 deletions pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,44 +53,18 @@ func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.W
}
}

// GetHeadWatermark returns the watermark using the HeadOffset (the latest offset among all processors). This
// GetHeadWatermarks returns the watermarks of all active processors, based on the HeadOffset of each processor. This
// can be used in showing the watermark progression for a vertex when not consuming the messages
// directly (eg. UX, tests)
// NOTE
// - We don't use this function in the regular pods in the vertex.
// - UX only uses GetHeadWatermarks, so the `p.IsDeleted()` check in the GetWatermark never happens.
// Meaning, in the UX (daemon service) we never delete any processor.
func (e *edgeFetcher) GetHeadWatermark() processor.Watermark {
func (e *edgeFetcher) GetHeadWatermarks() []processor.Watermark {
var debugString strings.Builder
var headOffset int64 = math.MinInt64
var epoch int64 = math.MaxInt64
var headWatermarks []processor.Watermark
var allProcessors = e.processorManager.GetAllProcessors()
// get the head offset of each processor
for _, p := range allProcessors {
if !p.IsActive() {
continue
}
var o = p.offsetTimeline.GetHeadOffset()
e.log.Debugf("Processor: %v (headoffset:%d)", p, o)
debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, o))
if o != -1 && o > headOffset {
headOffset = o
epoch = p.offsetTimeline.GetEventTimeFromInt64(o)
}
}
e.log.Debugf("GetHeadWatermark: %s", debugString.String())
if epoch == math.MaxInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
return processor.Watermark(time.UnixMilli(-1))
}
return processor.Watermark(time.UnixMilli(epoch))
}

// GetPodWatermarks gets the watermarks for all pods of a vertex
func (e *edgeFetcher) GetPodWatermarks() []processor.Watermark {
var debugString strings.Builder
var allProcessors = e.processorManager.GetAllProcessors()
var podWatermarks []processor.Watermark
for _, p := range allProcessors {
if !p.IsActive() {
continue
Expand All @@ -104,12 +78,12 @@ func (e *edgeFetcher) GetPodWatermarks() []processor.Watermark {
}
if epoch == math.MaxInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(-1)))
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1)))
} else {
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(epoch)))
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch)))
}
}
return podWatermarks
return headWatermarks
}

// GetWatermark gets the smallest timestamp for the given offset
Expand Down
5 changes: 3 additions & 2 deletions pkg/watermark/fetch/edge_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"testing"
"time"

"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"

"github.com/numaproj/numaflow/pkg/watermark/store/noop"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
Expand Down Expand Up @@ -142,7 +143,7 @@ func TestBuffer_GetWatermark(t *testing.T) {
t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.UnixMilli(tt.want)))
}
// this will always be 14 because the timeline has been populated ahead of time
assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.UnixMilli(14).In(location))
assert.Equal(t, time.Time(b.GetHeadWatermarks()[len(b.GetHeadWatermarks())-1]).In(location), time.UnixMilli(14).In(location))
})
}
}
6 changes: 2 additions & 4 deletions pkg/watermark/fetch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type Fetcher interface {
io.Closer
// GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1.
GetWatermark(offset isb.Offset) processor.Watermark
// GetHeadWatermark returns the latest watermark based on the head offset
GetHeadWatermark() processor.Watermark
// GetPodWatermarks returns the watermarks for all the pods in a vertex
GetPodWatermarks() []processor.Watermark
// GetHeadWatermarks returns the latest watermarks of the processors, based on the head offset
GetHeadWatermarks() []processor.Watermark
}
30 changes: 6 additions & 24 deletions pkg/watermark/fetch/source_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,9 @@ func NewSourceFetcher(ctx context.Context, sourceBufferName string, storeWatcher
}
}

// GetHeadWatermark returns the latest watermark of all the processors.
func (e *sourceFetcher) GetHeadWatermark() processor.Watermark {
var epoch int64 = math.MinInt64
for _, p := range e.processorManager.GetAllProcessors() {
if !p.IsActive() {
continue
}
if p.offsetTimeline.GetHeadWatermark() > epoch {
epoch = p.offsetTimeline.GetHeadWatermark()
}
}
if epoch == math.MinInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
return processor.Watermark(time.UnixMilli(-1))
}
return processor.Watermark(time.UnixMilli(epoch))
}

// GetPodWatermarks returns the list of watermarks of all pods in a vertex
func (e *sourceFetcher) GetPodWatermarks() []processor.Watermark {
var podWatermarks []processor.Watermark
// GetHeadWatermarks returns the latest watermark of each active processor.
func (e *sourceFetcher) GetHeadWatermarks() []processor.Watermark {
var headWatermarks []processor.Watermark
for _, p := range e.processorManager.GetAllProcessors() {
if !p.IsActive() {
continue
Expand All @@ -83,12 +65,12 @@ func (e *sourceFetcher) GetPodWatermarks() []processor.Watermark {
}
if epoch == math.MinInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(-1)))
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1)))
} else {
podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(epoch)))
headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch)))
}
}
return podWatermarks
return headWatermarks
}

// GetWatermark returns the lowest of the latest watermark of all the processors,
Expand Down
Loading

0 comments on commit c9e57d0

Please sign in to comment.