Skip to content

Commit

Permalink
fix race
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jyu6 committed Mar 3, 2023
1 parent 7510082 commit 8fb8bdf
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions pkg/watermark/publish/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -52,6 +53,7 @@ type publish struct {
otStore store.WatermarkKVStorer
log *zap.SugaredLogger
headWatermark wmb.Watermark
headWMLock sync.RWMutex
opts *publishOptions
}

Expand Down Expand Up @@ -90,6 +92,8 @@ func NewPublish(ctx context.Context, processorEntity processor.ProcessorEntitier

// initialSetup inserts the default values as the ProcessorEntitier starts emitting watermarks.
func (p *publish) initialSetup() {
p.headWMLock.Lock()
defer p.headWMLock.Unlock()
p.headWatermark = p.loadLatestFromStore()
}

Expand Down Expand Up @@ -127,7 +131,7 @@ func (p *publish) PublishWatermark(wm wmb.Watermark, offset isb.Offset) {
// TODO: better exponential backoff
time.Sleep(time.Millisecond * 250)
} else {
p.log.Debugw("New watermark published with offset", zap.Int64("head", p.headWatermark.UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq))
p.log.Debugw("New watermark published with offset", zap.Int64("head", p.getHeadWatermark().UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq))
break
}
}
Expand All @@ -140,14 +144,14 @@ func (p *publish) validateWatermark(wm wmb.Watermark) (wmb.Watermark, bool) {
wm = wmb.Watermark(time.Time(wm).Add(-p.opts.delay))
}
// update p.headWatermark only if wm > p.headWatermark
if wm.After(time.Time(p.headWatermark)) {
p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
p.headWatermark = wm
} else if wm.Before(time.Time(p.headWatermark)) {
p.log.Warnw("Skip publishing the new watermark because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
if wm.After(time.Time(p.getHeadWatermark())) {
p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String()))
p.setHeadWatermark(wm)
} else if wm.Before(time.Time(p.getHeadWatermark())) {
p.log.Warnw("Skip publishing the new watermark because it's older than the current watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String()))
return wmb.Watermark{}, true
} else {
p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String()))
return wmb.Watermark{}, true
}
return wm, false
Expand Down Expand Up @@ -207,9 +211,23 @@ func (p *publish) loadLatestFromStore() wmb.Watermark {

// GetLatestWatermark returns the latest watermark for that processor.
func (p *publish) GetLatestWatermark() wmb.Watermark {
p.headWMLock.RLock()
defer p.headWMLock.RUnlock()
return p.headWatermark
}

func (p *publish) getHeadWatermark() wmb.Watermark {
p.headWMLock.RLock()
defer p.headWMLock.RUnlock()
return p.headWatermark
}

func (p *publish) setHeadWatermark(newWM wmb.Watermark) {
p.headWMLock.Lock()
defer p.headWMLock.Unlock()
p.headWatermark = newWM
}

func (p *publish) publishHeartbeat() {
ticker := time.NewTicker(time.Second * time.Duration(p.opts.podHeartbeatRate))
defer ticker.Stop()
Expand Down

0 comments on commit 8fb8bdf

Please sign in to comment.