Skip to content

Commit

Permalink
feat: Watermark millisecond. Fixes #201 (#278)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 authored and whynowy committed Nov 7, 2022
1 parent c153536 commit f5db937
Show file tree
Hide file tree
Showing 25 changed files with 109 additions and 98 deletions.
6 changes: 3 additions & 3 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request

// If watermark is not enabled, return time zero
if ps.pipeline.Spec.Watermark.Disabled {
timeZero := time.Unix(0, 0).Unix()
timeZero := time.Unix(0, 0).UnixMilli()
v := &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
Expand All @@ -91,13 +91,13 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request

// Vertex not found
if !ok {
log.Errorf("watermark fetchers not available for vertex %s in the fetcher map", vertexName)
log.Errorf("Watermark fetchers not available for vertex %s in the fetcher map", vertexName)
return nil, fmt.Errorf("watermark not available for given vertex, %s", vertexName)
}

var latestWatermark = int64(-1)
for _, fetcher := range vertexFetchers {
watermark := fetcher.GetHeadWatermark().Unix()
watermark := fetcher.GetHeadWatermark().UnixMilli()
if watermark > latestWatermark {
latestWatermark = watermark
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (jr *jetStreamReader) runAckInfomationChecker(ctx context.Context) {
checkAckInfo := func() {
c, err := js.ConsumerInfo(jr.stream, jr.stream)
if err != nil {
jr.log.Errorw("failed to get consumer info in the reader", zap.Error(err))
jr.log.Errorw("Failed to get consumer info in the reader", zap.Error(err))
return
}
ts := timestampedSequence{seq: int64(c.AckFloor.Stream), timestamp: time.Now().Unix()}
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (jw *jetStreamWriter) runStatusChecker(ctx context.Context) {
c, err := js.ConsumerInfo(jw.stream, jw.stream)
if err != nil {
isbFullErrors.With(labels).Inc()
jw.log.Errorw("failed to get consumer info in the writer", zap.Error(err))
jw.log.Errorw("Failed to get consumer info in the writer", zap.Error(err))
return
}
var solidUsage, softUsage float64
Expand Down
4 changes: 2 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (ms *metricsServer) buildupPendingInfo(ctx context.Context) {
return
case <-ticker.C:
if pending, err := ms.lagReader.Pending(ctx); err != nil {
log.Errorw("failed to get pending messages", zap.Error(err))
log.Errorw("Failed to get pending messages", zap.Error(err))
} else {
if pending != isb.PendingNotAvailable {
ts := timestampedPending{pending: pending, timestamp: time.Now().Unix()}
Expand Down Expand Up @@ -167,7 +167,7 @@ func (ms *metricsServer) exposePendingAndRate(ctx context.Context) {
if ms.rater != nil {
for n, i := range lookbackSecondsMap {
if r, err := ms.rater.Rate(ctx, i); err != nil {
log.Errorw("failed to get processing rate in the past seconds", zap.Int64("seconds", i), zap.Error(err))
log.Errorw("Failed to get processing rate in the past seconds", zap.Int64("seconds", i), zap.Error(err))
} else {
if r != isb.RateNotAvailable {
processingRate.WithLabelValues(ms.vertex.Spec.PipelineName, ms.vertex.Spec.Name, n).Set(r)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pbq/pbq.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var _ ReadWriteCloser = (*PBQ)(nil)
func (p *PBQ) Write(ctx context.Context, message *isb.ReadMessage) error {
// if cob we should return
if p.cob {
p.log.Errorw("failed to write message to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("header", message.Header))
p.log.Errorw("Failed to write message to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("header", message.Header))
return ErrCOB
}
var writeErr error
Expand Down Expand Up @@ -84,7 +84,7 @@ readLoop:
for {
readMessages, eof, err := p.store.Read(size)
if err != nil {
p.log.Errorw("error while replaying records from store", zap.Any("ID", p.PartitionID), zap.Error(err))
p.log.Errorw("Error while replaying records from store", zap.Any("ID", p.PartitionID), zap.Error(err))
}
for _, msg := range readMessages {
// select to avoid infinite blocking while writing to output channel
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct
isbsCopy := isbs.DeepCopy()
reconcileErr := r.reconcile(ctx, isbsCopy)
if reconcileErr != nil {
log.Errorw("reconcile error", zap.Error(reconcileErr))
log.Errorw("Reconcile error", zap.Error(reconcileErr))
}
if r.needsUpdate(isbs, isbsCopy) {
if err := r.client.Update(ctx, isbsCopy); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (

if !safeToDelete {
log.Info("Pipeline deletion is waiting to finish the unconsumed messages")
//Requeue request to process after 10s
// Requeue request to process after 10s
return ctrl.Result{RequeueAfter: dfv1.DefaultRequeueAfter}, nil
}
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (r *pipelineReconciler) cleanUpBuffers(ctx context.Context, pl *dfv1.Pipeli
if apierrors.IsNotFound(err) { // somehow it doesn't need to clean up
return nil
}
log.Errorw("failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
return err
}

Expand Down Expand Up @@ -626,7 +626,7 @@ func (r *pipelineReconciler) safeToDelete(ctx context.Context, pl *dfv1.Pipeline
if err != nil {
return false, err
}
//Requeue pipeline to take effect the vertex replica changes
// Requeue pipeline to take effect the vertex replica changes
if vertexPatched {
return false, nil
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/reduce/pnf/processandforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func TestProcessAndForward_Process(t *testing.T) {

size := 100
testPartition := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Start: time.UnixMilli(60000),
End: time.UnixMilli(120000),
Key: "partition-1",
}
var err error
Expand Down Expand Up @@ -168,38 +168,38 @@ func TestProcessAndForward_Forward(t *testing.T) {
{
name: "test-forward-one",
id: partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Start: time.UnixMilli(60000),
End: time.UnixMilli(120000),
Key: "test-forward-one",
},
buffers: []*simplebuffer.InMemoryBuffer{test1Buffer1, test1Buffer2},
pf: createProcessAndForward(ctx, "test-forward-one", pbqManager, toBuffers1),
expected: []bool{false, true},
wmExpected: map[string]int64{
"buffer1": 120,
"buffer1": 120000,
"buffer2": math.MinInt64,
},
},
{
name: "test-forward-all",
id: partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Start: time.UnixMilli(60000),
End: time.UnixMilli(120000),
Key: "test-forward-all",
},
buffers: []*simplebuffer.InMemoryBuffer{test2Buffer1, test2Buffer2},
pf: createProcessAndForward(ctx, "test-forward-all", pbqManager, toBuffers2),
expected: []bool{false, false},
wmExpected: map[string]int64{
"buffer1": 120,
"buffer2": 120,
"buffer1": 120000,
"buffer2": 120000,
},
},
{
name: "test-drop-all",
id: partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Start: time.UnixMilli(60000),
End: time.UnixMilli(120000),
Key: "test-drop-all",
},
buffers: []*simplebuffer.InMemoryBuffer{test3Buffer1, test3Buffer2},
Expand All @@ -222,7 +222,7 @@ func TestProcessAndForward_Forward(t *testing.T) {
index := 0
for k, v := range value.pf.publishWatermark {
// expected watermark should be equal to window end time
assert.Equal(t, v.GetLatestWatermark().Unix(), value.wmExpected[k])
assert.Equal(t, v.GetLatestWatermark().UnixMilli(), value.wmExpected[k])
index += 1
}
})
Expand All @@ -232,8 +232,8 @@ func TestProcessAndForward_Forward(t *testing.T) {
func createProcessAndForward(ctx context.Context, key string, pbqManager *pbq.Manager, toBuffers map[string]isb.BufferWriter) ProcessAndForward {

testPartition := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Start: time.UnixMilli(60000),
End: time.UnixMilli(120000),
Key: key,
}

Expand All @@ -251,7 +251,7 @@ func createProcessAndForward(ctx context.Context, key string, pbqManager *pbq.Ma
{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
EventTime: time.Unix(60, 0),
EventTime: time.UnixMilli(60000),
},
ID: "1",
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/reduce/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *DataForward) forwardAChunk(ctx context.Context) {
readMessages, err := d.fromBuffer.Read(ctx, d.opts.readBatchSize)

if err != nil {
d.log.Errorw("failed to read from isb", zap.Error(err))
d.log.Errorw("Failed to read from isb", zap.Error(err))
}

if len(readMessages) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var recordGenerator = func(size int32) []byte {
r := payload{Data: b, Createdts: nano}
data, err := json.Marshal(r)
if err != nil {
log.Errorf("error marshalling the record [%v]", r)
log.Errorf("Error marshalling the record [%v]", r)
}
return data
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/builtin/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func New(args map[string]string) (functionsdk.MapFunc, error) {
log := logging.FromContext(ctx)
resultMsg, err := f.apply(datum.Value())
if err != nil {
log.Errorf("filter map function apply got an error: %v", err)
log.Errorf("Filter map function apply got an error: %v", err)
}
return functionsdk.MessagesBuilder().Append(resultMsg)
}, nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,22 @@ func (e *edgeFetcher) GetHeadWatermark() processor.Watermark {
debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, o))
if o != -1 && o > headOffset {
headOffset = o
epoch = p.offsetTimeline.GetEventtimeFromInt64(o)
epoch = p.offsetTimeline.GetEventTimeFromInt64(o)
}
}
e.log.Debugf("GetHeadWatermark: %s", debugString.String())
if epoch == math.MaxInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
return processor.Watermark(time.Unix(-1, 0))
return processor.Watermark(time.UnixMilli(-1))
}
return processor.Watermark(time.Unix(epoch, 0))
return processor.Watermark(time.UnixMilli(epoch))
}

// GetWatermark gets the smallest timestamp for the given offset
func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) processor.Watermark {
var offset, err = inputOffset.Sequence()
if err != nil {
e.log.Errorw("unable to get offset from isb.Offset.Sequence()", zap.Error(err))
e.log.Errorw("Unable to get offset from isb.Offset.Sequence()", zap.Error(err))
return processor.Watermark(time.Unix(-1, 0))
}
var debugString strings.Builder
Expand All @@ -93,5 +93,5 @@ func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) processor.Watermark {
}
e.log.Debugf("%s[%s] get watermark for offset %d: %+v", debugString.String(), e.edgeName, offset, epoch)

return processor.Watermark(time.Unix(epoch, 0))
return processor.Watermark(time.UnixMilli(epoch))
}
6 changes: 3 additions & 3 deletions pkg/watermark/fetch/edge_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ func TestBuffer_GetWatermark(t *testing.T) {
processorManager: tt.processorManager,
log: zaptest.NewLogger(t).Sugar(),
}
if got := b.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) })); time.Time(got).In(location) != time.Unix(tt.want, 0).In(location) {
t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.Unix(tt.want, 0)))
if got := b.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) })); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) {
t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.UnixMilli(tt.want)))
}
// this will always be 14 because the timeline has been populated ahead of time
assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.Unix(14, 0).In(location))
assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.UnixMilli(14).In(location))
})
}
}
6 changes: 3 additions & 3 deletions pkg/watermark/fetch/offset_timeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (t *OffsetTimeline) Put(node OffsetWatermark) {
return
} else {
// TODO put panic: the new input offset should never be smaller than the existing offset
t.log.Errorw("the new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.watermark),
t.log.Errorw("The new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.watermark),
zap.Int64("existing offset", elementNode.offset), zap.Int64("input offset", node.offset))
return
}
Expand Down Expand Up @@ -147,10 +147,10 @@ func (t *OffsetTimeline) GetOffset(eventTime int64) int64 {
func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64 {
// TODO: handle err?
inputOffsetInt64, _ := inputOffset.Sequence()
return t.GetEventtimeFromInt64(inputOffsetInt64)
return t.GetEventTimeFromInt64(inputOffsetInt64)
}

func (t *OffsetTimeline) GetEventtimeFromInt64(inputOffsetInt64 int64) int64 {
func (t *OffsetTimeline) GetEventTimeFromInt64(inputOffsetInt64 int64) int64 {
t.lock.RLock()
defer t.lock.RUnlock()

Expand Down
4 changes: 3 additions & 1 deletion pkg/watermark/fetch/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package fetch

type processorManagerOptions struct {
podHeartbeatRate int64
// podHeartbeatRate uses second as time unit
podHeartbeatRate int64
// refreshingProcessorsRate uses second as time unit
refreshingProcessorsRate int64
separateOTBucket bool
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/watermark/fetch/processor_manager_inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
keyspace = "fetcherTest"
hbBucketName = keyspace + "_PROCESSORS"
otBucketName = keyspace + "_OT"
epoch int64 = 1651161600
epoch int64 = 1651161600000
testOffset int64 = 100
wg sync.WaitGroup
)
Expand All @@ -42,7 +42,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
err = ot.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b)
assert.NoError(t, err)

epoch += 60
epoch += 60000
binary.LittleEndian.PutUint64(b, uint64(testOffset+5))
// this key format is meant for non-separate OT watcher
err = ot.PutKV(ctx, fmt.Sprintf("%s%s%d", "p2", "_", epoch), b)
Expand Down
8 changes: 4 additions & 4 deletions pkg/watermark/fetch/processor_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func TestFetcherWithSameOTBucket(t *testing.T) {
var (
keyspace = "fetcherTest"
epoch int64 = 1651161600
epoch int64 = 1651161600000
testOffset int64 = 100
wg sync.WaitGroup
)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
binary.LittleEndian.PutUint64(b, uint64(testOffset))
err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b)
assert.NoError(t, err)
epoch += 60
epoch += 60000
binary.LittleEndian.PutUint64(b, uint64(testOffset+5))
err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p2", "_", epoch), b)
assert.NoError(t, err)
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestFetcherWithSeparateOTBucket(t *testing.T) {
defer func() { _ = js.DeleteKeyValue(keyspace + "_PROCESSORS") }()
assert.Nil(t, err)

var epoch int64 = 1651161600
var epoch int64 = 1651161600000
var testOffset int64 = 100
p1OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: keyspace + "_OT_" + "p1",
Expand All @@ -325,7 +325,7 @@ func TestFetcherWithSeparateOTBucket(t *testing.T) {
_, err = p1OT.Put(fmt.Sprintf("%d", epoch), b)
assert.NoError(t, err)

epoch += 60
epoch += 60000
binary.LittleEndian.PutUint64(b, uint64(testOffset+5))
p2OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: keyspace + "_OT_" + "p2",
Expand Down
6 changes: 3 additions & 3 deletions pkg/watermark/fetch/source_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (e *sourceFetcher) GetHeadWatermark() processor.Watermark {
}
if epoch == math.MinInt64 {
// Use -1 as default watermark value to indicate there is no valid watermark yet.
return processor.Watermark(time.Unix(-1, 0))
return processor.Watermark(time.UnixMilli(-1))
}
return processor.Watermark(time.Unix(epoch, 0))
return processor.Watermark(time.UnixMilli(epoch))
}

// GetWatermark returns the lowest of the latest watermark of all the processors,
Expand All @@ -63,5 +63,5 @@ func (e *sourceFetcher) GetWatermark(_ isb.Offset) processor.Watermark {
if epoch == math.MaxInt64 {
epoch = -1
}
return processor.Watermark(time.Unix(epoch, 0))
return processor.Watermark(time.UnixMilli(epoch))
}
Loading

0 comments on commit f5db937

Please sign in to comment.