From 148663e7583af02dcb665b922d687e215ab7a5df Mon Sep 17 00:00:00 2001 From: Juanlu Yu <19543684+chromevoid@users.noreply.github.com> Date: Sat, 4 Mar 2023 08:06:07 -0800 Subject: [PATCH] feat: use customized binary serde for nats message payload (#585) --- pkg/isb/stores/jetstream/reader.go | 37 +++++++----------------- pkg/isb/stores/jetstream/reader_test.go | 10 ------- pkg/isb/stores/jetstream/writer.go | 38 ++++++++----------------- pkg/isb/stores/jetstream/writer_test.go | 11 ------- pkg/reduce/data_forward_test.go | 2 +- 5 files changed, 23 insertions(+), 75 deletions(-) diff --git a/pkg/isb/stores/jetstream/reader.go b/pkg/isb/stores/jetstream/reader.go index 3cd6b57950..8442155648 100644 --- a/pkg/isb/stores/jetstream/reader.go +++ b/pkg/isb/stores/jetstream/reader.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "strconv" "strings" "sync" "time" @@ -221,6 +220,7 @@ func (jr *jetStreamReader) Rate(_ context.Context, seconds int64) (float64, erro } func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) { + var err error result := []*isb.ReadMessage{} msgs, err := jr.sub.Fetch(int(count), nats.MaxWait(jr.opts.readTimeOut)) if err != nil && !errors.Is(err, nats.ErrTimeout) { @@ -228,16 +228,17 @@ func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMess return nil, fmt.Errorf("failed to fetch messages from jet stream subject %q, %w", jr.subject, err) } for _, msg := range msgs { - m := &isb.ReadMessage{ + var m = new(isb.Message) + // err should be nil as we have our own marshaller/unmarshaller + err = m.UnmarshalBinary(msg.Data) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal the message into isb.Message, %w", err) + } + rm := &isb.ReadMessage{ ReadOffset: newOffset(msg, jr.inProgessTickDuration, jr.log), - Message: isb.Message{ - Header: convert2IsbMsgHeader(msg.Header), - Body: isb.Body{ - Payload: msg.Data, - }, - }, + Message: *m, } - result = append(result, m) + result = append(result, rm) } return result, nil } @@ -269,24 +270,6 @@ func (jr *jetStreamReader) Ack(_ context.Context, offsets []isb.Offset) []error return errs } -func convert2IsbMsgHeader(header nats.Header) isb.Header { - r := isb.Header{} - if header.Get(_late) == "1" { - r.IsLate = true - } - if x := header.Get(_id); x != "" { - r.ID = x - } - if x := header.Get(_key); x != "" { - r.Key = x - } - if x := header.Get(_eventTime); x != "" { - i, _ := strconv.ParseInt(x, 10, 64) - r.EventTime = time.UnixMilli(i) - } - return r -} - // offset implements ID interface for JetStream. type offset struct { seq uint64 diff --git a/pkg/isb/stores/jetstream/reader_test.go b/pkg/isb/stores/jetstream/reader_test.go index b3321189ba..4af1a35edd 100644 --- a/pkg/isb/stores/jetstream/reader_test.go +++ b/pkg/isb/stores/jetstream/reader_test.go @@ -171,16 +171,6 @@ func TestClose(t *testing.T) { } -// TestConvert2IsbMsgHeader is used to convert isb header -func TestConvert2IsbMsgHeader(t *testing.T) { - natsHeader := nats.Header{} - natsHeader.Set("w", "1") - natsHeader.Set("ps", "1636470000") - natsHeader.Set("pen", "1636470060") - - assert.NotNil(t, convert2IsbMsgHeader(natsHeader)) -} - func addStream(t *testing.T, js *jsclient.JetStreamContext, streamName string) { s := natstest.RunJetStreamServer(t) defer natstest.ShutdownJetStreamServer(t, s) diff --git a/pkg/isb/stores/jetstream/writer.go b/pkg/isb/stores/jetstream/writer.go index 822f67ada8..b3bc025530 100644 --- a/pkg/isb/stores/jetstream/writer.go +++ b/pkg/isb/stores/jetstream/writer.go @@ -214,10 +214,14 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message, var writeOffsets = make([]isb.Offset, len(messages)) var futures = make([]nats.PubAckFuture, len(messages)) for index, message := range messages { + payload, err := message.MarshalBinary() + if err != nil { + errs[index] = err + continue + } m := &nats.Msg{ - Header: convert2NatsMsgHeader(message.Header), Subject: jw.subject, - Data: message.Payload, + Data: payload, } if future, err := jw.js.PublishMsgAsync(m, nats.MsgId(message.Header.ID)); err != nil { // nats.MsgId() is for exactly-once writing errs[index] = err @@ -272,10 +276,14 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message, wg.Add(1) go func(message isb.Message, idx int) { defer wg.Done() + payload, err := message.MarshalBinary() + if err != nil { + errs[idx] = err + return + } m := &nats.Msg{ - Header: convert2NatsMsgHeader(message.Header), Subject: jw.subject, - Data: message.Payload, + Data: payload, } if pubAck, err := jw.js.PublishMsg(m, nats.MsgId(message.Header.ID), nats.AckWait(2*time.Second)); err != nil { // nats.MsgId() is for exactly-once writing errs[idx] = err @@ -307,25 +315,3 @@ func (w *writeOffset) Sequence() (int64, error) { func (w *writeOffset) AckIt() error { return fmt.Errorf("not supported") } - -const ( - _key = "k" - _id = "i" - _late = "l" - _eventTime = "pev" -) - -func convert2NatsMsgHeader(header isb.Header) nats.Header { - r := nats.Header{} - r.Add(_id, header.ID) - r.Add(_key, header.Key) - if header.IsLate { - r.Add(_late, "1") - } else { - r.Add(_late, "0") - } - if !header.EventTime.IsZero() { - r.Add(_eventTime, fmt.Sprint(header.EventTime.UnixMilli())) - } - return r -} diff --git a/pkg/isb/stores/jetstream/writer_test.go b/pkg/isb/stores/jetstream/writer_test.go index 066c7520e4..942744295b 100644 --- a/pkg/isb/stores/jetstream/writer_test.go +++ b/pkg/isb/stores/jetstream/writer_test.go @@ -263,14 +263,3 @@ func TestWriteClose(t *testing.T) { assert.NoError(t, bw.Close()) } - -// TestConvert2NatsMsgHeader is used to convert nats header -func TestConvert2NatsMsgHeader(t *testing.T) { - isbHeader := isb.Header{ - MessageInfo: isb.MessageInfo{ - EventTime: time.Unix(1636470000, 0), - }, - ID: "1", - } - assert.NotNil(t, convert2NatsMsgHeader(isbHeader)) -} diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index c0838f069b..3f3719b90d 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -404,7 +404,7 @@ func TestReduceDataForward_Sum(t *testing.T) { go reduceDataForward.Start() // start the producer - go publishMessages(ctx, startTime, messageValue, 300, 10, p[fromBuffer.GetName()], fromBuffer) + publishMessages(ctx, startTime, messageValue, 300, 10, p[fromBuffer.GetName()], fromBuffer) // wait until there is data in to buffer for buffer.IsEmpty() {