
refactor: some refactor on watermark (#149)
* refactor: some refactor on watermark

Signed-off-by: Derek Wang <[email protected]>
whynowy committed Sep 8, 2022
1 parent f8f2209 · commit be47a26
Showing 28 changed files with 201 additions and 171 deletions.
(file name not shown in the captured diff)
@@ -69,7 +69,7 @@ data:
otBucket:
maxValueSize: 0
history: 1
ttl: 72h
ttl: 3h
maxBytes: 0
replicas: 3
procBucket:
config/install.yaml (2 changes: 1 addition & 1 deletion)
@@ -12853,7 +12853,7 @@ data:
otBucket:
maxValueSize: 0
history: 1
ttl: 72h
ttl: 3h
maxBytes: 0
replicas: 3
procBucket:
config/namespace-install.yaml (2 changes: 1 addition & 1 deletion)
@@ -12772,7 +12772,7 @@ data:
otBucket:
maxValueSize: 0
history: 1
ttl: 72h
ttl: 3h
maxBytes: 0
replicas: 3
procBucket:
pkg/daemon/server/service/pipeline_watermark_query.go (2 changes: 1 addition & 1 deletion)
@@ -72,7 +72,7 @@ func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers,
return wmFetcher, nil
}

func createWatermarkFetcher(ctx context.Context, pipelineName string, fromBufferName string, vertexName string) (*generic.GenericFetch, error) {
func createWatermarkFetcher(ctx context.Context, pipelineName string, fromBufferName string, vertexName string) (fetch.Fetcher, error) {
hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, fromBufferName)
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucket, jsclient.NewInClusterJetStreamClient())
if err != nil {
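The only change here is the return type: createWatermarkFetcher now hands back the fetch.Fetcher interface instead of the concrete *generic.GenericFetch. A minimal sketch of that pattern with made-up names (Fetcher and jetStreamFetcher below are illustrative, not the actual numaflow types):

```go
package main

import "fmt"

// Fetcher is the narrow behaviour the caller needs from a watermark fetcher.
type Fetcher interface {
	GetWatermark(offset int64) int64
}

// jetStreamFetcher is one concrete implementation; callers never see this type.
type jetStreamFetcher struct {
	edge string
}

func (f *jetStreamFetcher) GetWatermark(offset int64) int64 {
	// a real implementation would consult the offset timeline; stubbed here
	return offset - 1
}

// newWatermarkFetcher mirrors the refactored constructor: it returns the
// interface, so tests and alternative backends can be swapped in freely.
func newWatermarkFetcher(edge string) (Fetcher, error) {
	return &jetStreamFetcher{edge: edge}, nil
}

func main() {
	f, err := newWatermarkFetcher("in")
	if err != nil {
		panic(err)
	}
	fmt.Println(f.GetWatermark(42))
}
```

Returning the interface keeps the daemon service decoupled from the fetcher's concrete implementation.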
pkg/isb/forward/forward.go (1 change: 1 addition & 0 deletions)
@@ -251,6 +251,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// forward the highest watermark to all the edges to avoid idle edge problem
// TODO: sort and get the highest value
if isdf.publishWatermark != nil {
// TODO: Should also publish to those edges without writing (fall out of conditional forwarding)?
for edgeName, offsets := range writeOffsets {
if len(offsets) > 0 {
isdf.publishWatermark[edgeName].PublishWatermark(processorWM, offsets[len(offsets)-1])
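For context, the surrounding loop publishes the watermark against the last offset written to each outgoing edge and skips edges that received no writes in this chunk; the newly added TODO questions whether such idle edges should also get the watermark. A self-contained sketch of that shape, with stand-in types rather than the real isb/publish interfaces:

```go
package main

import "fmt"

// Offset stands in for isb.Offset; an int64 is enough for the sketch.
type Offset int64

// Publisher stands in for the per-edge watermark publisher.
type Publisher interface {
	PublishWatermark(watermark int64, offset Offset)
}

type printPublisher struct{ edge string }

func (p printPublisher) PublishWatermark(watermark int64, offset Offset) {
	fmt.Printf("edge=%s watermark=%d offset=%d\n", p.edge, watermark, offset)
}

// publishToEdges mirrors the loop in forwardAChunk: only edges that were
// written to in this chunk get the watermark, pinned to the last write.
func publishToEdges(watermark int64, writeOffsets map[string][]Offset, publishers map[string]Publisher) {
	for edge, offsets := range writeOffsets {
		if len(offsets) == 0 {
			continue // idle edge: nothing was written, so nothing is published (the TODO above)
		}
		publishers[edge].PublishWatermark(watermark, offsets[len(offsets)-1])
	}
}

func main() {
	publishers := map[string]Publisher{
		"out-a": printPublisher{"out-a"},
		"out-b": printPublisher{"out-b"},
	}
	writeOffsets := map[string][]Offset{"out-a": {101, 102, 103}, "out-b": {}}
	publishToEdges(1662595200, writeOffsets, publishers)
}
```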
pkg/sinks/sink.go (8 changes: 6 additions & 2 deletions)
@@ -3,9 +3,10 @@ package sinks
import (
"context"
"fmt"
"sync"

"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
"sync"

"go.uber.org/zap"

@@ -60,7 +61,10 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
readOptions = append(readOptions, jetstreamisb.WithReadTimeOut(x.ReadTimeout.Duration))
}
// build watermark progressors
fetchWatermark, publishWatermark = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
fetchWatermark, publishWatermark, err = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
if err != nil {
return err
}

jetStreamClient := jsclient.NewInClusterJetStreamClient()
reader, err = jetstreamisb.NewJetStreamBufferReader(ctx, jetStreamClient, fromBufferName, streamName, streamName, readOptions...)
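The functional change is that jetstream.BuildJetStreamWatermarkProgressors now also returns an error, which Start checks and propagates. A hedged sketch of that calling convention (the builder and types below are stand-ins, not the real numaflow watermark API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Fetcher and Publisher stand in for the watermark progressor interfaces.
type Fetcher interface{ Name() string }
type Publisher interface{ Name() string }

type stubFetcher struct{}

func (stubFetcher) Name() string { return "jetstream-fetcher" }

type stubPublisher struct{ edge string }

func (s stubPublisher) Name() string { return "publisher-" + s.edge }

// buildProgressors mimics the new builder signature: it can fail (for example
// when the KV watchers cannot be created) and says so explicitly.
func buildProgressors(ctx context.Context, healthy bool) (Fetcher, map[string]Publisher, error) {
	if !healthy {
		return nil, nil, errors.New("failed to create watermark KV watcher")
	}
	return stubFetcher{}, map[string]Publisher{"sink": stubPublisher{"sink"}}, nil
}

// start mirrors the Start method's shape: bail out early if the builder fails.
func start(ctx context.Context) error {
	fetchWatermark, publishWatermark, err := buildProgressors(ctx, true)
	if err != nil {
		return err
	}
	fmt.Println(fetchWatermark.Name(), len(publishWatermark))
	return nil
}

func main() {
	if err := start(context.Background()); err != nil {
		fmt.Println("start failed:", err)
	}
}
```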
pkg/sources/generator/tickgen.go (2 changes: 1 addition & 1 deletion)
@@ -91,7 +91,7 @@ type memgen struct {
}

type watermark struct {
sourcePublish *publish.Publish
sourcePublish publish.Publisher
wmProgressor generic.Progressor
}

pkg/sources/generator/tickgen_test.go (9 changes: 5 additions & 4 deletions)
@@ -2,11 +2,12 @@ package generator

import (
"context"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"testing"
"time"

"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -32,7 +33,7 @@ func TestRead(t *testing.T) {
"writer": dest,
}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, &publishWMStore)
mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, publishWMStore)
assert.NoError(t, err)
_ = mgen.Start()

@@ -65,7 +66,7 @@ func TestStop(t *testing.T) {
"writer": dest,
}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, &publishWMStore)
mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, publishWMStore)
assert.NoError(t, err)
stop := mgen.Start()

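The tests keep using no-op watermark plumbing (BuildNoOpWatermarkProgressorsFromBufferMap, NewKVNoOpStore) so they never need a live JetStream; the functional change is that the publish store bundle is now passed directly rather than by address. A small illustrative sketch of the no-op idea, with invented interface and type names rather than the real store package:

```go
package main

import "fmt"

// KVStorer is a stand-in for the watermark KV store interface.
type KVStorer interface {
	Put(key string, value []byte) error
	Get(key string) ([]byte, error)
}

// noOpStore satisfies KVStorer without persisting anything, which is all a
// unit test needs when watermark progression is not under test.
type noOpStore struct{}

func (noOpStore) Put(string, []byte) error   { return nil }
func (noOpStore) Get(string) ([]byte, error) { return nil, nil }

// publishStores bundles the heartbeat and offset-timeline stores, loosely
// mirroring what a publish-store builder would hand back.
type publishStores struct {
	hbStore KVStorer
	otStore KVStorer
}

func main() {
	stores := publishStores{hbStore: noOpStore{}, otStore: noOpStore{}}
	// passed by value into the source under test, as in the updated tests
	fmt.Println(stores.hbStore.Put("p1", []byte("hb")), stores.otStore.Put("p1", []byte("ot")))
}
```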
pkg/sources/generator/watermark_test.go (7 changes: 4 additions & 3 deletions)
@@ -2,12 +2,13 @@ package generator

import (
"context"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"os"
"testing"
"time"

"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/simplebuffer"
@@ -34,7 +35,7 @@ func TestWatermark(t *testing.T) {
Replica: 0,
}
publishWMStore := generic.BuildPublishWMStores(noop.NewKVNoOpStore(), noop.NewKVNoOpStore())
mgen, err := NewMemGen(m, 1, 8, time.Millisecond, []isb.BufferWriter{dest}, nil, nil, &publishWMStore)
mgen, err := NewMemGen(m, 1, 8, time.Millisecond, []isb.BufferWriter{dest}, nil, nil, publishWMStore)
assert.NoError(t, err)
stop := mgen.Start()

pkg/sources/source.go (11 changes: 8 additions & 3 deletions)
@@ -3,9 +3,10 @@ package sources
import (
"context"
"fmt"
"github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
"sync"

"github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"

"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
@@ -40,6 +41,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
// publishWatermark is a map representing a progressor per edge, we are initializing them to a no-op progressor
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromEdgeList(generic.GetBufferNameList(sp.VertexInstance.Vertex.GetToBuffers()))
var publishWMStore = generic.BuildPublishWMStores(noop.NewKVNoOpStore(), noop.NewKVNoOpStore())
var err error

switch sp.ISBSvcType {
case dfv1.ISBSvcTypeRedis:
@@ -60,7 +62,10 @@
case dfv1.ISBSvcTypeJetStream:
// build the right watermark progessor if watermark is enabled
// build watermark progressors
fetchWatermark, publishWatermark, publishWMStore = jetstream.BuildJetStreamWatermarkProgressorsForSource(ctx, sp.VertexInstance)
fetchWatermark, publishWatermark, publishWMStore, err = jetstream.BuildJetStreamWatermarkProgressorsForSource(ctx, sp.VertexInstance)
if err != nil {
return err
}

for _, e := range sp.VertexInstance.Vertex.Spec.ToEdges {
writeOpts := []jetstreamisb.WriteOption{
@@ -85,7 +90,7 @@
return fmt.Errorf("unrecognized isb svc type %q", sp.ISBSvcType)
}

sourcer, err := sp.getSourcer(writers, fetchWatermark, publishWatermark, &publishWMStore, log)
sourcer, err := sp.getSourcer(writers, fetchWatermark, publishWatermark, publishWMStore, log)
if err != nil {
return fmt.Errorf("failed to find a sourcer, error: %w", err)
}
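One detail worth calling out: the new `var err error` exists so the JetStream case can assign with `=` instead of `:=`. Using `:=` inside the switch would declare new, shadowed fetchWatermark/publishWatermark variables, and the no-op defaults set before the switch would never be replaced. A minimal sketch of that scoping pattern (not the real SourceProcessor code):

```go
package main

import (
	"errors"
	"fmt"
)

// buildJetStreamProgressors stands in for the real builder that can fail.
func buildJetStreamProgressors(ok bool) (string, string, error) {
	if !ok {
		return "", "", errors.New("cannot create watermark buckets")
	}
	return "jetstream-fetch", "jetstream-publish", nil
}

func start(isbSvcType string) error {
	// defaults: no-op progressors, used unless a real backend overrides them
	fetchWatermark, publishWatermark := "noop-fetch", "noop-publish"
	var err error

	switch isbSvcType {
	case "jetstream":
		// plain `=` updates the outer variables; `:=` here would shadow them
		fetchWatermark, publishWatermark, err = buildJetStreamProgressors(true)
		if err != nil {
			return err
		}
	}

	fmt.Println(fetchWatermark, publishWatermark)
	return nil
}

func main() {
	_ = start("jetstream")
}
```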
pkg/udf/udf.go (5 changes: 4 additions & 1 deletion)
@@ -82,7 +82,10 @@ func (u *UDFProcessor) Start(ctx context.Context) error {
}

// build watermark progressors
fetchWatermark, publishWatermark = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
fetchWatermark, publishWatermark, err = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
if err != nil {
return err
}

for _, e := range u.VertexInstance.Vertex.Spec.ToEdges {
writeOpts := []jetstreamisb.WriteOption{}
pkg/watermark/fetch/edge.go (23 changes: 10 additions & 13 deletions)
@@ -26,13 +26,13 @@ type Fetcher interface {
type Edge struct {
ctx context.Context
edgeName string
fromVertex *FromVertex
fromVertex FromVertexer
log *zap.SugaredLogger
}

// NewEdgeBuffer returns a new Edge. FromVertex has the details about the processors responsible for writing to this
// edge.
func NewEdgeBuffer(ctx context.Context, edgeName string, fromV *FromVertex) *Edge {
func NewEdgeBuffer(ctx context.Context, edgeName string, fromV FromVertexer) *Edge {
return &Edge{
ctx: ctx,
edgeName: edgeName,
@@ -51,14 +51,15 @@ func (e *Edge) GetHeadWatermark() processor.Watermark {
var allProcessors = e.fromVertex.GetAllProcessors()
// get the head offset of each processor
for _, p := range allProcessors {
debugString.WriteString(fmt.Sprintf("[HB:%s OT:%s] (headoffset:%d) %s\n", e.fromVertex.hbWatcher.GetKVName(), e.fromVertex.otWatcher.GetKVName(), p.offsetTimeline.GetHeadOffset(), p))
e.log.Debugf("Processor: %v (headoffset:%d)", p, p.offsetTimeline.GetHeadOffset())
debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, p.offsetTimeline.GetHeadOffset()))
var o = p.offsetTimeline.GetHeadOffset()
if o != -1 && o > headOffset {
headOffset = o
epoch = p.offsetTimeline.GetEventtimeFromInt64(o)
}
}

e.log.Debugf("GetHeadWatermark: %s", debugString.String())
if epoch == math.MaxInt64 {
return processor.Watermark(time.Time{})
}
@@ -77,24 +78,20 @@ func (e *Edge) GetWatermark(inputOffset isb.Offset) processor.Watermark {
var epoch int64 = math.MaxInt64
var allProcessors = e.fromVertex.GetAllProcessors()
for _, p := range allProcessors {
debugString.WriteString(fmt.Sprintf("[HB:%s OT:%s] %s\n", e.fromVertex.hbWatcher.GetKVName(), e.fromVertex.otWatcher.GetKVName(), p))
if !p.IsActive() {
continue
}
debugString.WriteString(fmt.Sprintf("[Processor: %v] \n", p))
var t = p.offsetTimeline.GetEventTime(inputOffset)
if t != -1 && t < epoch {
epoch = t
}
// TODO: can we delete an inactive processor?
if p.IsDeleted() && (offset > p.offsetTimeline.GetHeadOffset()) {
// if the pod is not active and the current offset is ahead of all offsets in Timeline
e.fromVertex.DeleteProcessor(p.entity.GetID())
e.fromVertex.heartbeat.Delete(p.entity.GetID())
}
}
// if the offset is smaller than every offset in the timeline, set the value to be -1
if epoch == math.MaxInt64 {
epoch = -1
}
// TODO: use log instead of fmt.Printf
fmt.Printf("\n%s[%s] get watermark for offset %d: %+v\n", debugString.String(), e.edgeName, offset, epoch)
e.log.Debugf("%s[%s] get watermark for offset %d: %+v", debugString.String(), e.edgeName, offset, epoch)
if epoch == -1 {
return processor.Watermark(time.Time{})
}
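The logging change is the visible theme in both methods: per-processor details are accumulated in a strings.Builder and emitted once through the sugared zap logger at debug level, replacing the fmt.Printf call; the new debug format also no longer reaches into the heartbeat/offset-timeline watchers, which the FromVertexer interface now hides. A compact sketch of that pattern with stand-in types:

```go
package main

import (
	"fmt"
	"strings"

	"go.uber.org/zap"
)

// proc is a stand-in for ProcessorToFetch with just the fields the sketch needs.
type proc struct {
	name       string
	headOffset int64
}

// logHeadOffsets mirrors the GetHeadWatermark logging: build one debug string
// in the loop, then log it once afterwards.
func logHeadOffsets(log *zap.SugaredLogger, procs []proc) {
	var debugString strings.Builder
	for _, p := range procs {
		debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p.name, p.headOffset))
	}
	log.Debugf("GetHeadWatermark: %s", debugString.String())
}

func main() {
	logger, err := zap.NewDevelopment() // debug-level logger for the example
	if err != nil {
		panic(err)
	}
	defer logger.Sync()
	logHeadOffsets(logger.Sugar(), []proc{{"testPod0", 10}, {"testPod1", 12}})
}
```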
pkg/watermark/fetch/edge_test.go (12 changes: 7 additions & 5 deletions)
@@ -10,6 +10,7 @@ import (

"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"

"github.com/numaproj/numaflow/pkg/isb"
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/jetstream"
@@ -49,7 +50,7 @@ func TestBuffer_GetWatermark(t *testing.T) {

hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", publisherHBBucketName, defaultJetStreamClient)
otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", publisherOTBucketName, defaultJetStreamClient)
testVertex := NewFromVertex(ctx, hbWatcher, otWatcher)
testVertex := NewFromVertex(ctx, hbWatcher, otWatcher).(*fromVertex)
var (
// TODO: watcher should not be nil
testPod0 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, otWatcher)
@@ -85,16 +86,16 @@
for _, watermark := range pod2Timeline {
testPod2.offsetTimeline.Put(watermark)
}
testVertex.AddProcessor("testPod0", testPod0)
testVertex.AddProcessor("testPod1", testPod1)
testVertex.AddProcessor("testPod2", testPod2)
testVertex.addProcessor("testPod0", testPod0)
testVertex.addProcessor("testPod1", testPod1)
testVertex.addProcessor("testPod2", testPod2)

type args struct {
offset int64
}
tests := []struct {
name string
fromVertex *FromVertex
fromVertex FromVertexer
args args
want int64
}{
@@ -148,6 +149,7 @@
ctx: ctx,
edgeName: "testBuffer",
fromVertex: tt.fromVertex,
log: zaptest.NewLogger(t).Sugar(),
}
if got := b.GetWatermark(isb.SimpleOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) })); time.Time(got).In(location) != time.Unix(tt.want, 0).In(location) {
t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.Unix(tt.want, 0)))
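The test now has to populate the new log field on Edge; zaptest.NewLogger(t) is handy for that because it routes zap output through t.Log, so debug lines from the code under test appear alongside the test that produced them. A minimal usage sketch:

```go
package fetch_test

import (
	"testing"

	"go.uber.org/zap/zaptest"
)

func TestZaptestLoggerWiring(t *testing.T) {
	// zaptest loggers default to debug level and write via t.Log
	log := zaptest.NewLogger(t).Sugar()
	log.Debugf("get watermark for offset %d: %d", 100, 1662595200)
}
```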
pkg/watermark/fetch/offset_timeline.go (3 changes: 2 additions & 1 deletion)
@@ -16,7 +16,8 @@ import (
// OffsetTimeline is to store the event time to the offset records.
// Our list is sorted by event time from highest to lowest.
type OffsetTimeline struct {
ctx context.Context
ctx context.Context
// TODO: replace it with OverflowQueue, which is thread safe and 2 times faster.
watermarks list.List
capacity int
lock sync.RWMutex
pkg/watermark/fetch/processor_to_fetch.go (4 changes: 2 additions & 2 deletions)
@@ -112,7 +112,7 @@ func (p *ProcessorToFetch) startTimeLineWatcher() {
case store.KVPut:
epoch, skip, err := p.entity.ParseOTWatcherKey(value.Key())
if err != nil {
p.log.Errorw("unable to convert value.Key() to int64", zap.String("received", value.Key()), zap.Error(err))
p.log.Errorw("Unable to convert value.Key() to int64", zap.String("received", value.Key()), zap.Error(err))
continue
}
// if skip is set to true, it means the key update we received is for a different processor (sharing of bucket)
@@ -124,7 +124,7 @@
watermark: epoch,
offset: int64(uint64Value),
})
p.log.Debugw("timelineWatcher- Updates", zap.String("bucket", p.otWatcher.GetKVName()), zap.Int64("epoch", epoch), zap.Uint64("value", uint64Value))
p.log.Debugw("TimelineWatcher- Updates", zap.String("bucket", p.otWatcher.GetKVName()), zap.Int64("epoch", epoch), zap.Uint64("value", uint64Value))
case store.KVDelete:
// we do not care about Delete events because the timeline bucket is meant to grow and the TTL will
// naturally trim the KV store.
pkg/watermark/fetch/utils.go (6 changes: 3 additions & 3 deletions)
@@ -21,17 +21,17 @@ func RetryUntilSuccessfulWatcherCreation(js *jsclient.JetStreamContext, bucketNa
for i := 0; i < _bucketWatchRetryCount || infiniteLoop; i++ {
bucket, err := js.KeyValue(bucketName)
if err != nil {
log.Errorw("failed to get the bucket by bucket name", zap.String("bucket", bucketName), zap.Error(err))
log.Errorw("Failed to get the bucket by bucket name", zap.String("bucket", bucketName), zap.Error(err))
time.Sleep(_delayInSecBetweenBucketWatchRetry * time.Second)
continue
}
watcher, err := bucket.WatchAll()
if err != nil {
log.Errorw("failed to create the watch all watcher for bucket name", zap.String("bucket", bucketName), zap.Error(err))
log.Errorw("Failed to create the watch all watcher for bucket name", zap.String("bucket", bucketName), zap.Error(err))
time.Sleep(_delayInSecBetweenBucketWatchRetry * time.Second)
continue
}
log.Infow("watcher created for bucket", zap.String("bucket", bucketName))
log.Infow("Watcher created for bucket", zap.String("bucket", bucketName))
return watcher
}
return nil
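RetryUntilSuccessfulWatcherCreation retries bucket lookup and watcher creation a fixed number of times (or forever when infiniteLoop is set), sleeping between attempts, logging each failure, and returning nil once the attempts are exhausted. A generic sketch of that retry shape, independent of JetStream (the helper and its names are illustrative, not part of the repository):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryUntilSuccessful keeps calling create until it succeeds, up to attempts
// times unless infinite is true, sleeping between failures. It reports whether
// a value was obtained, mirroring the "return nil on exhaustion" behaviour.
func retryUntilSuccessful[T any](attempts int, infinite bool, delay time.Duration, create func() (T, error)) (T, bool) {
	var zero T
	for i := 0; i < attempts || infinite; i++ {
		v, err := create()
		if err != nil {
			fmt.Println("creation failed, retrying:", err)
			time.Sleep(delay)
			continue
		}
		return v, true
	}
	return zero, false
}

func main() {
	calls := 0
	watcher, ok := retryUntilSuccessful(3, false, 10*time.Millisecond, func() (string, error) {
		calls++
		if calls < 2 {
			return "", errors.New("bucket not ready")
		}
		return "kv-watcher", nil
	})
	fmt.Println(watcher, ok, "after", calls, "attempts")
}
```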
(The remaining 12 of the 28 changed files are not shown in this capture.)
