diff --git a/pkg/daemon/server/service/pipeline_watermark_query.go b/pkg/daemon/server/service/pipeline_watermark_query.go index 9a75fb7c79..b69cdd219e 100644 --- a/pkg/daemon/server/service/pipeline_watermark_query.go +++ b/pkg/daemon/server/service/pipeline_watermark_query.go @@ -75,7 +75,7 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request // If watermark is not enabled, return time zero if ps.pipeline.Spec.Watermark.Disabled { - timeZero := time.Unix(0, 0).Unix() + timeZero := time.Unix(0, 0).UnixMilli() v := &daemon.VertexWatermark{ Pipeline: &ps.pipeline.Name, Vertex: request.Vertex, @@ -91,13 +91,13 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request // Vertex not found if !ok { - log.Errorf("watermark fetchers not available for vertex %s in the fetcher map", vertexName) + log.Errorf("Watermark fetchers not available for vertex %s in the fetcher map", vertexName) return nil, fmt.Errorf("watermark not available for given vertex, %s", vertexName) } var latestWatermark = int64(-1) for _, fetcher := range vertexFetchers { - watermark := fetcher.GetHeadWatermark().Unix() + watermark := fetcher.GetHeadWatermark().UnixMilli() if watermark > latestWatermark { latestWatermark = watermark } diff --git a/pkg/isb/stores/jetstream/reader.go b/pkg/isb/stores/jetstream/reader.go index 029eb1c62d..bf326b9e71 100644 --- a/pkg/isb/stores/jetstream/reader.go +++ b/pkg/isb/stores/jetstream/reader.go @@ -150,7 +150,7 @@ func (jr *jetStreamReader) runAckInfomationChecker(ctx context.Context) { checkAckInfo := func() { c, err := js.ConsumerInfo(jr.stream, jr.stream) if err != nil { - jr.log.Errorw("failed to get consumer info in the reader", zap.Error(err)) + jr.log.Errorw("Failed to get consumer info in the reader", zap.Error(err)) return } ts := timestampedSequence{seq: int64(c.AckFloor.Stream), timestamp: time.Now().Unix()} diff --git a/pkg/isb/stores/jetstream/writer.go b/pkg/isb/stores/jetstream/writer.go index 17e54e0685..963a345ba2 100644 --- a/pkg/isb/stores/jetstream/writer.go +++ b/pkg/isb/stores/jetstream/writer.go @@ -93,7 +93,7 @@ func (jw *jetStreamWriter) runStatusChecker(ctx context.Context) { c, err := js.ConsumerInfo(jw.stream, jw.stream) if err != nil { isbFullErrors.With(labels).Inc() - jw.log.Errorw("failed to get consumer info in the writer", zap.Error(err)) + jw.log.Errorw("Failed to get consumer info in the writer", zap.Error(err)) return } var solidUsage, softUsage float64 diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index c4168068f0..ff17f5357a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -138,7 +138,7 @@ func (ms *metricsServer) buildupPendingInfo(ctx context.Context) { return case <-ticker.C: if pending, err := ms.lagReader.Pending(ctx); err != nil { - log.Errorw("failed to get pending messages", zap.Error(err)) + log.Errorw("Failed to get pending messages", zap.Error(err)) } else { if pending != isb.PendingNotAvailable { ts := timestampedPending{pending: pending, timestamp: time.Now().Unix()} @@ -167,7 +167,7 @@ func (ms *metricsServer) exposePendingAndRate(ctx context.Context) { if ms.rater != nil { for n, i := range lookbackSecondsMap { if r, err := ms.rater.Rate(ctx, i); err != nil { - log.Errorw("failed to get processing rate in the past seconds", zap.Int64("seconds", i), zap.Error(err)) + log.Errorw("Failed to get processing rate in the past seconds", zap.Int64("seconds", i), zap.Error(err)) } else { if r != isb.RateNotAvailable { processingRate.WithLabelValues(ms.vertex.Spec.PipelineName, ms.vertex.Spec.Name, n).Set(r) diff --git a/pkg/pbq/pbq.go b/pkg/pbq/pbq.go index 575406072b..62b4264bd3 100644 --- a/pkg/pbq/pbq.go +++ b/pkg/pbq/pbq.go @@ -30,7 +30,7 @@ var _ ReadWriteCloser = (*PBQ)(nil) func (p *PBQ) Write(ctx context.Context, message *isb.ReadMessage) error { // if cob we should return if p.cob { - p.log.Errorw("failed to write message to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("header", message.Header)) + p.log.Errorw("Failed to write message to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("header", message.Header)) return ErrCOB } var writeErr error @@ -84,7 +84,7 @@ readLoop: for { readMessages, eof, err := p.store.Read(size) if err != nil { - p.log.Errorw("error while replaying records from store", zap.Any("ID", p.PartitionID), zap.Error(err)) + p.log.Errorw("Error while replaying records from store", zap.Any("ID", p.PartitionID), zap.Error(err)) } for _, msg := range readMessages { // select to avoid infinite blocking while writing to output channel diff --git a/pkg/reconciler/isbsvc/controller.go b/pkg/reconciler/isbsvc/controller.go index 77afff314e..dd02be803f 100644 --- a/pkg/reconciler/isbsvc/controller.go +++ b/pkg/reconciler/isbsvc/controller.go @@ -46,7 +46,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct isbsCopy := isbs.DeepCopy() reconcileErr := r.reconcile(ctx, isbsCopy) if reconcileErr != nil { - log.Errorw("reconcile error", zap.Error(reconcileErr)) + log.Errorw("Reconcile error", zap.Error(reconcileErr)) } if r.needsUpdate(isbs, isbsCopy) { if err := r.client.Update(ctx, isbsCopy); err != nil { diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 5615df1515..cbd818936f 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -92,7 +92,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) ( if !safeToDelete { log.Info("Pipeline deletion is waiting to finish the unconsumed messages") - //Requeue request to process after 10s + // Requeue request to process after 10s return ctrl.Result{RequeueAfter: dfv1.DefaultRequeueAfter}, nil } } @@ -370,7 +370,7 @@ func (r *pipelineReconciler) cleanUpBuffers(ctx context.Context, pl *dfv1.Pipeli if apierrors.IsNotFound(err) { // somehow it doesn't need to clean up return nil } - log.Errorw("failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err)) + log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err)) return err } @@ -626,7 +626,7 @@ func (r *pipelineReconciler) safeToDelete(ctx context.Context, pl *dfv1.Pipeline if err != nil { return false, err } - //Requeue pipeline to take effect the vertex replica changes + // Requeue pipeline to take effect the vertex replica changes if vertexPatched { return false, nil } diff --git a/pkg/reduce/pnf/processandforward_test.go b/pkg/reduce/pnf/processandforward_test.go index 99eb8ac445..3e60aeae9d 100644 --- a/pkg/reduce/pnf/processandforward_test.go +++ b/pkg/reduce/pnf/processandforward_test.go @@ -67,8 +67,8 @@ func TestProcessAndForward_Process(t *testing.T) { size := 100 testPartition := partition.ID{ - Start: time.Unix(60, 0), - End: time.Unix(120, 0), + Start: time.UnixMilli(60000), + End: time.UnixMilli(120000), Key: "partition-1", } var err error @@ -168,38 +168,38 @@ func TestProcessAndForward_Forward(t *testing.T) { { name: "test-forward-one", id: partition.ID{ - Start: time.Unix(60, 0), - End: time.Unix(120, 0), + Start: time.UnixMilli(60000), + End: time.UnixMilli(120000), Key: "test-forward-one", }, buffers: []*simplebuffer.InMemoryBuffer{test1Buffer1, test1Buffer2}, pf: createProcessAndForward(ctx, "test-forward-one", pbqManager, toBuffers1), expected: []bool{false, true}, wmExpected: map[string]int64{ - "buffer1": 120, + "buffer1": 120000, "buffer2": math.MinInt64, }, }, { name: "test-forward-all", id: partition.ID{ - Start: time.Unix(60, 0), - End: time.Unix(120, 0), + Start: time.UnixMilli(60000), + End: time.UnixMilli(120000), Key: "test-forward-all", }, buffers: []*simplebuffer.InMemoryBuffer{test2Buffer1, test2Buffer2}, pf: createProcessAndForward(ctx, "test-forward-all", pbqManager, toBuffers2), expected: []bool{false, false}, wmExpected: map[string]int64{ - "buffer1": 120, - "buffer2": 120, + "buffer1": 120000, + "buffer2": 120000, }, }, { name: "test-drop-all", id: partition.ID{ - Start: time.Unix(60, 0), - End: time.Unix(120, 0), + Start: time.UnixMilli(60000), + End: time.UnixMilli(120000), Key: "test-drop-all", }, buffers: []*simplebuffer.InMemoryBuffer{test3Buffer1, test3Buffer2}, @@ -222,7 +222,7 @@ func TestProcessAndForward_Forward(t *testing.T) { index := 0 for k, v := range value.pf.publishWatermark { // expected watermark should be equal to window end time - assert.Equal(t, v.GetLatestWatermark().Unix(), value.wmExpected[k]) + assert.Equal(t, v.GetLatestWatermark().UnixMilli(), value.wmExpected[k]) index += 1 } }) @@ -232,8 +232,8 @@ func TestProcessAndForward_Forward(t *testing.T) { func createProcessAndForward(ctx context.Context, key string, pbqManager *pbq.Manager, toBuffers map[string]isb.BufferWriter) ProcessAndForward { testPartition := partition.ID{ - Start: time.Unix(60, 0), - End: time.Unix(120, 0), + Start: time.UnixMilli(60000), + End: time.UnixMilli(120000), Key: key, } @@ -251,7 +251,7 @@ func createProcessAndForward(ctx context.Context, key string, pbqManager *pbq.Ma { Header: isb.Header{ PaneInfo: isb.PaneInfo{ - EventTime: time.Unix(60, 0), + EventTime: time.UnixMilli(60000), }, ID: "1", }, diff --git a/pkg/reduce/reduce.go b/pkg/reduce/reduce.go index 63dfb9127e..da56c0ab53 100644 --- a/pkg/reduce/reduce.go +++ b/pkg/reduce/reduce.go @@ -76,7 +76,7 @@ func (d *DataForward) forwardAChunk(ctx context.Context) { readMessages, err := d.fromBuffer.Read(ctx, d.opts.readBatchSize) if err != nil { - d.log.Errorw("failed to read from isb", zap.Error(err)) + d.log.Errorw("Failed to read from isb", zap.Error(err)) } if len(readMessages) == 0 { diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index 1a068ee130..1532ce66bb 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -49,7 +49,7 @@ var recordGenerator = func(size int32) []byte { r := payload{Data: b, Createdts: nano} data, err := json.Marshal(r) if err != nil { - log.Errorf("error marshalling the record [%v]", r) + log.Errorf("Error marshalling the record [%v]", r) } return data } diff --git a/pkg/udf/builtin/filter/filter.go b/pkg/udf/builtin/filter/filter.go index 55303b8bf2..dd2b168433 100644 --- a/pkg/udf/builtin/filter/filter.go +++ b/pkg/udf/builtin/filter/filter.go @@ -26,7 +26,7 @@ func New(args map[string]string) (functionsdk.MapFunc, error) { log := logging.FromContext(ctx) resultMsg, err := f.apply(datum.Value()) if err != nil { - log.Errorf("filter map function apply got an error: %v", err) + log.Errorf("Filter map function apply got an error: %v", err) } return functionsdk.MessagesBuilder().Append(resultMsg) }, nil diff --git a/pkg/watermark/fetch/edge_fetcher.go b/pkg/watermark/fetch/edge_fetcher.go index a607fe6fad..038427acf7 100644 --- a/pkg/watermark/fetch/edge_fetcher.go +++ b/pkg/watermark/fetch/edge_fetcher.go @@ -55,22 +55,22 @@ func (e *edgeFetcher) GetHeadWatermark() processor.Watermark { debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, o)) if o != -1 && o > headOffset { headOffset = o - epoch = p.offsetTimeline.GetEventtimeFromInt64(o) + epoch = p.offsetTimeline.GetEventTimeFromInt64(o) } } e.log.Debugf("GetHeadWatermark: %s", debugString.String()) if epoch == math.MaxInt64 { // Use -1 as default watermark value to indicate there is no valid watermark yet. - return processor.Watermark(time.Unix(-1, 0)) + return processor.Watermark(time.UnixMilli(-1)) } - return processor.Watermark(time.Unix(epoch, 0)) + return processor.Watermark(time.UnixMilli(epoch)) } // GetWatermark gets the smallest timestamp for the given offset func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) processor.Watermark { var offset, err = inputOffset.Sequence() if err != nil { - e.log.Errorw("unable to get offset from isb.Offset.Sequence()", zap.Error(err)) + e.log.Errorw("Unable to get offset from isb.Offset.Sequence()", zap.Error(err)) return processor.Watermark(time.Unix(-1, 0)) } var debugString strings.Builder @@ -93,5 +93,5 @@ func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) processor.Watermark { } e.log.Debugf("%s[%s] get watermark for offset %d: %+v", debugString.String(), e.edgeName, offset, epoch) - return processor.Watermark(time.Unix(epoch, 0)) + return processor.Watermark(time.UnixMilli(epoch)) } diff --git a/pkg/watermark/fetch/edge_fetcher_test.go b/pkg/watermark/fetch/edge_fetcher_test.go index d68bcfbb2e..b1f2648f48 100644 --- a/pkg/watermark/fetch/edge_fetcher_test.go +++ b/pkg/watermark/fetch/edge_fetcher_test.go @@ -122,11 +122,11 @@ func TestBuffer_GetWatermark(t *testing.T) { processorManager: tt.processorManager, log: zaptest.NewLogger(t).Sugar(), } - if got := b.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) })); time.Time(got).In(location) != time.Unix(tt.want, 0).In(location) { - t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.Unix(tt.want, 0))) + if got := b.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) })); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) { + t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.UnixMilli(tt.want))) } // this will always be 14 because the timeline has been populated ahead of time - assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.Unix(14, 0).In(location)) + assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.UnixMilli(14).In(location)) }) } } diff --git a/pkg/watermark/fetch/offset_timeline.go b/pkg/watermark/fetch/offset_timeline.go index 235eccb9b6..f42baf065e 100644 --- a/pkg/watermark/fetch/offset_timeline.go +++ b/pkg/watermark/fetch/offset_timeline.go @@ -78,7 +78,7 @@ func (t *OffsetTimeline) Put(node OffsetWatermark) { return } else { // TODO put panic: the new input offset should never be smaller than the existing offset - t.log.Errorw("the new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.watermark), + t.log.Errorw("The new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.watermark), zap.Int64("existing offset", elementNode.offset), zap.Int64("input offset", node.offset)) return } @@ -147,10 +147,10 @@ func (t *OffsetTimeline) GetOffset(eventTime int64) int64 { func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64 { // TODO: handle err? inputOffsetInt64, _ := inputOffset.Sequence() - return t.GetEventtimeFromInt64(inputOffsetInt64) + return t.GetEventTimeFromInt64(inputOffsetInt64) } -func (t *OffsetTimeline) GetEventtimeFromInt64(inputOffsetInt64 int64) int64 { +func (t *OffsetTimeline) GetEventTimeFromInt64(inputOffsetInt64 int64) int64 { t.lock.RLock() defer t.lock.RUnlock() diff --git a/pkg/watermark/fetch/options.go b/pkg/watermark/fetch/options.go index 7557adbace..d13e8a2e68 100644 --- a/pkg/watermark/fetch/options.go +++ b/pkg/watermark/fetch/options.go @@ -1,7 +1,9 @@ package fetch type processorManagerOptions struct { - podHeartbeatRate int64 + // podHeartbeatRate uses second as time unit + podHeartbeatRate int64 + // refreshingProcessorsRate uses second as time unit refreshingProcessorsRate int64 separateOTBucket bool } diff --git a/pkg/watermark/fetch/processor_manager_inmem_test.go b/pkg/watermark/fetch/processor_manager_inmem_test.go index 916117778b..b87334b6ed 100644 --- a/pkg/watermark/fetch/processor_manager_inmem_test.go +++ b/pkg/watermark/fetch/processor_manager_inmem_test.go @@ -23,7 +23,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { keyspace = "fetcherTest" hbBucketName = keyspace + "_PROCESSORS" otBucketName = keyspace + "_OT" - epoch int64 = 1651161600 + epoch int64 = 1651161600000 testOffset int64 = 100 wg sync.WaitGroup ) @@ -42,7 +42,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { err = ot.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b) assert.NoError(t, err) - epoch += 60 + epoch += 60000 binary.LittleEndian.PutUint64(b, uint64(testOffset+5)) // this key format is meant for non-separate OT watcher err = ot.PutKV(ctx, fmt.Sprintf("%s%s%d", "p2", "_", epoch), b) diff --git a/pkg/watermark/fetch/processor_manager_test.go b/pkg/watermark/fetch/processor_manager_test.go index 5f66a98f19..af8b07d598 100644 --- a/pkg/watermark/fetch/processor_manager_test.go +++ b/pkg/watermark/fetch/processor_manager_test.go @@ -23,7 +23,7 @@ import ( func TestFetcherWithSameOTBucket(t *testing.T) { var ( keyspace = "fetcherTest" - epoch int64 = 1651161600 + epoch int64 = 1651161600000 testOffset int64 = 100 wg sync.WaitGroup ) @@ -87,7 +87,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) { binary.LittleEndian.PutUint64(b, uint64(testOffset)) err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b) assert.NoError(t, err) - epoch += 60 + epoch += 60000 binary.LittleEndian.PutUint64(b, uint64(testOffset+5)) err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p2", "_", epoch), b) assert.NoError(t, err) @@ -306,7 +306,7 @@ func TestFetcherWithSeparateOTBucket(t *testing.T) { defer func() { _ = js.DeleteKeyValue(keyspace + "_PROCESSORS") }() assert.Nil(t, err) - var epoch int64 = 1651161600 + var epoch int64 = 1651161600000 var testOffset int64 = 100 p1OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{ Bucket: keyspace + "_OT_" + "p1", @@ -325,7 +325,7 @@ func TestFetcherWithSeparateOTBucket(t *testing.T) { _, err = p1OT.Put(fmt.Sprintf("%d", epoch), b) assert.NoError(t, err) - epoch += 60 + epoch += 60000 binary.LittleEndian.PutUint64(b, uint64(testOffset+5)) p2OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{ Bucket: keyspace + "_OT_" + "p2", diff --git a/pkg/watermark/fetch/source_fetcher.go b/pkg/watermark/fetch/source_fetcher.go index 7dee7ed305..b0b5c78cf7 100644 --- a/pkg/watermark/fetch/source_fetcher.go +++ b/pkg/watermark/fetch/source_fetcher.go @@ -43,9 +43,9 @@ func (e *sourceFetcher) GetHeadWatermark() processor.Watermark { } if epoch == math.MinInt64 { // Use -1 as default watermark value to indicate there is no valid watermark yet. - return processor.Watermark(time.Unix(-1, 0)) + return processor.Watermark(time.UnixMilli(-1)) } - return processor.Watermark(time.Unix(epoch, 0)) + return processor.Watermark(time.UnixMilli(epoch)) } // GetWatermark returns the lowest of the latest watermark of all the processors, @@ -63,5 +63,5 @@ func (e *sourceFetcher) GetWatermark(_ isb.Offset) processor.Watermark { if epoch == math.MaxInt64 { epoch = -1 } - return processor.Watermark(time.Unix(epoch, 0)) + return processor.Watermark(time.UnixMilli(epoch)) } diff --git a/pkg/watermark/processor/entity.go b/pkg/watermark/processor/entity.go index f8729e0a4e..37991b667c 100644 --- a/pkg/watermark/processor/entity.go +++ b/pkg/watermark/processor/entity.go @@ -18,17 +18,21 @@ type Watermark time.Time func (w Watermark) String() string { var location, _ = time.LoadLocation("UTC") var t = time.Time(w).In(location) - return t.Format(time.RFC3339) + return t.Format(time.RFC3339Nano) } -func (w Watermark) Unix() int64 { - return time.Time(w).Unix() +func (w Watermark) UnixMilli() int64 { + return time.Time(w).UnixMilli() } func (w Watermark) After(t time.Time) bool { return time.Time(w).After(t) } +func (w Watermark) Before(t time.Time) bool { + return time.Time(w).Before(t) +} + type entityOptions struct { separateOTBucket bool keySeparator string @@ -96,9 +100,9 @@ func (p *ProcessorEntity) IsOTBucketShared() bool { // BuildOTWatcherKey builds the offset-timeline key name func (p *ProcessorEntity) BuildOTWatcherKey(watermark Watermark) string { if p.opts.separateOTBucket { - return fmt.Sprintf("%d", watermark.Unix()) + return fmt.Sprintf("%d", watermark.UnixMilli()) } else { - return fmt.Sprintf("%s%s%d", p.GetID(), p.opts.keySeparator, watermark.Unix()) + return fmt.Sprintf("%s%s%d", p.GetID(), p.opts.keySeparator, watermark.UnixMilli()) } } diff --git a/pkg/watermark/processor/entity_test.go b/pkg/watermark/processor/entity_test.go index b739de3c5a..ddf4334bb6 100644 --- a/pkg/watermark/processor/entity_test.go +++ b/pkg/watermark/processor/entity_test.go @@ -29,8 +29,8 @@ func ExampleWatermark_String() { } func TestExampleWatermarkUnix(t *testing.T) { - wm := Watermark(time.Unix(1651129200, 0)) - assert.Equal(t, int64(1651129200), wm.Unix()) + wm := Watermark(time.UnixMilli(1651129200000)) + assert.Equal(t, int64(1651129200000), wm.UnixMilli()) } func TestProcessorEntity_ParseOTWatcherKey(t *testing.T) { diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go index c0548ab616..778a088242 100644 --- a/pkg/watermark/publish/publisher.go +++ b/pkg/watermark/publish/publisher.go @@ -28,13 +28,15 @@ type Publisher interface { // publish publishes the watermark for a processor entity. type publish struct { - ctx context.Context - entity processor.ProcessorEntitier + ctx context.Context + entity processor.ProcessorEntitier + // heartbeatStore uses second as the time unit for the value heartbeatStore store.WatermarkKVStorer - otStore store.WatermarkKVStorer - log *zap.SugaredLogger - headWatermark processor.Watermark - opts *publishOptions + // osStore uses millisecond as the time unit for the value + otStore store.WatermarkKVStorer + log *zap.SugaredLogger + headWatermark processor.Watermark + opts *publishOptions } // NewPublish returns `Publish`. @@ -83,10 +85,13 @@ func (p *publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) { } // update p.headWatermark only if wm > p.headWatermark if wm.After(time.Time(p.headWatermark)) { - p.log.Debugw("New watermark updated for head water mark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) + p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) p.headWatermark = wm + } else if wm.Before(time.Time(p.headWatermark)) { + p.log.Warnw("Skip publishing the new watermark because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) + return } else { - p.log.Infow("New watermark is ignored because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) + p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) return } @@ -138,7 +143,7 @@ func (p *publish) loadLatestFromStore() processor.Watermark { latestWatermark = epoch } } - var timeWatermark = time.Unix(latestWatermark, 0) + var timeWatermark = time.UnixMilli(latestWatermark) return processor.Watermark(timeWatermark) } diff --git a/pkg/watermark/publish/publisher_inmem_test.go b/pkg/watermark/publish/publisher_inmem_test.go index 366e358287..dee31c1975 100644 --- a/pkg/watermark/publish/publisher_inmem_test.go +++ b/pkg/watermark/publish/publisher_inmem_test.go @@ -32,24 +32,24 @@ func TestPublisherWithSharedOTBuckets_InMem(t *testing.T) { p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) - var epoch int64 = 1651161600 + var epoch int64 = 1651161600000 var location, _ = time.LoadLocation("UTC") for i := 0; i < 3; i++ { - p.PublishWatermark(processor.Watermark(time.Unix(epoch, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) - epoch += 60 + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) + epoch += 60000 time.Sleep(time.Millisecond) } // publish a stale watermark (offset doesn't matter) - p.PublishWatermark(processor.Watermark(time.Unix(epoch-120, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch-120000).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) keys := p.getAllOTKeysFromBucket() - assert.Equal(t, []string{"publisherTestPod1_1651161600", "publisherTestPod1_1651161660", "publisherTestPod1_1651161720"}, keys) + assert.Equal(t, []string{"publisherTestPod1_1651161600000", "publisherTestPod1_1651161660000", "publisherTestPod1_1651161720000"}, keys) wm := p.loadLatestFromStore() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), wm.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), wm.String()) head := p.GetLatestWatermark() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), head.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), head.String()) p.StopPublisher() @@ -71,24 +71,24 @@ func TestPublisherWithSeparateOTBucket_InMem(t *testing.T) { p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) - var epoch int64 = 1651161600 + var epoch int64 = 1651161600000 var location, _ = time.LoadLocation("UTC") for i := 0; i < 3; i++ { - p.PublishWatermark(processor.Watermark(time.Unix(epoch, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) - epoch += 60 + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) + epoch += 60000 time.Sleep(time.Millisecond) } // publish a stale watermark (offset doesn't matter) - p.PublishWatermark(processor.Watermark(time.Unix(epoch-120, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch-120000).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) keys := p.getAllOTKeysFromBucket() - assert.Equal(t, []string{"1651161600", "1651161660", "1651161720"}, keys) + assert.Equal(t, []string{"1651161600000", "1651161660000", "1651161720000"}, keys) wm := p.loadLatestFromStore() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), wm.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), wm.String()) head := p.GetLatestWatermark() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), head.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), head.String()) p.StopPublisher() diff --git a/pkg/watermark/publish/publisher_test.go b/pkg/watermark/publish/publisher_test.go index 9217949939..6b54fab4ed 100644 --- a/pkg/watermark/publish/publisher_test.go +++ b/pkg/watermark/publish/publisher_test.go @@ -59,24 +59,24 @@ func TestPublisherWithSeparateOTBuckets(t *testing.T) { p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) - var epoch int64 = 1651161600 + var epoch int64 = 1651161600000 var location, _ = time.LoadLocation("UTC") for i := 0; i < 3; i++ { - p.PublishWatermark(processor.Watermark(time.Unix(epoch, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) - epoch += 60 + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) + epoch += 60000 time.Sleep(time.Millisecond) } // publish a stale watermark (offset doesn't matter) - p.PublishWatermark(processor.Watermark(time.Unix(epoch-120, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch-120000).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) keys := p.getAllOTKeysFromBucket() - assert.Equal(t, []string{"publisherTestPod1_1651161600", "publisherTestPod1_1651161660", "publisherTestPod1_1651161720"}, keys) + assert.Equal(t, []string{"publisherTestPod1_1651161600000", "publisherTestPod1_1651161660000", "publisherTestPod1_1651161720000"}, keys) wm := p.loadLatestFromStore() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), wm.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), wm.String()) head := p.GetLatestWatermark() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), head.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), head.String()) p.StopPublisher() @@ -111,24 +111,24 @@ func TestPublisherWithSharedOTBucket(t *testing.T) { p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) - var epoch int64 = 1651161600 + var epoch int64 = 1651161600000 var location, _ = time.LoadLocation("UTC") for i := 0; i < 3; i++ { - p.PublishWatermark(processor.Watermark(time.Unix(epoch, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) - epoch += 60 + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) })) + epoch += 60000 time.Sleep(time.Millisecond) } // publish a stale watermark (offset doesn't matter) - p.PublishWatermark(processor.Watermark(time.Unix(epoch-120, 0).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) + p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch-120000).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) })) keys := p.getAllOTKeysFromBucket() - assert.Equal(t, []string{"1651161600", "1651161660", "1651161720"}, keys) + assert.Equal(t, []string{"1651161600000", "1651161660000", "1651161720000"}, keys) wm := p.loadLatestFromStore() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), wm.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), wm.String()) head := p.GetLatestWatermark() - assert.Equal(t, processor.Watermark(time.Unix(epoch-60, 0).In(location)).String(), head.String()) + assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), head.String()) p.StopPublisher() diff --git a/pkg/watermark/store/jetstream/kv_watch.go b/pkg/watermark/store/jetstream/kv_watch.go index 2b91889add..e5cc20acf2 100644 --- a/pkg/watermark/store/jetstream/kv_watch.go +++ b/pkg/watermark/store/jetstream/kv_watch.go @@ -114,7 +114,7 @@ func (k *jetStreamWatch) Watch(ctx context.Context) (<-chan store.WatermarkKVEnt // call jetstream watch stop err = kvWatcher.Stop() if err != nil { - k.log.Errorw("failed to stop", zap.String("watcher", k.GetKVName()), zap.Error(err)) + k.log.Errorw("Failed to stop", zap.String("watcher", k.GetKVName()), zap.Error(err)) } else { k.log.Infow("WatchAll successfully stopped", zap.String("watcher", k.GetKVName())) } diff --git a/ui/src/components/pipeline/Pipeline.tsx b/ui/src/components/pipeline/Pipeline.tsx index a362078ea9..ee2bcff836 100644 --- a/ui/src/components/pipeline/Pipeline.tsx +++ b/ui/src/components/pipeline/Pipeline.tsx @@ -134,7 +134,7 @@ export function Pipeline() { vertexWatermark.isWaterMarkEnabled = json["isWatermarkEnabled"]; vertexWatermark.watermark = json["watermark"]; vertexWatermark.watermarkLocalTime = new Date( - json["watermark"] * 1000 + json["watermark"] ).toLocaleString(); vertexToWatermarkMap.set(vertex.name, vertexWatermark); });