From 5cc5d8b569ee3738211913bc679f4f9bf0b5f93c Mon Sep 17 00:00:00 2001 From: jyu6 Date: Fri, 3 Mar 2023 14:59:07 -0800 Subject: [PATCH 1/6] update serde Signed-off-by: jyu6 --- pkg/isb/stores/jetstream/reader.go | 32 ++++----------------- pkg/isb/stores/jetstream/reader_test.go | 10 ------- pkg/isb/stores/jetstream/writer.go | 38 ++++++++----------------- pkg/isb/stores/jetstream/writer_test.go | 11 ------- 4 files changed, 17 insertions(+), 74 deletions(-) diff --git a/pkg/isb/stores/jetstream/reader.go b/pkg/isb/stores/jetstream/reader.go index 3cd6b57950..68b60414a4 100644 --- a/pkg/isb/stores/jetstream/reader.go +++ b/pkg/isb/stores/jetstream/reader.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "strconv" "strings" "sync" "time" @@ -221,6 +220,7 @@ func (jr *jetStreamReader) Rate(_ context.Context, seconds int64) (float64, erro } func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) { + var err error result := []*isb.ReadMessage{} msgs, err := jr.sub.Fetch(int(count), nats.MaxWait(jr.opts.readTimeOut)) if err != nil && !errors.Is(err, nats.ErrTimeout) { @@ -228,14 +228,10 @@ func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMess return nil, fmt.Errorf("failed to fetch messages from jet stream subject %q, %w", jr.subject, err) } for _, msg := range msgs { - m := &isb.ReadMessage{ - ReadOffset: newOffset(msg, jr.inProgessTickDuration, jr.log), - Message: isb.Message{ - Header: convert2IsbMsgHeader(msg.Header), - Body: isb.Body{ - Payload: msg.Data, - }, - }, + var m = new(isb.ReadMessage) + err = m.UnmarshalBinary(msg.Data) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal the message into isb.ReadMessage, %w", err) } result = append(result, m) } @@ -269,24 +265,6 @@ func (jr *jetStreamReader) Ack(_ context.Context, offsets []isb.Offset) []error return errs } -func convert2IsbMsgHeader(header nats.Header) isb.Header { - r := isb.Header{} - if header.Get(_late) == "1" { - r.IsLate = true - } - if x := header.Get(_id); x != "" { - r.ID = x - } - if x := header.Get(_key); x != "" { - r.Key = x - } - if x := header.Get(_eventTime); x != "" { - i, _ := strconv.ParseInt(x, 10, 64) - r.EventTime = time.UnixMilli(i) - } - return r -} - // offset implements ID interface for JetStream. type offset struct { seq uint64 diff --git a/pkg/isb/stores/jetstream/reader_test.go b/pkg/isb/stores/jetstream/reader_test.go index b3321189ba..4af1a35edd 100644 --- a/pkg/isb/stores/jetstream/reader_test.go +++ b/pkg/isb/stores/jetstream/reader_test.go @@ -171,16 +171,6 @@ func TestClose(t *testing.T) { } -// TestConvert2IsbMsgHeader is used to convert isb header -func TestConvert2IsbMsgHeader(t *testing.T) { - natsHeader := nats.Header{} - natsHeader.Set("w", "1") - natsHeader.Set("ps", "1636470000") - natsHeader.Set("pen", "1636470060") - - assert.NotNil(t, convert2IsbMsgHeader(natsHeader)) -} - func addStream(t *testing.T, js *jsclient.JetStreamContext, streamName string) { s := natstest.RunJetStreamServer(t) defer natstest.ShutdownJetStreamServer(t, s) diff --git a/pkg/isb/stores/jetstream/writer.go b/pkg/isb/stores/jetstream/writer.go index 822f67ada8..b3bc025530 100644 --- a/pkg/isb/stores/jetstream/writer.go +++ b/pkg/isb/stores/jetstream/writer.go @@ -214,10 +214,14 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message, var writeOffsets = make([]isb.Offset, len(messages)) var futures = make([]nats.PubAckFuture, len(messages)) for index, message := range messages { + payload, err := message.MarshalBinary() + if err != nil { + errs[index] = err + continue + } m := &nats.Msg{ - Header: convert2NatsMsgHeader(message.Header), Subject: jw.subject, - Data: message.Payload, + Data: payload, } if future, err := jw.js.PublishMsgAsync(m, nats.MsgId(message.Header.ID)); err != nil { // nats.MsgId() is for exactly-once writing errs[index] = err @@ -272,10 +276,14 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message, wg.Add(1) go func(message isb.Message, idx int) { defer wg.Done() + payload, err := message.MarshalBinary() + if err != nil { + errs[idx] = err + return + } m := &nats.Msg{ - Header: convert2NatsMsgHeader(message.Header), Subject: jw.subject, - Data: message.Payload, + Data: payload, } if pubAck, err := jw.js.PublishMsg(m, nats.MsgId(message.Header.ID), nats.AckWait(2*time.Second)); err != nil { // nats.MsgId() is for exactly-once writing errs[idx] = err @@ -307,25 +315,3 @@ func (w *writeOffset) Sequence() (int64, error) { func (w *writeOffset) AckIt() error { return fmt.Errorf("not supported") } - -const ( - _key = "k" - _id = "i" - _late = "l" - _eventTime = "pev" -) - -func convert2NatsMsgHeader(header isb.Header) nats.Header { - r := nats.Header{} - r.Add(_id, header.ID) - r.Add(_key, header.Key) - if header.IsLate { - r.Add(_late, "1") - } else { - r.Add(_late, "0") - } - if !header.EventTime.IsZero() { - r.Add(_eventTime, fmt.Sprint(header.EventTime.UnixMilli())) - } - return r -} diff --git a/pkg/isb/stores/jetstream/writer_test.go b/pkg/isb/stores/jetstream/writer_test.go index 066c7520e4..942744295b 100644 --- a/pkg/isb/stores/jetstream/writer_test.go +++ b/pkg/isb/stores/jetstream/writer_test.go @@ -263,14 +263,3 @@ func TestWriteClose(t *testing.T) { assert.NoError(t, bw.Close()) } - -// TestConvert2NatsMsgHeader is used to convert nats header -func TestConvert2NatsMsgHeader(t *testing.T) { - isbHeader := isb.Header{ - MessageInfo: isb.MessageInfo{ - EventTime: time.Unix(1636470000, 0), - }, - ID: "1", - } - assert.NotNil(t, convert2NatsMsgHeader(isbHeader)) -} From 7510082acd6341f2a67b06d281a15ce68dec7553 Mon Sep 17 00:00:00 2001 From: jyu6 Date: Fri, 3 Mar 2023 15:11:43 -0800 Subject: [PATCH 2/6] fix logic Signed-off-by: jyu6 --- pkg/isb/stores/jetstream/reader.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/isb/stores/jetstream/reader.go b/pkg/isb/stores/jetstream/reader.go index 68b60414a4..09bd6f8179 100644 --- a/pkg/isb/stores/jetstream/reader.go +++ b/pkg/isb/stores/jetstream/reader.go @@ -228,12 +228,16 @@ func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMess return nil, fmt.Errorf("failed to fetch messages from jet stream subject %q, %w", jr.subject, err) } for _, msg := range msgs { - var m = new(isb.ReadMessage) + var m = new(isb.Message) err = m.UnmarshalBinary(msg.Data) if err != nil { - return nil, fmt.Errorf("failed to unmarshal the message into isb.ReadMessage, %w", err) + return nil, fmt.Errorf("failed to unmarshal the message into isb.Message, %w", err) } - result = append(result, m) + rm := &isb.ReadMessage{ + ReadOffset: newOffset(msg, jr.inProgessTickDuration, jr.log), + Message: *m, + } + result = append(result, rm) } return result, nil } From 8fb8bdf20e4b17174887b6010b77105c7c133ead Mon Sep 17 00:00:00 2001 From: jyu6 Date: Fri, 3 Mar 2023 15:40:16 -0800 Subject: [PATCH 3/6] fix race Signed-off-by: jyu6 --- pkg/watermark/publish/publisher.go | 32 +++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go index 7ff7c9bcf5..031e5eb8f5 100644 --- a/pkg/watermark/publish/publisher.go +++ b/pkg/watermark/publish/publisher.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sync" "time" "go.uber.org/zap" @@ -52,6 +53,7 @@ type publish struct { otStore store.WatermarkKVStorer log *zap.SugaredLogger headWatermark wmb.Watermark + headWMLock sync.RWMutex opts *publishOptions } @@ -90,6 +92,8 @@ func NewPublish(ctx context.Context, processorEntity processor.ProcessorEntitier // initialSetup inserts the default values as the ProcessorEntitier starts emitting watermarks. func (p *publish) initialSetup() { + p.headWMLock.Lock() + defer p.headWMLock.Unlock() p.headWatermark = p.loadLatestFromStore() } @@ -127,7 +131,7 @@ func (p *publish) PublishWatermark(wm wmb.Watermark, offset isb.Offset) { // TODO: better exponential backoff time.Sleep(time.Millisecond * 250) } else { - p.log.Debugw("New watermark published with offset", zap.Int64("head", p.headWatermark.UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq)) + p.log.Debugw("New watermark published with offset", zap.Int64("head", p.getHeadWatermark().UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq)) break } } @@ -140,14 +144,14 @@ func (p *publish) validateWatermark(wm wmb.Watermark) (wmb.Watermark, bool) { wm = wmb.Watermark(time.Time(wm).Add(-p.opts.delay)) } // update p.headWatermark only if wm > p.headWatermark - if wm.After(time.Time(p.headWatermark)) { - p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) - p.headWatermark = wm - } else if wm.Before(time.Time(p.headWatermark)) { - p.log.Warnw("Skip publishing the new watermark because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) + if wm.After(time.Time(p.getHeadWatermark())) { + p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String())) + p.setHeadWatermark(wm) + } else if wm.Before(time.Time(p.getHeadWatermark())) { + p.log.Warnw("Skip publishing the new watermark because it's older than the current watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String())) return wmb.Watermark{}, true } else { - p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) + p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String())) return wmb.Watermark{}, true } return wm, false @@ -207,9 +211,23 @@ func (p *publish) loadLatestFromStore() wmb.Watermark { // GetLatestWatermark returns the latest watermark for that processor. func (p *publish) GetLatestWatermark() wmb.Watermark { + p.headWMLock.RLock() + defer p.headWMLock.RUnlock() return p.headWatermark } +func (p *publish) getHeadWatermark() wmb.Watermark { + p.headWMLock.RLock() + defer p.headWMLock.RUnlock() + return p.headWatermark +} + +func (p *publish) setHeadWatermark(newWM wmb.Watermark) { + p.headWMLock.Lock() + defer p.headWMLock.Unlock() + p.headWatermark = newWM +} + func (p *publish) publishHeartbeat() { ticker := time.NewTicker(time.Second * time.Duration(p.opts.podHeartbeatRate)) defer ticker.Stop() From 3aaa91a8216cbb41b32373d278715d60f8c07865 Mon Sep 17 00:00:00 2001 From: jyu6 Date: Fri, 3 Mar 2023 16:00:13 -0800 Subject: [PATCH 4/6] Revert "fix race" This reverts commit 8fb8bdf20e4b17174887b6010b77105c7c133ead. --- pkg/watermark/publish/publisher.go | 32 +++++++----------------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go index 031e5eb8f5..7ff7c9bcf5 100644 --- a/pkg/watermark/publish/publisher.go +++ b/pkg/watermark/publish/publisher.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "io" - "sync" "time" "go.uber.org/zap" @@ -53,7 +52,6 @@ type publish struct { otStore store.WatermarkKVStorer log *zap.SugaredLogger headWatermark wmb.Watermark - headWMLock sync.RWMutex opts *publishOptions } @@ -92,8 +90,6 @@ func NewPublish(ctx context.Context, processorEntity processor.ProcessorEntitier // initialSetup inserts the default values as the ProcessorEntitier starts emitting watermarks. func (p *publish) initialSetup() { - p.headWMLock.Lock() - defer p.headWMLock.Unlock() p.headWatermark = p.loadLatestFromStore() } @@ -131,7 +127,7 @@ func (p *publish) PublishWatermark(wm wmb.Watermark, offset isb.Offset) { // TODO: better exponential backoff time.Sleep(time.Millisecond * 250) } else { - p.log.Debugw("New watermark published with offset", zap.Int64("head", p.getHeadWatermark().UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq)) + p.log.Debugw("New watermark published with offset", zap.Int64("head", p.headWatermark.UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq)) break } } @@ -144,14 +140,14 @@ func (p *publish) validateWatermark(wm wmb.Watermark) (wmb.Watermark, bool) { wm = wmb.Watermark(time.Time(wm).Add(-p.opts.delay)) } // update p.headWatermark only if wm > p.headWatermark - if wm.After(time.Time(p.getHeadWatermark())) { - p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String())) - p.setHeadWatermark(wm) - } else if wm.Before(time.Time(p.getHeadWatermark())) { - p.log.Warnw("Skip publishing the new watermark because it's older than the current watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String())) + if wm.After(time.Time(p.headWatermark)) { + p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) + p.headWatermark = wm + } else if wm.Before(time.Time(p.headWatermark)) { + p.log.Warnw("Skip publishing the new watermark because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) return wmb.Watermark{}, true } else { - p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("head", p.getHeadWatermark().String()), zap.String("new", wm.String())) + p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) return wmb.Watermark{}, true } return wm, false @@ -211,23 +207,9 @@ func (p *publish) loadLatestFromStore() wmb.Watermark { // GetLatestWatermark returns the latest watermark for that processor. func (p *publish) GetLatestWatermark() wmb.Watermark { - p.headWMLock.RLock() - defer p.headWMLock.RUnlock() return p.headWatermark } -func (p *publish) getHeadWatermark() wmb.Watermark { - p.headWMLock.RLock() - defer p.headWMLock.RUnlock() - return p.headWatermark -} - -func (p *publish) setHeadWatermark(newWM wmb.Watermark) { - p.headWMLock.Lock() - defer p.headWMLock.Unlock() - p.headWatermark = newWM -} - func (p *publish) publishHeartbeat() { ticker := time.NewTicker(time.Second * time.Duration(p.opts.podHeartbeatRate)) defer ticker.Stop() From 761851abf059930e6b5949651c548a350b6e9ca6 Mon Sep 17 00:00:00 2001 From: jyu6 Date: Fri, 3 Mar 2023 16:01:08 -0800 Subject: [PATCH 5/6] fix test race Signed-off-by: jyu6 --- pkg/reduce/data_forward_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index c0838f069b..3f3719b90d 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -404,7 +404,7 @@ func TestReduceDataForward_Sum(t *testing.T) { go reduceDataForward.Start() // start the producer - go publishMessages(ctx, startTime, messageValue, 300, 10, p[fromBuffer.GetName()], fromBuffer) + publishMessages(ctx, startTime, messageValue, 300, 10, p[fromBuffer.GetName()], fromBuffer) // wait until there is data in to buffer for buffer.IsEmpty() { From 0abc537e6ead8d208c145af7cea12460f938b343 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Fri, 3 Mar 2023 16:44:36 -0800 Subject: [PATCH 6/6] err is highly unlikely Signed-off-by: Vigith Maurice --- pkg/isb/stores/jetstream/reader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/isb/stores/jetstream/reader.go b/pkg/isb/stores/jetstream/reader.go index 09bd6f8179..8442155648 100644 --- a/pkg/isb/stores/jetstream/reader.go +++ b/pkg/isb/stores/jetstream/reader.go @@ -229,6 +229,7 @@ func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMess } for _, msg := range msgs { var m = new(isb.Message) + // err should be nil as we have our own marshaller/unmarshaller err = m.UnmarshalBinary(msg.Data) if err != nil { return nil, fmt.Errorf("failed to unmarshal the message into isb.Message, %w", err)