Skip to content

Commit

Permalink
fix: dedup in user defined source (#1613)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Mar 29, 2024
1 parent c0b9fad commit ef94def
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 88 deletions.
7 changes: 7 additions & 0 deletions pkg/isb/stores/jetstream/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,10 @@ var isbAckTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Help: "Processing times of acks for jetstream",
Buckets: prometheus.ExponentialBucketsRange(100, 60000000*2, 10),
}, []string{"buffer"})

// isbDedupCount is used to indicate the number of messages that are duplicate
var isbDedupCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "isb_jetstream",
Name: "dedup_total",
Help: "Total number of jetstream dedup",
}, []string{"buffer"})
3 changes: 3 additions & 0 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message,
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
if pubAck.Duplicate {
isbDedupCount.With(metricsLabels).Inc()
}
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
}(msg, index)
Expand Down
13 changes: 7 additions & 6 deletions pkg/sources/udsource/grpc_udsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/numaproj/numaflow/pkg/isb"
sourceclient "github.com/numaproj/numaflow/pkg/sdkclient/source"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/sources/udsource/utils"
)

// GRPCBasedUDSource applies a user-defined source over gRPC
Expand Down Expand Up @@ -128,19 +127,21 @@ func (u *GRPCBasedUDSource) ApplyReadFn(ctx context.Context, count int64, timeou
}
// Convert the datum to ReadMessage and append to the list
r := datum.GetResult()

offset := NewUserDefinedSourceOffset(r.GetOffset())
readMessage := &isb.ReadMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{EventTime: r.GetEventTime().AsTime()},
ID: constructMessageID(r),
ID: constructMessageID(offset.String(), r.GetOffset().GetPartitionId()),
Keys: r.GetKeys(),
Headers: r.GetHeaders(),
},
Body: isb.Body{
Payload: r.GetPayload(),
},
},
ReadOffset: utils.ConvertToIsbOffset(r.GetOffset()),
ReadOffset: offset,
}
readMessages = append(readMessages, readMessage)
}
Expand All @@ -151,7 +152,7 @@ func (u *GRPCBasedUDSource) ApplyReadFn(ctx context.Context, count int64, timeou
func (u *GRPCBasedUDSource) ApplyAckFn(ctx context.Context, offsets []isb.Offset) error {
rOffsets := make([]*sourcepb.Offset, len(offsets))
for i, offset := range offsets {
rOffsets[i] = utils.ConvertToSourceOffset(offset)
rOffsets[i] = ConvertToUserDefinedSourceOffset(offset)
}
var r = &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Expand All @@ -172,7 +173,7 @@ func (u *GRPCBasedUDSource) ApplyPartitionFn(ctx context.Context) ([]int32, erro
return resp.GetResult().GetPartitions(), nil
}

func constructMessageID(r *sourcepb.ReadResponse_Result) string {
func constructMessageID(offset string, partitionIdx int32) string {
// For a user-defined source, the partition ID plus the offset should be able to uniquely identify a message
return fmt.Sprintf("%d-%s", r.GetOffset().GetPartitionId(), string(r.GetOffset().GetOffset()))
return fmt.Sprintf("%d-%s", partitionIdx, offset)
}
27 changes: 18 additions & 9 deletions pkg/sources/udsource/grpc_udsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

"github.com/numaproj/numaflow/pkg/isb"
sourceclient "github.com/numaproj/numaflow/pkg/sdkclient/source"
"github.com/numaproj/numaflow/pkg/sources/udsource/utils"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -187,11 +186,13 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) {
},
}

offset := &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0}

var TestEventTime = time.Unix(1661169600, 0).UTC()
expectedResponse := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: []byte(`test_payload`),
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0},
Offset: offset,
EventTime: timestamppb.New(TestEventTime),
Keys: []string{"test_key"},
},
Expand All @@ -216,7 +217,7 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) {
assert.Equal(t, 1, len(readMessages))
assert.Equal(t, []byte(`test_payload`), readMessages[0].Body.Payload)
assert.Equal(t, []string{"test_key"}, readMessages[0].Keys)
assert.Equal(t, utils.NewSimpleSourceOffset("test_offset", 0), readMessages[0].ReadOffset)
assert.Equal(t, NewUserDefinedSourceOffset(offset), readMessages[0].ReadOffset)
assert.Equal(t, TestEventTime, readMessages[0].EventTime)
})

Expand Down Expand Up @@ -268,11 +269,15 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

offset1 := &sourcepb.Offset{Offset: []byte("test-offset-1"), PartitionId: 0}
offset2 := &sourcepb.Offset{Offset: []byte("test-offset-2"), PartitionId: 0}

mockClient := sourcemock.NewMockSourceClient(ctrl)
req := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: []*sourcepb.Offset{
{Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0},
offset1,
offset2,
},
},
}
Expand All @@ -290,8 +295,8 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {

u := NewMockUDSgRPCBasedUDSource(mockClient)
err := u.ApplyAckFn(ctx, []isb.Offset{
utils.NewSimpleSourceOffset("test-offset-1", 0),
utils.NewSimpleSourceOffset("test-offset-2", 0),
NewUserDefinedSourceOffset(offset1),
NewUserDefinedSourceOffset(offset2),
})
assert.NoError(t, err)
})
Expand All @@ -300,11 +305,15 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

offset1 := &sourcepb.Offset{Offset: []byte("test-offset-1"), PartitionId: 0}
offset2 := &sourcepb.Offset{Offset: []byte("test-offset-2"), PartitionId: 0}

mockClient := sourcemock.NewMockSourceClient(ctrl)
req := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: []*sourcepb.Offset{
{Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0},
offset1,
offset2,
},
},
}
Expand All @@ -321,8 +330,8 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {

u := NewMockUDSgRPCBasedUDSource(mockClient)
err := u.ApplyAckFn(ctx, []isb.Offset{
utils.NewSimpleSourceOffset("test-offset-1", 0),
utils.NewSimpleSourceOffset("test-offset-2", 0),
NewUserDefinedSourceOffset(offset1),
NewUserDefinedSourceOffset(offset2),
})
assert.ErrorIs(t, err, status.New(codes.DeadlineExceeded, "mock test err").Err())
})
Expand Down
70 changes: 70 additions & 0 deletions pkg/sources/udsource/offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package udsource

import (
"encoding/base64"
"fmt"

sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1"

"github.com/numaproj/numaflow/pkg/isb"
)

// userDefinedSourceOffset is a implementation of isb.Offset from the user-defined source side.
type userDefinedSourceOffset struct {
// NOTE: encodedOffsetStr is base64 encoded string because we use encodedOffsetStr to construct message ID
// and message ID is a string.
encodedOffsetStr string
offset []byte
partitionIdx int32
}

func NewUserDefinedSourceOffset(offset *sourcepb.Offset) isb.Offset {
return &userDefinedSourceOffset{
offset: offset.GetOffset(),
encodedOffsetStr: base64.StdEncoding.EncodeToString(offset.GetOffset()),
partitionIdx: offset.GetPartitionId(),
}
}

func (s *userDefinedSourceOffset) String() string {
return fmt.Sprintf("%s-%d", s.encodedOffsetStr, s.partitionIdx)
}

func (s *userDefinedSourceOffset) PartitionIdx() int32 {
return s.partitionIdx
}

func (s *userDefinedSourceOffset) Sequence() (int64, error) {
panic("Sequence is not supported by userDefinedSourceOffset")
}

func (s *userDefinedSourceOffset) AckIt() error {
panic("AckIt is not supported by userDefinedSourceOffset")
}

func (s *userDefinedSourceOffset) NoAck() error {
panic("NoAck is not supported by userDefinedSourceOffset")
}

func ConvertToUserDefinedSourceOffset(offset isb.Offset) *sourcepb.Offset {
return &sourcepb.Offset{
PartitionId: offset.PartitionIdx(),
Offset: offset.(*userDefinedSourceOffset).offset,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package utils
package udsource

import (
"testing"
Expand All @@ -24,17 +24,18 @@ import (
)

func TestOffsetConversion(t *testing.T) {
testIsbOffset := NewSimpleSourceOffset("test", 0)
convertedSrcOffset := ConvertToSourceOffset(testIsbOffset)
convertedBackIsbOffset := ConvertToIsbOffset(convertedSrcOffset)
offset := &sourcepb.Offset{Offset: []byte("test"), PartitionId: 0}
testIsbOffset := NewUserDefinedSourceOffset(offset)
ConvertToUserDefinedSourceOffset(testIsbOffset)
convertedBackIsbOffset := NewUserDefinedSourceOffset(offset)
assert.Equal(t, testIsbOffset.PartitionIdx(), convertedBackIsbOffset.PartitionIdx())
assert.Equal(t, testIsbOffset.String(), convertedBackIsbOffset.String())
testSrcOffset := &sourcepb.Offset{
PartitionId: 0,
Offset: []byte("test"),
}
convertedIsbOffset := ConvertToIsbOffset(testSrcOffset)
convertedBackSrcOffset := ConvertToSourceOffset(convertedIsbOffset)
convertedIsbOffset := NewUserDefinedSourceOffset(offset)
convertedBackSrcOffset := ConvertToUserDefinedSourceOffset(convertedIsbOffset)
assert.Equal(t, testSrcOffset.GetPartitionId(), convertedBackSrcOffset.GetPartitionId())
assert.Equal(t, testSrcOffset.GetOffset(), convertedBackSrcOffset.GetOffset())
}
67 changes: 0 additions & 67 deletions pkg/sources/udsource/utils/offset.go

This file was deleted.

0 comments on commit ef94def

Please sign in to comment.