Skip to content

Commit

Permalink
fix: watermark progression during pods creation/deletion (#1619)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored and whynowy committed Apr 1, 2024
1 parent 65a7ece commit f6ed4bb
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 27 deletions.
4 changes: 3 additions & 1 deletion pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func BuildUXEdgeWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeli
var fetchers []fetch.HeadFetcher
isReduce := pipeline.GetVertex(edge.To).IsReduceUDF()
partitionCount := pipeline.GetVertex(edge.To).GetPartitionCount()
fromVtxPartitionsCount := pipeline.GetVertex(edge.From).GetPartitionCount()
isFromVertexReduce := pipeline.GetVertex(edge.From).IsReduceUDF()
for i, s := range stores {
fetchers = append(fetchers, fetch.NewEdgeFetcher(ctx, s, partitionCount, fetch.WithIsReduce(isReduce), fetch.WithVertexReplica(int32(i))))
fetchers = append(fetchers, fetch.NewEdgeFetcher(ctx, s, partitionCount, fetch.WithIsReduce(isReduce), fetch.WithVertexReplica(int32(i)), fetch.WithIsFromVtxReduce(isFromVertexReduce), fetch.WithFromVtxPartitions(fromVtxPartitionsCount)))
}
wmFetchers[edge] = fetchers
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/isb/stores/jetstream/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,15 @@ func (jr *jetStreamReader) Pending(_ context.Context) (int64, error) {
return jr.client.PendingForStream(jr.stream, jr.stream)
}

func (jr *jetStreamReader) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, error) {
func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) {
labels := map[string]string{"buffer": jr.GetName()}
defer func(t time.Time) {
isbReadTime.With(labels).Observe(float64(time.Since(t).Microseconds()))
}(time.Now())
var err error
var result []*isb.ReadMessage
rctx, cancel := context.WithTimeout(ctx, jr.opts.readTimeOut)
defer cancel()
msgs, err := jr.sub.Fetch(int(count), nats.Context(rctx))
if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
msgs, err := jr.sub.Fetch(int(count), nats.MaxWait(jr.opts.readTimeOut))
if err != nil && !errors.Is(err, nats.ErrTimeout) {
isbReadErrors.With(map[string]string{"buffer": jr.GetName()}).Inc()
return nil, fmt.Errorf("failed to fetch messages from jet stream subject %q, %w", jr.subject, err)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,13 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB
GroupBy: &dfv1.GroupBy{},
},
},
FromEdges: []dfv1.CombinedEdge{
{
Edge: dfv1.Edge{
From: "fromVertex",
},
},
},
},
},
}, map[string]wmstore.WatermarkStore{"fromVertex": store}, fetch.WithIsReduce(true))
Expand Down
21 changes: 20 additions & 1 deletion pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,17 @@ type edgeFetcher struct {
processorManager *processorManager
lastProcessedWm []int64
log *zap.SugaredLogger
opts *options
sync.RWMutex
}

// NewEdgeFetcher returns a new edge fetcher. This could have been private, except that UI uses it.
func NewEdgeFetcher(ctx context.Context, wmStore store.WatermarkStore, fromBufferPartitionCount int, opts ...Option) *edgeFetcher {
dOpts := defaultOptions()
for _, opt := range opts {
opt(dOpts)
}

log := logging.FromContext(ctx)
log.Info("Creating a new edge watermark fetcher")
var lastProcessedWm []int64
Expand All @@ -63,6 +69,7 @@ func NewEdgeFetcher(ctx context.Context, wmStore store.WatermarkStore, fromBuffe
processorManager: manager,
lastProcessedWm: lastProcessedWm,
log: log,
opts: dOpts,
}
}

Expand All @@ -77,6 +84,14 @@ func (e *edgeFetcher) updateWatermark(inputOffset isb.Offset, fromPartitionIdx i
var debugString strings.Builder
var epoch int64 = math.MaxInt64
var allProcessors = e.processorManager.getAllProcessors()

// Avoid computing the watermark if the vertex type is reduce until all pods are up and running.
// The reason is each pod reads from a unique ISB buffer and reduce has a persistent state.
// Not considering all the processors might result in incorrect computation of the watermark.
if e.opts.isFromVtxReduce && e.opts.fromVtxPartitions != len(allProcessors) {
return wmb.InitialWatermark
}

for _, p := range allProcessors {
// headOffset is used to check whether this pod can be deleted.
headOffset := int64(-1)
Expand All @@ -101,7 +116,11 @@ func (e *edgeFetcher) updateWatermark(inputOffset isb.Offset, fromPartitionIdx i

// if the pod is not active and the head offset of all the timelines is less than the input offset, delete the processor
// (this means we are processing data later than what the stale processor has processed)
if p.IsDeleted() && (offset > headOffset) {

// If fromVertex type is reduce, don't delete the processor, reduce needs all processors for correct watermark calculation
// since each pod reads from different buffer had has persistent state. Deleting a processor might give wrong watermark values.
// Although we have check to return -1, if the number of processors doesn't match fromVertex partitions, it's better not to delete.
if p.IsDeleted() && (offset > headOffset) && !e.opts.isFromVtxReduce {
e.log.Infow("Deleting processor because it's stale", zap.String("processor", p.GetEntity().GetName()))
e.processorManager.deleteProcessor(p.GetEntity().GetName())
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/watermark/fetch/edge_fetcher_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ type edgeFetcherSet struct {
// NewEdgeFetcherSet creates a new edgeFetcherSet object which implements the Fetcher interface.
func NewEdgeFetcherSet(ctx context.Context, vertexInstance *dfv1.VertexInstance, wmStores map[string]store.WatermarkStore, opts ...Option) Fetcher {
var edgeFetchers = make(map[string]*edgeFetcher)
for key, wmStore := range wmStores {
var fetchWatermark *edgeFetcher
// create a fetcher that fetches watermark.
if vertexInstance.Vertex.IsASource() {
// panic: source vertex is handled using new source fetcher
panic("NewEdgeFetcherSet can't create a new edge fetcher set for a source vertex.")
} else if vertexInstance.Vertex.IsReduceUDF() {
fetchWatermark = NewEdgeFetcher(ctx, wmStore, 1, opts...)
for _, e := range vertexInstance.Vertex.Spec.FromEdges {
var fromBufferPartitionCount int
if vertexInstance.Vertex.IsReduceUDF() {
fromBufferPartitionCount = 1
} else {
fetchWatermark = NewEdgeFetcher(ctx, wmStore, vertexInstance.Vertex.Spec.GetPartitionCount(), opts...)
fromBufferPartitionCount = vertexInstance.Vertex.Spec.GetPartitionCount()
}
edgeFetchers[key] = fetchWatermark

opts = append(opts, WithFromVtxPartitions(e.GetFromVertexPartitions()))
if e.FromVertexType == dfv1.VertexTypeReduceUDF {
opts = append(opts, WithIsFromVtxReduce(true))
}

edgeFetchers[e.From] = NewEdgeFetcher(ctx, wmStores[e.From], fromBufferPartitionCount, opts...)
}
return &edgeFetcherSet{
edgeFetchers,
Expand Down
11 changes: 11 additions & 0 deletions pkg/watermark/fetch/edge_fetcher_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func Test_EdgeFetcherSet_ComputeWatermark(t *testing.T) {
processorManager: processorManagers[vertex],
log: zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)).Sugar(),
lastProcessedWm: tt.lastProcessedWm[vertex],
opts: defaultOptions(),
}
}
if got := efs.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.offset, 10) }), tt.partitionIdx); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) {
Expand Down Expand Up @@ -404,6 +405,7 @@ func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) {
processorManager: edge1ProcessorManagerNonIdle,
lastProcessedWm: []int64{16, 18},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
},
wmb.WMB{},
Expand All @@ -415,6 +417,7 @@ func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) {
processorManager: edge1ProcessorManagerIdle,
lastProcessedWm: []int64{16, 18},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
},
wmb.WMB{
Expand All @@ -431,11 +434,13 @@ func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) {
processorManager: edge1ProcessorManagerNonIdle,
lastProcessedWm: []int64{16, 18},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
"edge2_non_idle": {
processorManager: edge2ProcessorManagerNonIdle,
lastProcessedWm: []int64{17, 17},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
},
wmb.WMB{},
Expand All @@ -447,11 +452,13 @@ func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) {
processorManager: edge1ProcessorManagerIdle,
lastProcessedWm: []int64{16, 18},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
"edge2_non_idle": {
processorManager: edge2ProcessorManagerNonIdle,
lastProcessedWm: []int64{17, 17},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
},
wmb.WMB{},
Expand All @@ -463,11 +470,13 @@ func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) {
processorManager: edge1ProcessorManagerIdle,
lastProcessedWm: []int64{19, 18},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
"edge2_idle": {
processorManager: edge2ProcessorManagerIdle,
lastProcessedWm: []int64{15, 18},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
},
wmb.WMB{
Expand All @@ -484,11 +493,13 @@ func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) {
processorManager: edge1ProcessorManagerIdle,
lastProcessedWm: []int64{13, 15},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
"edge2_idle": {
processorManager: edge2ProcessorManagerIdle,
lastProcessedWm: []int64{14, 12},
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
},
},
wmb.WMB{},
Expand Down
4 changes: 4 additions & 0 deletions pkg/watermark/fetch/edge_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func TestBuffer_updateWatermarkWithOnePartition(t *testing.T) {
processorManager: tt.processorManager,
log: zaptest.NewLogger(t).Sugar(),
lastProcessedWm: lastProcessed,
opts: defaultOptions(),
}
if got := b.updateWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) }), 0); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) {
t.Errorf("ComputeWatermark() = %v, want %v", got, wmb.Watermark(time.UnixMilli(tt.want)))
Expand Down Expand Up @@ -313,6 +314,7 @@ func TestBuffer_updateWatermarkWithMultiplePartition(t *testing.T) {
processorManager: tt.pm,
log: zaptest.NewLogger(t).Sugar(),
lastProcessedWm: tt.lastProcessedWm,
opts: defaultOptions(),
}
_ = b.updateWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) }), tt.partitionIdx)
if got := b.getWatermark(); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) {
Expand Down Expand Up @@ -359,6 +361,7 @@ func Test_edgeFetcher_ComputeHeadWatermark(t *testing.T) {
e := &edgeFetcher{
processorManager: tt.processorManager,
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
}
assert.Equalf(t, tt.want, e.ComputeHeadWatermark(0).UnixMilli(), "ComputeHeadWatermark()")
})
Expand Down Expand Up @@ -551,6 +554,7 @@ func Test_edgeFetcher_updateHeadIdleWMB(t *testing.T) {
processorManager: tt.processorManager,
lastProcessedWm: lastProcessedWm,
log: zaptest.NewLogger(t).Sugar(),
opts: defaultOptions(),
}
assert.Equalf(t, tt.want, e.updateHeadIdleWMB(0), "updateHeadIdleWMB()")
})
Expand Down
27 changes: 27 additions & 0 deletions pkg/watermark/fetch/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,24 @@ type options struct {
vertexReplica int32
// isSource is true if the vertex is source
isSource bool
// isFromVtxReduce is true if the from vertex is reduce
isFromVtxReduce bool
// fromVtxPartitions is the number of partitions in from vertex.
fromVtxPartitions int
}

// Option set options for FromVertex.
type Option func(options *options)

func defaultOptions() *options {
return &options{
podHeartbeatRate: 5,
refreshingProcessorsRate: 5,
vertexReplica: 0,
fromVtxPartitions: 1,
}
}

// WithPodHeartbeatRate sets the heartbeat rate in seconds.
func WithPodHeartbeatRate(rate int64) Option {
return func(opts *options) {
Expand Down Expand Up @@ -67,3 +80,17 @@ func WithIsSource(isSource bool) Option {
opts.isSource = isSource
}
}

// WithIsFromVtxReduce to indicate if the fromVertex is reduce
func WithIsFromVtxReduce(isFromVtxReduce bool) Option {
return func(opts *options) {
opts.isFromVtxReduce = isFromVtxReduce
}
}

// WithFromVtxPartitions to indicate the number of partitions in fromVertex
func WithFromVtxPartitions(partitions int) Option {
return func(opts *options) {
opts.fromVtxPartitions = partitions
}
}
8 changes: 1 addition & 7 deletions pkg/watermark/fetch/processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ type processorManager struct {

// newProcessorManager returns a new processorManager instance
func newProcessorManager(ctx context.Context, wmStore store.WatermarkStore, fromBufferPartitionCount int32, inputOpts ...Option) *processorManager {
opts := &options{
podHeartbeatRate: 5,
refreshingProcessorsRate: 5,
isReduce: false,
isSource: false,
vertexReplica: 0,
}
opts := defaultOptions()

for _, opt := range inputOpts {
opt(opts)
Expand Down
6 changes: 6 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ func (s *FunctionalSuite) TestUDFFiltering() {
}

func (s *FunctionalSuite) TestConditionalForwarding() {

// FIXME: flaky when redis is used as isb
if strings.ToUpper(os.Getenv("ISBSVC")) == "REDIS" {
s.T().SkipNow()
}

w := s.Given().Pipeline("@testdata/even-odd.yaml").
When().
CreatePipelineAndWait()
Expand Down
4 changes: 1 addition & 3 deletions test/reduce-one-e2e/reduce_one_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -283,7 +281,7 @@ func (r *ReduceSuite) TestComplexSlidingWindowPipeline() {
// we only have to extend the timeout for the first output to be produced. for the rest,
// we just need to wait for the default timeout for the rest of the outputs since its synchronous
w.Expect().
SinkContains("sink", "30", SinkCheckWithTimeout(300*time.Second)).
SinkContains("sink", "30").
SinkContains("sink", "60").
SinkNotContains("sink", "80").
SinkContains("sink", "90").
Expand Down

0 comments on commit f6ed4bb

Please sign in to comment.