diff --git a/config/base/controller-manager/numaflow-controller-config.yaml b/config/base/controller-manager/numaflow-controller-config.yaml
index 54c950ce76..54d6a2338b 100644
--- a/config/base/controller-manager/numaflow-controller-config.yaml
+++ b/config/base/controller-manager/numaflow-controller-config.yaml
@@ -69,7 +69,7 @@ data:
         otBucket:
           maxValueSize: 0
           history: 1
-          ttl: 72h
+          ttl: 3h
           maxBytes: 0
           replicas: 3
         procBucket:
diff --git a/config/install.yaml b/config/install.yaml
index c0eb6a3dc5..f7b9f40b85 100644
--- a/config/install.yaml
+++ b/config/install.yaml
@@ -12853,7 +12853,7 @@ data:
         otBucket:
           maxValueSize: 0
           history: 1
-          ttl: 72h
+          ttl: 3h
           maxBytes: 0
           replicas: 3
         procBucket:
diff --git a/config/namespace-install.yaml b/config/namespace-install.yaml
index d7e3803886..8266cafcd4 100644
--- a/config/namespace-install.yaml
+++ b/config/namespace-install.yaml
@@ -12772,7 +12772,7 @@ data:
        otBucket:
           maxValueSize: 0
           history: 1
-          ttl: 72h
+          ttl: 3h
           maxBytes: 0
           replicas: 3
         procBucket:
diff --git a/pkg/daemon/server/service/pipeline_watermark_query.go b/pkg/daemon/server/service/pipeline_watermark_query.go
index da7bdfd9c7..538996f936 100644
--- a/pkg/daemon/server/service/pipeline_watermark_query.go
+++ b/pkg/daemon/server/service/pipeline_watermark_query.go
@@ -72,7 +72,7 @@ func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers,
     return wmFetcher, nil
 }
 
-func createWatermarkFetcher(ctx context.Context, pipelineName string, fromBufferName string, vertexName string) (*generic.GenericFetch, error) {
+func createWatermarkFetcher(ctx context.Context, pipelineName string, fromBufferName string, vertexName string) (fetch.Fetcher, error) {
     hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, fromBufferName)
     hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucket, jsclient.NewInClusterJetStreamClient())
     if err != nil {
diff --git a/pkg/isb/forward/forward.go b/pkg/isb/forward/forward.go
index dac9f6b47f..eac1df5be6 100644
--- a/pkg/isb/forward/forward.go
+++ b/pkg/isb/forward/forward.go
@@ -251,6 +251,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
     // forward the highest watermark to all the edges to avoid idle edge problem
     // TODO: sort and get the highest value
     if isdf.publishWatermark != nil {
+        // TODO: Should also publish to those edges without writing (fall out of conditional forwarding)?
         for edgeName, offsets := range writeOffsets {
             if len(offsets) > 0 {
                 isdf.publishWatermark[edgeName].PublishWatermark(processorWM, offsets[len(offsets)-1])
diff --git a/pkg/sinks/sink.go b/pkg/sinks/sink.go
index 769080a24c..b2ad65fc6d 100644
--- a/pkg/sinks/sink.go
+++ b/pkg/sinks/sink.go
@@ -3,9 +3,10 @@ package sinks
 import (
     "context"
     "fmt"
+    "sync"
+
     "github.com/numaproj/numaflow/pkg/watermark/generic"
     "github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
-    "sync"
 
     "go.uber.org/zap"
@@ -60,7 +61,10 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
             readOptions = append(readOptions, jetstreamisb.WithReadTimeOut(x.ReadTimeout.Duration))
         }
         // build watermark progressors
-        fetchWatermark, publishWatermark = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
+        fetchWatermark, publishWatermark, err = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
+        if err != nil {
+            return err
+        }
         jetStreamClient := jsclient.NewInClusterJetStreamClient()
         reader, err = jetstreamisb.NewJetStreamBufferReader(ctx, jetStreamClient, fromBufferName, streamName, streamName, readOptions...)
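Note on the sink.go hunk above: this is the first instance of a pattern applied throughout this diff (source.go and udf.go below get the same treatment). The watermark progressor builders now return an error instead of calling log.Fatalw internally, and each caller propagates the failure out of Start. A minimal caller-side sketch of the new contract, reusing names from this diff; the enclosing Start method and variable declarations are abbreviated:

    // fetchWatermark, publishWatermark, and err are declared earlier in Start
    fetchWatermark, publishWatermark, err = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
    if err != nil {
        return err // bubble the failure up instead of exiting the process inside the builder
    }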
diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go
index e202acb940..b1d5d773a1 100644
--- a/pkg/sources/generator/tickgen.go
+++ b/pkg/sources/generator/tickgen.go
@@ -91,7 +91,7 @@ type memgen struct {
 }
 
 type watermark struct {
-    sourcePublish *publish.Publish
+    sourcePublish publish.Publisher
     wmProgressor  generic.Progressor
 }
diff --git a/pkg/sources/generator/tickgen_test.go b/pkg/sources/generator/tickgen_test.go
index 43846fbb45..44633682a2 100644
--- a/pkg/sources/generator/tickgen_test.go
+++ b/pkg/sources/generator/tickgen_test.go
@@ -2,11 +2,12 @@ package generator
 
 import (
     "context"
-    "github.com/numaproj/numaflow/pkg/watermark/generic"
-    "github.com/numaproj/numaflow/pkg/watermark/store/noop"
     "testing"
     "time"
 
+    "github.com/numaproj/numaflow/pkg/watermark/generic"
+    "github.com/numaproj/numaflow/pkg/watermark/store/noop"
+
     dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
     v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -32,7 +33,7 @@ func TestRead(t *testing.T) {
         "writer": dest,
     }
     fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
-    mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, &publishWMStore)
+    mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, publishWMStore)
     assert.NoError(t, err)
     _ = mgen.Start()
@@ -65,7 +66,7 @@ func TestStop(t *testing.T) {
         "writer": dest,
     }
     fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
-    mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, &publishWMStore)
+    mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, publishWMStore)
     assert.NoError(t, err)
     stop := mgen.Start()
diff --git a/pkg/sources/generator/watermark_test.go b/pkg/sources/generator/watermark_test.go
index 762b53f8da..7f57fe5cce 100644
--- a/pkg/sources/generator/watermark_test.go
+++ b/pkg/sources/generator/watermark_test.go
@@ -2,12 +2,13 @@ package generator
 
 import (
     "context"
-    "github.com/numaproj/numaflow/pkg/watermark/generic"
-    "github.com/numaproj/numaflow/pkg/watermark/store/noop"
     "os"
     "testing"
     "time"
 
+    "github.com/numaproj/numaflow/pkg/watermark/generic"
+    "github.com/numaproj/numaflow/pkg/watermark/store/noop"
+
     dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
     "github.com/numaproj/numaflow/pkg/isb"
     "github.com/numaproj/numaflow/pkg/isb/simplebuffer"
@@ -34,7 +35,7 @@ func TestWatermark(t *testing.T) {
         Replica:  0,
     }
     publishWMStore := generic.BuildPublishWMStores(noop.NewKVNoOpStore(), noop.NewKVNoOpStore())
-    mgen, err := NewMemGen(m, 1, 8, time.Millisecond, []isb.BufferWriter{dest}, nil, nil, &publishWMStore)
+    mgen, err := NewMemGen(m, 1, 8, time.Millisecond, []isb.BufferWriter{dest}, nil, nil, publishWMStore)
     assert.NoError(t, err)
     stop := mgen.Start()
diff --git a/pkg/sources/source.go b/pkg/sources/source.go
index a5cb8260b9..e275a6a9f6 100644
--- a/pkg/sources/source.go
+++ b/pkg/sources/source.go
@@ -3,9 +3,10 @@ package sources
 import (
     "context"
     "fmt"
-    "github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
     "sync"
 
+    "github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
+
     "go.uber.org/zap"
 
     dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
@@ -40,6 +41,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
     // publishWatermark is a map representing a progressor per edge, we are initializing them to a no-op progressor
     fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromEdgeList(generic.GetBufferNameList(sp.VertexInstance.Vertex.GetToBuffers()))
     var publishWMStore = generic.BuildPublishWMStores(noop.NewKVNoOpStore(), noop.NewKVNoOpStore())
+    var err error
 
     switch sp.ISBSvcType {
     case dfv1.ISBSvcTypeRedis:
@@ -60,7 +62,10 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
     case dfv1.ISBSvcTypeJetStream:
         // build the right watermark progessor if watermark is enabled
         // build watermark progressors
-        fetchWatermark, publishWatermark, publishWMStore = jetstream.BuildJetStreamWatermarkProgressorsForSource(ctx, sp.VertexInstance)
+        fetchWatermark, publishWatermark, publishWMStore, err = jetstream.BuildJetStreamWatermarkProgressorsForSource(ctx, sp.VertexInstance)
+        if err != nil {
+            return err
+        }
 
         for _, e := range sp.VertexInstance.Vertex.Spec.ToEdges {
             writeOpts := []jetstreamisb.WriteOption{
@@ -85,7 +90,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
         return fmt.Errorf("unrecognized isb svc type %q", sp.ISBSvcType)
     }
 
-    sourcer, err := sp.getSourcer(writers, fetchWatermark, publishWatermark, &publishWMStore, log)
+    sourcer, err := sp.getSourcer(writers, fetchWatermark, publishWatermark, publishWMStore, log)
     if err != nil {
         return fmt.Errorf("failed to find a sourcer, error: %w", err)
     }
diff --git a/pkg/udf/udf.go b/pkg/udf/udf.go
index 2d62c8bd10..0df8c8240e 100644
--- a/pkg/udf/udf.go
+++ b/pkg/udf/udf.go
@@ -82,7 +82,10 @@ func (u *UDFProcessor) Start(ctx context.Context) error {
         }
 
         // build watermark progressors
-        fetchWatermark, publishWatermark = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
+        fetchWatermark, publishWatermark, err = jetstream.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
+        if err != nil {
+            return err
+        }
 
         for _, e := range u.VertexInstance.Vertex.Spec.ToEdges {
             writeOpts := []jetstreamisb.WriteOption{}
diff --git a/pkg/watermark/fetch/edge.go b/pkg/watermark/fetch/edge.go
index b1c219c1dd..b893a5d2ca 100644
--- a/pkg/watermark/fetch/edge.go
+++ b/pkg/watermark/fetch/edge.go
@@ -26,13 +26,13 @@ type Fetcher interface {
 type Edge struct {
     ctx        context.Context
     edgeName   string
-    fromVertex *FromVertex
+    fromVertex FromVertexer
     log        *zap.SugaredLogger
 }
 
 // NewEdgeBuffer returns a new Edge. FromVertex has the details about the processors responsible for writing to this
 // edge.
-func NewEdgeBuffer(ctx context.Context, edgeName string, fromV *FromVertex) *Edge {
+func NewEdgeBuffer(ctx context.Context, edgeName string, fromV FromVertexer) *Edge {
     return &Edge{
         ctx:      ctx,
         edgeName: edgeName,
@@ -51,14 +51,15 @@ func (e *Edge) GetHeadWatermark() processor.Watermark {
     var allProcessors = e.fromVertex.GetAllProcessors()
     // get the head offset of each processor
     for _, p := range allProcessors {
-        debugString.WriteString(fmt.Sprintf("[HB:%s OT:%s] (headoffset:%d) %s\n", e.fromVertex.hbWatcher.GetKVName(), e.fromVertex.otWatcher.GetKVName(), p.offsetTimeline.GetHeadOffset(), p))
+        e.log.Debugf("Processor: %v (headoffset:%d)", p, p.offsetTimeline.GetHeadOffset())
+        debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, p.offsetTimeline.GetHeadOffset()))
         var o = p.offsetTimeline.GetHeadOffset()
         if o != -1 && o > headOffset {
             headOffset = o
             epoch = p.offsetTimeline.GetEventtimeFromInt64(o)
         }
     }
-
+    e.log.Debugf("GetHeadWatermark: %s", debugString.String())
     if epoch == math.MaxInt64 {
         return processor.Watermark(time.Time{})
     }
@@ -77,24 +78,20 @@ func (e *Edge) GetWatermark(inputOffset isb.Offset) processor.Watermark {
     var epoch int64 = math.MaxInt64
     var allProcessors = e.fromVertex.GetAllProcessors()
     for _, p := range allProcessors {
-        debugString.WriteString(fmt.Sprintf("[HB:%s OT:%s] %s\n", e.fromVertex.hbWatcher.GetKVName(), e.fromVertex.otWatcher.GetKVName(), p))
+        if !p.IsActive() {
+            continue
+        }
+        debugString.WriteString(fmt.Sprintf("[Processor: %v] \n", p))
         var t = p.offsetTimeline.GetEventTime(inputOffset)
         if t != -1 && t < epoch {
             epoch = t
         }
-        // TODO: can we delete an inactive processor?
-        if p.IsDeleted() && (offset > p.offsetTimeline.GetHeadOffset()) {
-            // if the pod is not active and the current offset is ahead of all offsets in Timeline
-            e.fromVertex.DeleteProcessor(p.entity.GetID())
-            e.fromVertex.heartbeat.Delete(p.entity.GetID())
-        }
     }
     // if the offset is smaller than every offset in the timeline, set the value to be -1
     if epoch == math.MaxInt64 {
         epoch = -1
     }
-    // TODO: use log instead of fmt.Printf
-    fmt.Printf("\n%s[%s] get watermark for offset %d: %+v\n", debugString.String(), e.edgeName, offset, epoch)
+    e.log.Debugf("%s[%s] get watermark for offset %d: %+v", debugString.String(), e.edgeName, offset, epoch)
     if epoch == -1 {
         return processor.Watermark(time.Time{})
     }
diff --git a/pkg/watermark/fetch/edge_test.go b/pkg/watermark/fetch/edge_test.go
index f05e5b37a9..ad7b4b826c 100644
--- a/pkg/watermark/fetch/edge_test.go
+++ b/pkg/watermark/fetch/edge_test.go
@@ -10,6 +10,7 @@ import (
 
     "github.com/nats-io/nats.go"
     "github.com/stretchr/testify/assert"
+    "go.uber.org/zap/zaptest"
 
     "github.com/numaproj/numaflow/pkg/isb"
     jsclient "github.com/numaproj/numaflow/pkg/shared/clients/jetstream"
@@ -49,7 +50,7 @@ func TestBuffer_GetWatermark(t *testing.T) {
     hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", publisherHBBucketName, defaultJetStreamClient)
     otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", publisherOTBucketName, defaultJetStreamClient)
 
-    testVertex := NewFromVertex(ctx, hbWatcher, otWatcher)
+    testVertex := NewFromVertex(ctx, hbWatcher, otWatcher).(*fromVertex)
     var (
         // TODO: watcher should not be nil
         testPod0 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, otWatcher)
@@ -85,16 +86,16 @@ func TestBuffer_GetWatermark(t *testing.T) {
     for _, watermark := range pod2Timeline {
         testPod2.offsetTimeline.Put(watermark)
     }
-    testVertex.AddProcessor("testPod0", testPod0)
-    testVertex.AddProcessor("testPod1", testPod1)
-    testVertex.AddProcessor("testPod2", testPod2)
+    testVertex.addProcessor("testPod0", testPod0)
+    testVertex.addProcessor("testPod1", testPod1)
+    testVertex.addProcessor("testPod2", testPod2)
 
     type args struct {
         offset int64
     }
     tests := []struct {
         name       string
-        fromVertex *FromVertex
+        fromVertex FromVertexer
         args       args
         want       int64
     }{
@@ -148,6 +149,7 @@ func TestBuffer_GetWatermark(t *testing.T) {
                 ctx:        ctx,
                 edgeName:   "testBuffer",
                 fromVertex: tt.fromVertex,
+                log:        zaptest.NewLogger(t).Sugar(),
             }
             if got := b.GetWatermark(isb.SimpleOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) })); time.Time(got).In(location) != time.Unix(tt.want, 0).In(location) {
                 t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.Unix(tt.want, 0)))
diff --git a/pkg/watermark/fetch/offset_timeline.go b/pkg/watermark/fetch/offset_timeline.go
index 0e6f4bc23a..2d7339ca7d 100644
--- a/pkg/watermark/fetch/offset_timeline.go
+++ b/pkg/watermark/fetch/offset_timeline.go
@@ -16,7 +16,8 @@ import (
 // OffsetTimeline is to store the event time to the offset records.
 // Our list is sorted by event time from highest to lowest.
 type OffsetTimeline struct {
-    ctx        context.Context
+    ctx context.Context
+    // TODO: replace it with OverflowQueue, which is thread safe and 2 times faster.
     watermarks list.List
     capacity   int
     lock       sync.RWMutex
diff --git a/pkg/watermark/fetch/processor_to_fetch.go b/pkg/watermark/fetch/processor_to_fetch.go
index bbfc54e322..2a2f62bf0a 100644
--- a/pkg/watermark/fetch/processor_to_fetch.go
+++ b/pkg/watermark/fetch/processor_to_fetch.go
@@ -112,7 +112,7 @@ func (p *ProcessorToFetch) startTimeLineWatcher() {
         case store.KVPut:
             epoch, skip, err := p.entity.ParseOTWatcherKey(value.Key())
             if err != nil {
-                p.log.Errorw("unable to convert value.Key() to int64", zap.String("received", value.Key()), zap.Error(err))
+                p.log.Errorw("Unable to convert value.Key() to int64", zap.String("received", value.Key()), zap.Error(err))
                 continue
             }
             // if skip is set to true, it means the key update we received is for a different processor (sharing of bucket)
@@ -124,7 +124,7 @@ func (p *ProcessorToFetch) startTimeLineWatcher() {
                 watermark: epoch,
                 offset:    int64(uint64Value),
             })
-            p.log.Debugw("timelineWatcher- Updates", zap.String("bucket", p.otWatcher.GetKVName()), zap.Int64("epoch", epoch), zap.Uint64("value", uint64Value))
+            p.log.Debugw("TimelineWatcher- Updates", zap.String("bucket", p.otWatcher.GetKVName()), zap.Int64("epoch", epoch), zap.Uint64("value", uint64Value))
         case store.KVDelete:
             // we do not care about Delete events because the timeline bucket is meant to grow and the TTL will
             // naturally trim the KV store.
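The fetch package changes above (edge.go) and below (vertex.go) replace the concrete *FromVertex dependency with the FromVertexer interface, and the concrete type becomes the unexported fromVertex. One practical payoff is that an Edge can be built over a test double instead of a JetStream-backed vertex. A hypothetical in-memory fake, not part of this diff, that would satisfy the interface as declared in vertex.go; it would live inside package fetch since it uses ProcessorToFetch directly, and ctx is assumed to come from the surrounding test:

    // fakeFromVertex is a hypothetical test double satisfying FromVertexer.
    type fakeFromVertex struct {
        processors map[string]*ProcessorToFetch
    }

    func (f *fakeFromVertex) GetAllProcessors() map[string]*ProcessorToFetch { return f.processors }
    func (f *fakeFromVertex) GetProcessor(name string) *ProcessorToFetch     { return f.processors[name] }
    func (f *fakeFromVertex) DeleteProcessor(name string)                    { delete(f.processors, name) }

    // an Edge built over the fake never touches JetStream:
    edge := NewEdgeBuffer(ctx, "testBuffer", &fakeFromVertex{processors: map[string]*ProcessorToFetch{}})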
diff --git a/pkg/watermark/fetch/utils.go b/pkg/watermark/fetch/utils.go
index 009fa2bb6a..55c46e4c94 100644
--- a/pkg/watermark/fetch/utils.go
+++ b/pkg/watermark/fetch/utils.go
@@ -21,17 +21,17 @@ func RetryUntilSuccessfulWatcherCreation(js *jsclient.JetStreamContext, bucketNa
     for i := 0; i < _bucketWatchRetryCount || infiniteLoop; i++ {
         bucket, err := js.KeyValue(bucketName)
         if err != nil {
-            log.Errorw("failed to get the bucket by bucket name", zap.String("bucket", bucketName), zap.Error(err))
+            log.Errorw("Failed to get the bucket by bucket name", zap.String("bucket", bucketName), zap.Error(err))
             time.Sleep(_delayInSecBetweenBucketWatchRetry * time.Second)
             continue
         }
         watcher, err := bucket.WatchAll()
         if err != nil {
-            log.Errorw("failed to create the watch all watcher for bucket name", zap.String("bucket", bucketName), zap.Error(err))
+            log.Errorw("Failed to create the watch all watcher for bucket name", zap.String("bucket", bucketName), zap.Error(err))
             time.Sleep(_delayInSecBetweenBucketWatchRetry * time.Second)
             continue
         }
-        log.Infow("watcher created for bucket", zap.String("bucket", bucketName))
+        log.Infow("Watcher created for bucket", zap.String("bucket", bucketName))
         return watcher
     }
     return nil
diff --git a/pkg/watermark/fetch/vertex.go b/pkg/watermark/fetch/vertex.go
index b6e556b481..d1128279e2 100644
--- a/pkg/watermark/fetch/vertex.go
+++ b/pkg/watermark/fetch/vertex.go
@@ -20,12 +20,16 @@ type FromVertexer interface {
     // GetAllProcessors fetches all the processors from Vn-1 vertex. processors could be pods or when the vertex is a
     // source vertex, it could be partitions if the source is Kafka.
     GetAllProcessors() map[string]*ProcessorToFetch
+    // GetProcessor gets a processor.
+    GetProcessor(processor string) *ProcessorToFetch
+    // DeleteProcessor deletes a processor.
+    DeleteProcessor(processor string)
 }
 
-// FromVertex is the point of view of Vn-1 from Vn vertex. The code is running on Vn vertex.
+// fromVertex is the point of view of Vn-1 from Vn vertex. The code is running on Vn vertex.
 // It has the mapping of all the processors which in turn has all the information about each processor
 // timelines.
-type FromVertex struct {
+type fromVertex struct {
     ctx       context.Context
     hbWatcher store.WatermarkKVWatcher
     otWatcher store.WatermarkKVWatcher
@@ -39,7 +43,7 @@ type FromVertex struct {
 }
 
 // NewFromVertex returns `FromVertex`
-func NewFromVertex(ctx context.Context, hbWatcher store.WatermarkKVWatcher, otWatcher store.WatermarkKVWatcher, inputOpts ...VertexOption) *FromVertex {
+func NewFromVertex(ctx context.Context, hbWatcher store.WatermarkKVWatcher, otWatcher store.WatermarkKVWatcher, inputOpts ...VertexOption) FromVertexer {
     opts := &vertexOptions{
         podHeartbeatRate:         5,
         refreshingProcessorsRate: 5,
@@ -49,7 +53,7 @@ func NewFromVertex(ctx context.Context, hbWatcher store.WatermarkKVWatcher, otWa
         opt(opts)
     }
 
-    v := &FromVertex{
+    v := &fromVertex{
         ctx:       ctx,
         hbWatcher: hbWatcher,
         otWatcher: otWatcher,
@@ -66,15 +70,15 @@ func NewFromVertex(ctx context.Context, hbWatcher store.WatermarkKVWatcher, otWa
     return v
 }
 
-// AddProcessor adds a new processor.
-func (v *FromVertex) AddProcessor(processor string, p *ProcessorToFetch) {
+// addProcessor adds a new processor.
+func (v *fromVertex) addProcessor(processor string, p *ProcessorToFetch) {
     v.lock.Lock()
     defer v.lock.Unlock()
     v.processors[processor] = p
 }
 
 // GetProcessor gets a processor.
-func (v *FromVertex) GetProcessor(processor string) *ProcessorToFetch {
+func (v *fromVertex) GetProcessor(processor string) *ProcessorToFetch {
     v.lock.RLock()
     defer v.lock.RUnlock()
     if p, ok := v.processors[processor]; ok {
@@ -84,7 +88,7 @@ func (v *FromVertex) GetProcessor(processor string) *ProcessorToFetch {
 }
 
 // DeleteProcessor deletes a processor.
-func (v *FromVertex) DeleteProcessor(processor string) {
+func (v *fromVertex) DeleteProcessor(processor string) {
     v.lock.Lock()
     defer v.lock.Unlock()
     // we do not require this processor's reference anymore
@@ -92,7 +96,7 @@ func (v *FromVertex) DeleteProcessor(processor string) {
 }
 
 // GetAllProcessors returns all the processors.
-func (v *FromVertex) GetAllProcessors() map[string]*ProcessorToFetch {
+func (v *fromVertex) GetAllProcessors() map[string]*ProcessorToFetch {
     v.lock.RLock()
     defer v.lock.RUnlock()
     var processors = make(map[string]*ProcessorToFetch, len(v.processors))
@@ -102,7 +106,7 @@ func (v *FromVertex) GetAllProcessors() map[string]*ProcessorToFetch {
     return processors
 }
 
-func (v *FromVertex) startRefreshingProcessors() {
+func (v *fromVertex) startRefreshingProcessors() {
     ticker := time.NewTicker(time.Duration(v.opts.refreshingProcessorsRate) * time.Second)
     defer ticker.Stop()
     v.log.Infow("Refreshing ActiveProcessors ticker started")
@@ -117,7 +121,7 @@ func (v *FromVertex) startRefreshingProcessors() {
 }
 
 // refreshingProcessors keeps the v.ActivePods to be a map of live ActivePods
-func (v *FromVertex) refreshingProcessors() {
+func (v *fromVertex) refreshingProcessors() {
     var debugStr strings.Builder
     for pName, pTime := range v.heartbeat.GetAll() {
         p := v.GetProcessor(pName)
@@ -137,12 +141,12 @@ func (v *FromVertex) refreshingProcessors() {
             debugStr.WriteString(fmt.Sprintf("[%s] ", pName))
         }
     }
-    v.log.Debugw("active processors", zap.String("HB", v.hbWatcher.GetKVName()), zap.String("OT", v.otWatcher.GetKVName()), zap.String("DebugStr", debugStr.String()))
+    v.log.Debugw("Active processors", zap.String("HB", v.hbWatcher.GetKVName()), zap.String("OT", v.otWatcher.GetKVName()), zap.String("DebugStr", debugStr.String()))
 }
 
 // startHeatBeatWatcher starts the processor Heartbeat Watcher to listen to the processor bucket and update the processor
 // Heartbeat map. In the processor heartbeat bucket we have the structure key: processor-name, value: processor-heartbeat.
-func (v *FromVertex) startHeatBeatWatcher() {
+func (v *fromVertex) startHeatBeatWatcher() {
     watchCh := v.hbWatcher.Watch(v.ctx)
     for {
         select {
@@ -162,7 +166,7 @@ func (v *FromVertex) startHeatBeatWatcher() {
                 // The fromProcessor may have been deleted
                 // TODO: make capacity configurable
                 var fromProcessor = NewProcessorToFetch(v.ctx, entity, 10, v.otWatcher)
-                v.AddProcessor(value.Key(), fromProcessor)
+                v.addProcessor(value.Key(), fromProcessor)
                 v.log.Infow("v.AddProcessor successfully added a new fromProcessor", zap.String("fromProcessor", value.Key()))
             } else { // else just make a note that this processor is still active
                 p.setStatus(_active)
@@ -170,23 +174,24 @@ func (v *FromVertex) startHeatBeatWatcher() {
             // value is epoch
             intValue, convErr := strconv.Atoi(string(value.Value()))
             if convErr != nil {
-                v.log.Errorw("unable to convert intValue.Value() to int64", zap.Error(convErr))
+                v.log.Errorw("Unable to convert intValue.Value() to int64", zap.Error(convErr))
+            } else {
+                // insert the last seen timestamp. we use this to figure whether this processor entity is inactive.
+                v.heartbeat.Put(value.Key(), int64(intValue))
             }
-            // insert the last seen timestamp. we use this to figure whether this processor entity is inactive.
-            v.heartbeat.Put(value.Key(), int64(intValue))
         case store.KVDelete:
             p := v.GetProcessor(value.Key())
             if p == nil {
-                v.log.Errorw("nil pointer for the processor, perhaps already deleted", zap.String("key", value.Key()))
+                v.log.Infow("Nil pointer for the processor, perhaps already deleted", zap.String("key", value.Key()))
             } else if p.IsDeleted() {
-                v.log.Warnw("already deleted", zap.String("key", value.Key()), zap.String(value.Key(), p.String()))
+                v.log.Warnw("Already deleted", zap.String("key", value.Key()), zap.String(value.Key(), p.String()))
             } else {
-                v.log.Infow("deleting", zap.String("key", value.Key()), zap.String(value.Key(), p.String()))
+                v.log.Infow("Deleting", zap.String("key", value.Key()), zap.String(value.Key(), p.String()))
                 p.setStatus(_deleted)
                 v.heartbeat.Delete(value.Key())
             }
         case store.KVPurge:
-            v.log.Errorw("received nats.KeyValuePurge", zap.String("bucket", v.hbWatcher.GetKVName()))
+            v.log.Warnw("Received nats.KeyValuePurge", zap.String("bucket", v.hbWatcher.GetKVName()))
         }
         v.log.Debugw("processorHeartbeatWatcher - Updates:", zap.String("Operation", value.Operation().String()), zap.String("HB", v.hbWatcher.GetKVName()), zap.String("key", value.Key()), zap.String("value", string(value.Value())))
diff --git a/pkg/watermark/fetch/vertex_test.go b/pkg/watermark/fetch/vertex_test.go
index ce2ff17635..4fcfa86d05 100644
--- a/pkg/watermark/fetch/vertex_test.go
+++ b/pkg/watermark/fetch/vertex_test.go
@@ -73,7 +73,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
     hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
     otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
 
-    var testVertex = NewFromVertex(ctx, hbWatcher, otWatcher, WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(false))
+    var testVertex = NewFromVertex(ctx, hbWatcher, otWatcher, WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(false)).(*fromVertex)
     var testBuffer = NewEdgeBuffer(ctx, "testBuffer", testVertex)
 
     go func() {
@@ -138,8 +138,9 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
     // because "p1" offsetTimeline's head offset=100, which is < inputOffset 103
     _ = testBuffer.GetWatermark(isb.SimpleOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }))
     allProcessors = testBuffer.fromVertex.GetAllProcessors()
-    assert.Equal(t, 1, len(allProcessors))
+    assert.Equal(t, 2, len(allProcessors))
     assert.True(t, allProcessors["p2"].IsActive())
+    assert.False(t, allProcessors["p1"].IsActive())
 
     time.Sleep(time.Second)
     // resume after one second
@@ -153,6 +154,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
     }()
 
     // wait until p1 becomes active
+    time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
     allProcessors = testBuffer.fromVertex.GetAllProcessors()
     for len(allProcessors) != 2 {
         select {
@@ -306,7 +308,7 @@ func TestFetcherWithSeparateOTBucket(t *testing.T) {
     hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
    otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
 
-    var testVertex = NewFromVertex(ctx, hbWatcher, otWatcher, WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(true))
+    var testVertex = NewFromVertex(ctx, hbWatcher, otWatcher, WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(true)).(*fromVertex)
     var testBuffer = NewEdgeBuffer(ctx, "testBuffer", testVertex)
 
     //var location, _ = time.LoadLocation("UTC")
diff --git a/pkg/watermark/generic/fetcher.go b/pkg/watermark/generic/fetcher.go
index c97133cc3b..016e67e7d5 100644
--- a/pkg/watermark/generic/fetcher.go
+++ b/pkg/watermark/generic/fetcher.go
@@ -9,12 +9,12 @@ import (
     "github.com/numaproj/numaflow/pkg/watermark/store"
 )
 
-// GenericFetch is a generic fetcher which can be used for most use cases.
-type GenericFetch struct {
+// genericFetch is a generic fetcher which can be used for most use cases.
+type genericFetch struct {
     fromEdge *fetch.Edge
 }
 
-var _ fetch.Fetcher = (*GenericFetch)(nil)
+var _ fetch.Fetcher = (*genericFetch)(nil)
 
 // FetchWMWatchers has the watcher information required for fetching watermarks.
 type FetchWMWatchers struct {
@@ -33,11 +33,11 @@ func BuildFetchWMWatchers(hbWatch store.WatermarkKVWatcher, otWatch store.Waterm
 // NewGenericFetch returns GenericFetch. vertexName is the vertex currently processing.
 // fetchWM is a struct for retrieving both the heartbeat
 // and the offset watermark timeline (Vn-1 vertex).
-func NewGenericFetch(ctx context.Context, vertexName string, fetchWM FetchWMWatchers) *GenericFetch {
+func NewGenericFetch(ctx context.Context, vertexName string, fetchWM FetchWMWatchers) fetch.Fetcher {
     fromVertex := fetch.NewFromVertex(ctx, fetchWM.HBWatch, fetchWM.OTWatch)
     fromEdge := fetch.NewEdgeBuffer(ctx, vertexName, fromVertex)
 
-    gf := &GenericFetch{
+    gf := &genericFetch{
         fromEdge: fromEdge,
     }
 
@@ -45,11 +45,11 @@ func NewGenericFetch(ctx context.Context, vertexName string, fetchWM FetchWMWatc
 }
 
 // GetWatermark returns the watermark for the offset.
-func (g *GenericFetch) GetWatermark(offset isb.Offset) processor.Watermark {
+func (g *genericFetch) GetWatermark(offset isb.Offset) processor.Watermark {
     return g.fromEdge.GetWatermark(offset)
 }
 
 // GetHeadWatermark returns the head watermark based on the head offset.
-func (g *GenericFetch) GetHeadWatermark() processor.Watermark {
+func (g *genericFetch) GetHeadWatermark() processor.Watermark {
     return g.fromEdge.GetHeadWatermark()
 }
diff --git a/pkg/watermark/generic/jetstream/generic.go b/pkg/watermark/generic/jetstream/generic.go
index fe28a242df..2679f291d7 100644
--- a/pkg/watermark/generic/jetstream/generic.go
+++ b/pkg/watermark/generic/jetstream/generic.go
@@ -5,13 +5,12 @@ package jetstream
 import (
     "context"
     "fmt"
+
     "github.com/numaproj/numaflow/pkg/watermark/generic"
-    "go.uber.org/zap"
 
     "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
     "github.com/numaproj/numaflow/pkg/isbsvc"
     jsclient "github.com/numaproj/numaflow/pkg/shared/clients/jetstream"
-    "github.com/numaproj/numaflow/pkg/shared/logging"
     sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
     "github.com/numaproj/numaflow/pkg/watermark/fetch"
     "github.com/numaproj/numaflow/pkg/watermark/publish"
@@ -24,32 +23,31 @@ import (
 // Fetcher has one-to-one relationship , whereas we have multiple publishers as the vertex can read only from one edge,
 // and it can write to many.
 // The function is used only when watermarking is enabled on the pipeline.
-func BuildJetStreamWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.VertexInstance) (fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher) {
+func BuildJetStreamWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.VertexInstance) (fetch.Fetcher, map[string]publish.Publisher, error) {
     // if watermark is not enabled, use no-op.
     if !sharedutil.IsWatermarkEnabled() {
-        fetchWatermark, publishWatermark = generic.BuildNoOpWatermarkProgressorsFromEdgeList(generic.GetBufferNameList(vertexInstance.Vertex.GetToBuffers()))
-        return fetchWatermark, publishWatermark
+        fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromEdgeList(generic.GetBufferNameList(vertexInstance.Vertex.GetToBuffers()))
+        return fetchWatermark, publishWatermark, nil
     }
 
-    log := logging.FromContext(ctx)
-    publishWatermark = make(map[string]publish.Publisher)
+    publishWatermark := make(map[string]publish.Publisher)
     // Fetcher creation
     pipelineName := vertexInstance.Vertex.Spec.PipelineName
     fromBufferName := vertexInstance.Vertex.GetFromBuffers()[0].Name
     hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, fromBufferName)
     hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucket, jsclient.NewInClusterJetStreamClient())
     if err != nil {
-        log.Fatalw("JetStreamKVWatch failed", zap.String("HeartbeatBucket", hbBucket), zap.Error(err))
+        return nil, nil, fmt.Errorf("failed at new HB KVJetStreamKVWatch, HeartbeatBucket: %s, %w", hbBucket, err)
     }
     otBucket := isbsvc.JetStreamOTBucket(pipelineName, fromBufferName)
     otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otBucket, jsclient.NewInClusterJetStreamClient())
     if err != nil {
-        log.Fatalw("JetStreamKVWatch failed", zap.String("OTBucket", otBucket), zap.Error(err))
+        return nil, nil, fmt.Errorf("failed at new OT KVJetStreamKVWatch, OTBucket: %s, %w", otBucket, err)
     }
     var fetchWmWatchers = generic.BuildFetchWMWatchers(hbWatch, otWatch)
-    fetchWatermark = generic.NewGenericFetch(ctx, vertexInstance.Vertex.Name, fetchWmWatchers)
+    fetchWatermark := generic.NewGenericFetch(ctx, vertexInstance.Vertex.Name, fetchWmWatchers)
 
     // Publisher map creation, we need a publisher per edge.
 
@@ -59,36 +57,43 @@ func BuildJetStreamWatermarkProgressors(ctx context.Context, vertexInstance *v1a
         // vertex level. We are creating a new one for the time being because controller creates a pair of buckets per edge.
         hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbPublisherBucket, jsclient.NewInClusterJetStreamClient())
         if err != nil {
-            log.Fatalw("JetStreamKVStore failed", zap.String("HeartbeatPublisherBucket", hbPublisherBucket), zap.Error(err))
+            return nil, nil, fmt.Errorf("failed at new HB Publish JetStreamKVStore, HeartbeatPublisherBucket: %s, %w", hbPublisherBucket, err)
         }
         otStoreBucket := isbsvc.JetStreamOTBucket(pipelineName, buffer.Name)
         otStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otStoreBucket, jsclient.NewInClusterJetStreamClient())
         if err != nil {
-            log.Fatalw("JetStreamKVStore failed", zap.String("OTBucket", otStoreBucket), zap.Error(err))
+            return nil, nil, fmt.Errorf("failed at new OT Publish JetStreamKVStore, OTBucket: %s, %w", otStoreBucket, err)
         }
         var publishStores = generic.BuildPublishWMStores(hbStore, otStore)
         var processorName = fmt.Sprintf("%s-%d", vertexInstance.Vertex.Name, vertexInstance.Replica)
-        publishWatermark[buffer.Name] = generic.NewGenericPublish(ctx, processorName, publishStores)
+        publishWatermark[buffer.Name] = generic.NewGenericPublish(ctx, processorName, *publishStores)
     }
 
-    return fetchWatermark, publishWatermark
+    return fetchWatermark, publishWatermark, nil
 }
 
 // BuildJetStreamWatermarkProgressorsForSource is an extension of BuildJetStreamWatermarkProgressors to also return the publish stores. This is
 // for letting source implement as many publishers that it requires to progress the watermark monotonically for each individual processing entity.
 // Eg, watermark progresses independently and monotonically for each partition in a Kafka topic.
-func BuildJetStreamWatermarkProgressorsForSource(ctx context.Context, vertexInstance *v1alpha1.VertexInstance) (fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, publishWM generic.PublishWMStores) {
-    fetchWatermark, publishWatermark = BuildJetStreamWatermarkProgressors(ctx, vertexInstance)
-
-    // return no-ops if not enabled!
+func BuildJetStreamWatermarkProgressorsForSource(ctx context.Context, vertexInstance *v1alpha1.VertexInstance) (fetch.Fetcher, map[string]publish.Publisher, *generic.PublishWMStores, error) {
+    if vertexInstance == nil || vertexInstance.Vertex == nil {
+        return nil, nil, nil, fmt.Errorf("nil vertex instance")
+    }
+    if !vertexInstance.Vertex.IsASource() {
+        return nil, nil, nil, fmt.Errorf("invalid vertex: not a source")
+    }
+    fetchWatermark, publishWatermark, err := BuildJetStreamWatermarkProgressors(ctx, vertexInstance)
+    if err != nil {
+        return nil, nil, nil, fmt.Errorf("failed to build JetStreamWatermarkProgressors, %w", err)
+    }
+    // return no-ops if not enabled!
     if !sharedutil.IsWatermarkEnabled() {
-        publishWM = generic.BuildPublishWMStores(noop.NewKVNoOpStore(), noop.NewKVNoOpStore())
-        return fetchWatermark, publishWatermark, publishWM
+        publishWM := generic.BuildPublishWMStores(noop.NewKVNoOpStore(), noop.NewKVNoOpStore())
+        return fetchWatermark, publishWatermark, publishWM, nil
     }
 
-    log := logging.FromContext(ctx)
     pipelineName := vertexInstance.Vertex.Spec.PipelineName
     sourceBufferName := vertexInstance.Vertex.GetFromBuffers()[0].Name
@@ -96,17 +101,17 @@ func BuildJetStreamWatermarkProgressorsForSource(ctx context.Context, vertexInst
     hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, sourceBufferName)
     hbKVStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbBucket, jsclient.NewInClusterJetStreamClient())
     if err != nil {
-        log.Fatalw("JetStreamKVStore failed", zap.String("HeartbeatBucket", hbBucket), zap.Error(err))
+        return nil, nil, nil, fmt.Errorf("failed at new HB KVJetStreamKVStore for source, HeartbeatBucket: %s, %w", hbBucket, err)
     }
     // OT
     otStoreBucket := isbsvc.JetStreamOTBucket(pipelineName, sourceBufferName)
     otKVStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otStoreBucket, jsclient.NewInClusterJetStreamClient())
     if err != nil {
-        log.Fatalw("JetStreamKVStore failed", zap.String("OTBucket", otStoreBucket), zap.Error(err))
+        return nil, nil, nil, fmt.Errorf("failed at new OT KVJetStreamKVStore for source, OTBucket: %s, %w", otStoreBucket, err)
     }
     // interface for publisher store (HB and OT)
-    publishWM = generic.BuildPublishWMStores(hbKVStore, otKVStore)
-    return fetchWatermark, publishWatermark, publishWM
+    publishWM := generic.BuildPublishWMStores(hbKVStore, otKVStore)
+    return fetchWatermark, publishWatermark, publishWM, nil
 }
diff --git a/pkg/watermark/generic/publisher.go b/pkg/watermark/generic/publisher.go
index c3f381ad03..1d6bbe5a8b 100644
--- a/pkg/watermark/generic/publisher.go
+++ b/pkg/watermark/generic/publisher.go
@@ -16,43 +16,43 @@ type PublishWMStores struct {
 }
 
 // BuildPublishWMStores builds the PublishWMStores.
-func BuildPublishWMStores(hbStore store.WatermarkKVStorer, otStore store.WatermarkKVStorer) PublishWMStores {
-    return PublishWMStores{
+func BuildPublishWMStores(hbStore store.WatermarkKVStorer, otStore store.WatermarkKVStorer) *PublishWMStores {
+    return &PublishWMStores{
         HBStore: hbStore,
         OTStore: otStore,
     }
 }
 
-// GenericPublish is a generic publisher which will work for most cases.
-type GenericPublish struct {
-    toEdge *publish.Publish
+// genericPublish is a generic publisher which will work for most cases.
+type genericPublish struct {
+    toEdge publish.Publisher
 }
 
-var _ publish.Publisher = (*GenericPublish)(nil)
+var _ publish.Publisher = (*genericPublish)(nil)
 
 // NewGenericPublish returns GenericPublish. processorName is the unique processor (pod) that is running on this vertex.
 // publishKeyspace is obsolete, and will be removed in subsequent iterations. publishWM is a struct for storing both the heartbeat
 // and the offset watermark timeline stores for the Vn vertex.
-func NewGenericPublish(ctx context.Context, processorName string, publishWM PublishWMStores) *GenericPublish {
+func NewGenericPublish(ctx context.Context, processorName string, publishWM PublishWMStores) publish.Publisher {
     publishEntity := processor.NewProcessorEntity(processorName)
     udfPublish := publish.NewPublish(ctx, publishEntity, publishWM.HBStore, publishWM.OTStore)
-    gp := &GenericPublish{
+    gp := &genericPublish{
         toEdge: udfPublish,
     }
     return gp
 }
 
 // PublishWatermark publishes for the generic publisher.
-func (g *GenericPublish) PublishWatermark(watermark processor.Watermark, offset isb.Offset) {
+func (g *genericPublish) PublishWatermark(watermark processor.Watermark, offset isb.Offset) {
     g.toEdge.PublishWatermark(watermark, offset)
 }
 
 // GetLatestWatermark gets the latest watermakr for the generic publisher.
-func (g *GenericPublish) GetLatestWatermark() processor.Watermark {
+func (g *genericPublish) GetLatestWatermark() processor.Watermark {
     return g.toEdge.GetLatestWatermark()
 }
 
 // StopPublisher stops the generic publisher.
-func (g *GenericPublish) StopPublisher() {
+func (g *genericPublish) StopPublisher() {
     g.toEdge.StopPublisher()
 }
diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go
index 6674fdf271..8fc8980dab 100644
--- a/pkg/watermark/publish/publisher.go
+++ b/pkg/watermark/publish/publisher.go
@@ -26,8 +26,8 @@ type Publisher interface {
     StopPublisher()
 }
 
-// Publish publishes the watermark for a processor entity.
-type Publish struct {
+// publish publishes the watermark for a processor entity.
+type publish struct {
     ctx            context.Context
     entity         processor.ProcessorEntitier
     heartbeatStore store.WatermarkKVStorer
@@ -42,7 +42,7 @@ type Publish struct {
 }
 
 // NewPublish returns `Publish`.
-func NewPublish(ctx context.Context, processorEntity processor.ProcessorEntitier, hbStore store.WatermarkKVStorer, otStore store.WatermarkKVStorer, inputOpts ...PublishOption) *Publish {
+func NewPublish(ctx context.Context, processorEntity processor.ProcessorEntitier, hbStore store.WatermarkKVStorer, otStore store.WatermarkKVStorer, inputOpts ...PublishOption) Publisher {
     log := logging.FromContext(ctx)
 
@@ -54,7 +54,7 @@ func NewPublish(ctx context.Context, processorEntity processor.ProcessorEntitier
         opt(opts)
     }
 
-    p := &Publish{
+    p := &publish{
         ctx:            ctx,
         entity:         processorEntity,
         heartbeatStore: hbStore,
@@ -75,18 +75,18 @@ func NewPublish(ctx context.Context, processorEntity processor.ProcessorEntitier
 }
 
 // initialSetup inserts the default values as the ProcessorEntity starts emitting watermarks.
-func (p *Publish) initialSetup() {
+func (p *publish) initialSetup() {
     p.headWatermark = p.loadLatestFromStore()
 }
 
 // PublishWatermark publishes watermark and will retry until it can succeed. It will not publish if the new-watermark
 // is less than the current head watermark.
-func (p *Publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) {
+func (p *publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) {
     // update p.headWatermark only if wm > p.headWatermark
     if time.Time(wm).After(time.Time(p.headWatermark)) {
         p.headWatermark = wm
     } else {
-        p.log.Errorw("new watermark is older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
+        p.log.Errorw("New watermark is older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
         return
     }
 
@@ -101,7 +101,7 @@ func (p *Publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) {
     for {
         err := p.otStore.PutKV(p.ctx, key, value)
         if err != nil {
-            p.log.Errorw("unable to publish watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err))
+            p.log.Errorw("Unable to publish watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err))
             // TODO: better exponential backoff
             time.Sleep(time.Millisecond * 250)
         } else {
@@ -112,7 +112,8 @@ func (p *Publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) {
 
 // loadLatestFromStore loads the latest watermark stored in the watermark store.
 // TODO: how to repopulate if the processing unit is down for a really long time?
-func (p *Publish) loadLatestFromStore() processor.Watermark {
+func (p *publish) loadLatestFromStore() processor.Watermark {
+    // TODO: this is too much.
     var watermarks = p.getAllOTKeysFromBucket()
     var latestWatermark int64 = math.MinInt64
 
@@ -124,7 +125,8 @@ func (p *Publish) loadLatestFromStore() processor.Watermark {
             continue
         }
         if err != nil {
-            p.log.Panicw("invalid epoch time string", zap.Error(err))
+            p.log.Errorw("Invalid epoch time string", zap.Error(err))
+            continue
         }
         if latestWatermark < epoch {
             latestWatermark = epoch
@@ -135,11 +137,11 @@ func (p *Publish) loadLatestFromStore() processor.Watermark {
 }
 
 // GetLatestWatermark returns the latest watermark for that processor.
-func (p *Publish) GetLatestWatermark() processor.Watermark {
+func (p *publish) GetLatestWatermark() processor.Watermark {
     return p.headWatermark
 }
 
-func (p *Publish) publishHeartbeat() {
+func (p *publish) publishHeartbeat() {
     ticker := time.NewTicker(time.Second * time.Duration(p.podHeartbeatRate))
     defer ticker.Stop()
     p.log.Infow("Refreshing ActiveProcessors ticker started")
@@ -150,34 +152,34 @@ func (p *Publish) publishHeartbeat() {
         case <-ticker.C:
             err := p.heartbeatStore.PutKV(p.ctx, p.entity.GetID(), []byte(fmt.Sprintf("%d", time.Now().Unix())))
             if err != nil {
-                p.log.Errorw("put to bucket failed", zap.String("bucket", p.heartbeatStore.GetStoreName()), zap.Error(err))
+                p.log.Errorw("Put to bucket failed", zap.String("bucket", p.heartbeatStore.GetStoreName()), zap.Error(err))
             }
         }
     }
 }
 
 // StopPublisher stops the publisher and cleans up the data associated with key.
-func (p *Publish) StopPublisher() {
+func (p *publish) StopPublisher() {
     // TODO: cleanup after processor dies
     //   - delete the Offset-Timeline bucket
     //   - remove itself from heartbeat bucket
 
-    p.log.Infow("stopping publisher", zap.String("bucket", p.heartbeatStore.GetStoreName()))
+    p.log.Infow("Stopping publisher", zap.String("bucket", p.heartbeatStore.GetStoreName()))
     if !p.entity.IsOTBucketShared() {
-        p.log.Errorw("non sharing of bucket is not supported by controller as of today", zap.String("bucket", p.heartbeatStore.GetStoreName()))
+        p.log.Warnw("Non sharing of bucket is not supported by controller as of today", zap.String("bucket", p.heartbeatStore.GetStoreName()))
     }
 
     // clean up heartbeat bucket
     err := p.heartbeatStore.DeleteKey(p.ctx, p.entity.GetID())
     if err != nil {
-        p.log.Errorw("failed to delete the key in the heartbeat bucket", zap.String("bucket", p.heartbeatStore.GetStoreName()), zap.String("key", p.entity.GetID()), zap.Error(err))
+        p.log.Errorw("Failed to delete the key in the heartbeat bucket", zap.String("bucket", p.heartbeatStore.GetStoreName()), zap.String("key", p.entity.GetID()), zap.Error(err))
     }
 }
 
-func (p *Publish) getAllOTKeysFromBucket() []string {
+func (p *publish) getAllOTKeysFromBucket() []string {
     keys, err := p.otStore.GetAllKeys(p.ctx)
     if err != nil && !errors.Is(err, nats.ErrNoKeysFound) {
-        p.log.Fatalw("failed to get the keys", zap.String("bucket", p.heartbeatStore.GetStoreName()), zap.Error(err))
+        p.log.Fatalw("Failed to get the keys", zap.String("bucket", p.heartbeatStore.GetStoreName()), zap.Error(err))
     }
     return keys
 }
diff --git a/pkg/watermark/publish/publisher_test.go b/pkg/watermark/publish/publisher_test.go
index 150bb57cee..ab848d6cfb 100644
--- a/pkg/watermark/publish/publisher_test.go
+++ b/pkg/watermark/publish/publisher_test.go
@@ -56,7 +56,7 @@ func TestPublisherWithSeparateOTBuckets(t *testing.T) {
 
     publishEntity := processor.NewProcessorEntity("publisherTestPod1")
 
-    p := NewPublish(ctx, publishEntity, heartbeatKV, otKV, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1))
+    p := NewPublish(ctx, publishEntity, heartbeatKV, otKV, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish)
 
     var epoch int64 = 1651161600
     var location, _ = time.LoadLocation("UTC")
@@ -108,7 +108,7 @@ func TestPublisherWithSharedOTBucket(t *testing.T) {
     otKV, err := jetstream.NewKVJetStreamKVStore(ctx, "testPublisher", keyspace+"_OT", defaultJetStreamClient)
     assert.NoError(t, err)
 
-    p := NewPublish(ctx, publishEntity, heartbeatKV, otKV, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1))
+    p := NewPublish(ctx, publishEntity, heartbeatKV, otKV, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish)
 
     var epoch int64 = 1651161600
     var location, _ = time.LoadLocation("UTC")
diff --git a/pkg/watermark/store/interface.go b/pkg/watermark/store/interface.go
index f259488dd3..5fc0ed2584 100644
--- a/pkg/watermark/store/interface.go
+++ b/pkg/watermark/store/interface.go
@@ -10,7 +10,7 @@ type WMStorer interface {
     WatermarkKVWatcher
 }
 
-// WatermarkKVStorer is defines the storage for publishing the watermark.
+// WatermarkKVStorer defines the storage for publishing the watermark.
 type WatermarkKVStorer interface {
     // GetAllKeys the keys from KV store.
     GetAllKeys(context.Context) ([]string, error)
diff --git a/pkg/watermark/store/jetstream/kv_store.go b/pkg/watermark/store/jetstream/kv_store.go
index 3896e4f76a..21689534b6 100644
--- a/pkg/watermark/store/jetstream/kv_store.go
+++ b/pkg/watermark/store/jetstream/kv_store.go
@@ -15,8 +15,8 @@ import (
     "github.com/numaproj/numaflow/pkg/watermark/store"
 )
 
-// KVJetStreamStore implements the watermark's KV store backed up by Jetstream.
-type KVJetStreamStore struct {
+// jetStreamStore implements the watermark's KV store backed up by Jetstream.
+type jetStreamStore struct {
     pipelineName string
     conn         *jsclient.NatsConn
     kv           nats.KeyValue
@@ -24,10 +24,10 @@ type KVJetStreamStore struct {
     log *zap.SugaredLogger
 }
 
-var _ store.WatermarkKVStorer = (*KVJetStreamStore)(nil)
+var _ store.WatermarkKVStorer = (*jetStreamStore)(nil)
 
 // NewKVJetStreamKVStore returns KVJetStreamStore.
-func NewKVJetStreamKVStore(ctx context.Context, pipelineName string, bucketName string, client jsclient.JetStreamClient, opts ...JSKVStoreOption) (*KVJetStreamStore, error) {
+func NewKVJetStreamKVStore(ctx context.Context, pipelineName string, bucketName string, client jsclient.JetStreamClient, opts ...JSKVStoreOption) (store.WatermarkKVStorer, error) {
     var err error
     conn, err := client.Connect(ctx)
     if err != nil {
@@ -41,7 +41,7 @@ func NewKVJetStreamKVStore(ctx context.Context, pipelineName string, bucketName
         return nil, fmt.Errorf("failed to get JetStream context for writer")
     }
 
-    j := &KVJetStreamStore{
+    j := &jetStreamStore{
         pipelineName: pipelineName,
         conn:         conn,
         js:           js,
@@ -62,10 +62,10 @@ func NewKVJetStreamKVStore(ctx context.Context, pipelineName string, bucketName
 }
 
 // JSKVStoreOption is to pass in Jetstream options.
-type JSKVStoreOption func(*KVJetStreamStore) error
+type JSKVStoreOption func(*jetStreamStore) error
 
 // GetAllKeys returns all the keys in the key-value store.
-func (kv *KVJetStreamStore) GetAllKeys(_ context.Context) ([]string, error) {
+func (kv *jetStreamStore) GetAllKeys(_ context.Context) ([]string, error) {
     keys, err := kv.kv.Keys()
     if err != nil {
         return nil, err
@@ -74,7 +74,7 @@ func (kv *KVJetStreamStore) GetAllKeys(_ context.Context) ([]string, error) {
 }
 
 // GetValue returns the value for a given key.
-func (kv *KVJetStreamStore) GetValue(_ context.Context, k string) ([]byte, error) {
+func (kv *jetStreamStore) GetValue(_ context.Context, k string) ([]byte, error) {
     kvEntry, err := kv.kv.Get(k)
     if err != nil {
         return []byte(""), err
@@ -85,23 +85,23 @@ func (kv *KVJetStreamStore) GetValue(_ context.Context, k string) ([]byte, error
 }
 
 // GetStoreName returns the store name.
-func (kv *KVJetStreamStore) GetStoreName() string {
+func (kv *jetStreamStore) GetStoreName() string {
     return kv.kv.Bucket()
 }
 
 // DeleteKey deletes the key from the JS key-value store.
-func (kv *KVJetStreamStore) DeleteKey(_ context.Context, k string) error {
+func (kv *jetStreamStore) DeleteKey(_ context.Context, k string) error {
     return kv.kv.Delete(k)
 }
 
 // PutKV puts an element to the JS key-value store.
-func (kv *KVJetStreamStore) PutKV(_ context.Context, k string, v []byte) error {
+func (kv *jetStreamStore) PutKV(_ context.Context, k string, v []byte) error {
     _, err := kv.kv.Put(k, v)
     return err
 }
 
 // Close closes the jetstream connection.
-func (kv *KVJetStreamStore) Close() {
+func (kv *jetStreamStore) Close() {
     if !kv.conn.IsClosed() {
         kv.conn.Close()
     }
diff --git a/pkg/watermark/store/jetstream/kv_watch.go b/pkg/watermark/store/jetstream/kv_watch.go
index 1b348bd956..85c666564b 100644
--- a/pkg/watermark/store/jetstream/kv_watch.go
+++ b/pkg/watermark/store/jetstream/kv_watch.go
@@ -13,8 +13,8 @@ import (
     "github.com/numaproj/numaflow/pkg/watermark/store"
 )
 
-// KVJetStreamWatch implements the watermark's KV store backed up by Jetstream.
-type KVJetStreamWatch struct {
+// jetStreamWatch implements the watermark's KV store backed up by Jetstream.
+type jetStreamWatch struct {
     pipelineName string
     conn         *jsclient.NatsConn
     kv           nats.KeyValue
@@ -22,10 +22,10 @@ type KVJetStreamWatch struct {
     log *zap.SugaredLogger
 }
 
-var _ store.WatermarkKVWatcher = (*KVJetStreamWatch)(nil)
+var _ store.WatermarkKVWatcher = (*jetStreamWatch)(nil)
 
 // NewKVJetStreamKVWatch returns KVJetStreamWatch specific to Jetsteam which implements the WatermarkKVWatcher interface.
-func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvBucketName string, client jsclient.JetStreamClient, opts ...JSKVWatcherOption) (*KVJetStreamWatch, error) {
+func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvBucketName string, client jsclient.JetStreamClient, opts ...JSKVWatcherOption) (store.WatermarkKVWatcher, error) {
     var err error
     conn, err := client.Connect(ctx)
     if err != nil {
@@ -39,7 +39,7 @@ func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvBucketNam
         return nil, fmt.Errorf("failed to get JetStream context for writer")
     }
 
-    j := &KVJetStreamWatch{
+    j := &jetStreamWatch{
         pipelineName: pipelineName,
         conn:         conn,
         js:           js,
@@ -63,7 +63,7 @@ func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvBucketNam
 }
 
 // JSKVWatcherOption is to pass in Jetstream options.
-type JSKVWatcherOption func(*KVJetStreamWatch) error
+type JSKVWatcherOption func(*jetStreamWatch) error
 
 // kvEntry is each key-value entry in the store and the operation associated with the kv pair.
 type kvEntry struct {
@@ -88,7 +88,7 @@ func (k kvEntry) Operation() store.KVWatchOp {
 }
 
 // Watch watches the key-value store (aka bucket).
-func (k *KVJetStreamWatch) Watch(ctx context.Context) <-chan store.WatermarkKVEntry {
+func (k *jetStreamWatch) Watch(ctx context.Context) <-chan store.WatermarkKVEntry {
     kvWatcher, err := k.kv.WatchAll()
     for err != nil {
         k.log.Errorw("WatchAll failed", zap.String("watcher", k.GetKVName()), zap.Error(err))
@@ -134,12 +134,12 @@ func (k *KVJetStreamWatch) Watch(ctx context.Context) <-chan store.WatermarkKVEn
 }
 
 // GetKVName returns the KV store (bucket) name.
-func (k *KVJetStreamWatch) GetKVName() string {
+func (k *jetStreamWatch) GetKVName() string {
     return k.kv.Bucket()
 }
 
 // Close closes the connection.
-func (k *KVJetStreamWatch) Close() {
+func (k *jetStreamWatch) Close() {
     if !k.conn.IsClosed() {
         k.conn.Close()
     }
diff --git a/pkg/watermark/store/noop/kv_store.go b/pkg/watermark/store/noop/kv_store.go
index dbd4a1b595..b31befc21f 100644
--- a/pkg/watermark/store/noop/kv_store.go
+++ b/pkg/watermark/store/noop/kv_store.go
@@ -8,36 +8,36 @@ import (
     "github.com/numaproj/numaflow/pkg/watermark/store"
 )
 
-// KVNoOpStore is a no-op store which does not do any operation but can be safely invoked.
-type KVNoOpStore struct {
+// noOpStore is a no-op store which does not do any operation but can be safely invoked.
+type noOpStore struct {
 }
 
-var _ store.WatermarkKVStorer = (*KVNoOpStore)(nil)
+var _ store.WatermarkKVStorer = (*noOpStore)(nil)
 
-// NewKVNoOpStore returns KVNoOpStore.
-func NewKVNoOpStore() *KVNoOpStore {
-    return &KVNoOpStore{}
+// NewKVNoOpStore returns a no-op WatermarkKVStorer.
+func NewKVNoOpStore() store.WatermarkKVStorer {
+    return &noOpStore{}
 }
 
-func (K KVNoOpStore) GetAllKeys(_ context.Context) ([]string, error) {
+func (k noOpStore) GetAllKeys(_ context.Context) ([]string, error) {
     return []string{}, nil
 }
 
-func (K KVNoOpStore) DeleteKey(_ context.Context, _ string) error {
+func (k noOpStore) DeleteKey(_ context.Context, _ string) error {
     return nil
 }
 
-func (K KVNoOpStore) PutKV(_ context.Context, _ string, _ []byte) error {
+func (k noOpStore) PutKV(_ context.Context, _ string, _ []byte) error {
     return nil
 }
 
-func (K KVNoOpStore) GetValue(_ context.Context, _ string) ([]byte, error) {
+func (k noOpStore) GetValue(_ context.Context, _ string) ([]byte, error) {
     return []byte{}, nil
 }
 
-func (K KVNoOpStore) GetStoreName() string {
+func (k noOpStore) GetStoreName() string {
     return "noop"
 }
 
-func (K KVNoOpStore) Close() {
+func (k noOpStore) Close() {
 }
diff --git a/pkg/watermark/store/noop/kv_watch.go b/pkg/watermark/store/noop/kv_watch.go
index 32360c7ca4..e71b7bb2a0 100644
--- a/pkg/watermark/store/noop/kv_watch.go
+++ b/pkg/watermark/store/noop/kv_watch.go
@@ -2,28 +2,29 @@ package noop
 
 import (
     "context"
+
     "github.com/numaproj/numaflow/pkg/watermark/store"
 )
 
-type KVNoOpWatch struct {
+type noOpWatch struct {
 }
 
-var _ store.WatermarkKVWatcher = (*KVNoOpWatch)(nil)
+var _ store.WatermarkKVWatcher = (*noOpWatch)(nil)
 
-func NewKVOpWatch() *KVNoOpWatch {
-    return &KVNoOpWatch{}
+func NewKVOpWatch() store.WatermarkKVWatcher {
+    return &noOpWatch{}
 }
 
 // Watch returns a blocking channel.
-func (K KVNoOpWatch) Watch(ctx context.Context) <-chan store.WatermarkKVEntry {
+func (no noOpWatch) Watch(ctx context.Context) <-chan store.WatermarkKVEntry {
     retChan := make(chan store.WatermarkKVEntry)
     return retChan
 }
 
-func (K KVNoOpWatch) GetKVName() string {
+func (no noOpWatch) GetKVName() string {
     return ""
 }
 
 // Close closes, but we do not close the channel created during watch here; that should be taken care of by the context done
-func (K KVNoOpWatch) Close() {
+func (no noOpWatch) Close() {
 }
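Taken together, the constructors in this diff now return interfaces (fetch.Fetcher, publish.Publisher, store.WatermarkKVStorer, store.WatermarkKVWatcher) with unexported concrete types behind them, and BuildPublishWMStores returns *PublishWMStores, so call sites pass the value straight through instead of taking its address. A minimal test-wiring sketch following the updated tickgen tests above; toSteps, m, and dest are the variables those tests already define, and the no-op constructors are assumed as changed in this diff:

    // both no-op store constructors now return interfaces, and the stores struct comes back as a pointer
    publishWMStore := generic.BuildPublishWMStores(noop.NewKVNoOpStore(), noop.NewKVNoOpStore())
    fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
    // no & at the call site anymore: NewMemGen keeps taking the pointer type
    mgen, err := NewMemGen(m, 5, 8, time.Millisecond, []isb.BufferWriter{dest}, fetchWatermark, publishWatermark, publishWMStore)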