Skip to content

Commit

Permalink
fix: race condition while publishing wm inside reduce (#1599)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Apr 3, 2024
1 parent 74ab70a commit 3dbba4f
Show file tree
Hide file tree
Showing 18 changed files with 722 additions and 849 deletions.
2 changes: 1 addition & 1 deletion examples/6-reduce-fixed-window-with-pvc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
udf:
container:
# compute the sum
image: quay.io/numaio/numaflow-go/reduce-sum:v0.6.1
image: quay.io/numaio/numaflow-go/reduce-sum:stable
groupBy:
window:
fixed:
Expand Down
2 changes: 1 addition & 1 deletion examples/6-reduce-fixed-window.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ spec:
length: 60s
keyed: true
storage:
none: {}
emptyDir: {}
partitions: 2
- name: sink
scale:
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ const (
DefaultWALCompactionDuration = 60 * time.Second // Default compaction duration
DefaultCompactWALPath = PathPBQMount + "/compact-wals" // Default compaction wal path

// Default PnF (process and forward) options
DefaultPnfBatchSize = 100 // Default flush batch size for pnf
DefaultPnfFlushDuration = time.Second // Default flush duration for pnf

// DefaultKeyForNonKeyedData Default key for non keyed stream
DefaultKeyForNonKeyedData = "NON_KEYED_STREAM"

Expand Down
1 change: 1 addition & 0 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message,
errs[idx] = nil
if pubAck.Duplicate {
isbDedupCount.With(metricsLabels).Inc()
jw.log.Infow("Duplicate message detected", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type DataForward struct {
pbqManager *pbq.Manager
whereToDecider forwarder.ToWhichStepDecider
storeManager wal.Manager
of *pnf.Manager
of *pnf.ProcessAndForward
opts *Options
currentWatermark time.Time // if watermark is -1, then make sure event-time is < watermark
log *zap.SugaredLogger
Expand All @@ -89,7 +89,7 @@ func NewDataForward(ctx context.Context,
watermarkPublishers map[string]publish.Publisher,
windowingStrategy window.TimedWindower,
idleManager wmb.IdleManager,
of *pnf.Manager,
of *pnf.ProcessAndForward,
opts ...Option) (*DataForward, error) {

options := DefaultOptions()
Expand Down
22 changes: 11 additions & 11 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestDataForward_StartWithNoOpWM(t *testing.T) {

idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(child, keyedVertex, CounterReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publisher, idleManager, windower)
op := pnf.NewProcessAndForward(child, keyedVertex, CounterReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publisher, idleManager, windower)

var reduceDataForwarder *DataForward
reduceDataForwarder, err = NewDataForward(child, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, CounterReduceTest{}, wmpublisher, publisher,
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestReduceDataForward_IdleWM(t *testing.T) {
windower := fixed.NewWindower(5*time.Second, keyedVertex)
idleManager, err := wmb.NewIdleManager(1, len(toBuffers))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, CounterReduceTest{}, toBuffers, pbqManager, CounterReduceTest{}, publisherMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, CounterReduceTest{}, toBuffers, pbqManager, CounterReduceTest{}, publisherMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, keyedVertex, fromBuffer, toBuffers, pbqManager, storeManager, CounterReduceTest{}, f, publisherMap,
Expand Down Expand Up @@ -717,7 +717,7 @@ func TestReduceDataForward_Count(t *testing.T) {
windower := fixed.NewWindower(60*time.Second, keyedVertex)
idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, CounterReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publisherMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, CounterReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publisherMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, CounterReduceTest{}, f, publisherMap,
Expand Down Expand Up @@ -803,7 +803,7 @@ func TestReduceDataForward_AllowedLatencyCount(t *testing.T) {

idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, CounterReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publisherMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, CounterReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publisherMap, idleManager, windower)

var reduceDataForward *DataForward
allowedLatency := 1000
Expand Down Expand Up @@ -891,7 +891,7 @@ func TestReduceDataForward_Sum(t *testing.T) {
windower := fixed.NewWindower(2*time.Minute, keyedVertex)
idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, CounterReduceTest{}, f, publishersMap,
Expand Down Expand Up @@ -977,7 +977,7 @@ func TestReduceDataForward_Max(t *testing.T) {
windower := fixed.NewWindower(5*time.Minute, keyedVertex)
idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, MaxReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, MaxReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, CounterReduceTest{}, f, publishersMap,
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestReduceDataForward_FixedSumWithDifferentKeys(t *testing.T) {

idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, CounterReduceTest{}, f, publishersMap,
Expand Down Expand Up @@ -1171,7 +1171,7 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {

idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, SessionSumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, SessionSumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, SessionSumReduceTest{}, f, publishersMap,
Expand Down Expand Up @@ -1276,7 +1276,7 @@ func TestReduceDataForward_NonKeyed(t *testing.T) {

idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, nonKeyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, CounterReduceTest{}, f, publishersMap,
Expand Down Expand Up @@ -1367,7 +1367,7 @@ func TestDataForward_WithContextClose(t *testing.T) {

idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, CounterReduceTest{}, publishersMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(cctx, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, CounterReduceTest{}, f, publishersMap,
Expand Down Expand Up @@ -1465,7 +1465,7 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) {

idleManager, err := wmb.NewIdleManager(1, len(toBuffer))
assert.NoError(t, err)
op := pnf.NewPnFManager(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, &myForwardTestRoundRobin{}, publishersMap, idleManager, windower)
op := pnf.NewProcessAndForward(ctx, keyedVertex, SumReduceTest{}, toBuffer, pbqManager, &myForwardTestRoundRobin{}, publishersMap, idleManager, windower)

var reduceDataForward *DataForward
reduceDataForward, err = NewDataForward(ctx, keyedVertex, fromBuffer, toBuffer, pbqManager, storeManager, &myForwardTestRoundRobin{}, f, publishersMap,
Expand Down
20 changes: 20 additions & 0 deletions pkg/reduce/pnf/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ limitations under the License.
package pnf

import (
"time"

"github.com/numaproj/numaflow/pkg/reduce/pbq/wal/unaligned"
"github.com/numaproj/numaflow/pkg/window"
)

// options holds the optional settings of this package; each field is
// populated by an Option function.
type options struct {
// gcEventsTracker is the unaligned.GCEventsWAL handle.
// NOTE(review): presumably used to track GC events for unaligned windows — confirm against its consumer.
gcEventsTracker unaligned.GCEventsWAL
// windowType is the window.Type in use (presumably set through WithWindowType — confirm).
windowType window.Type
// batchSize is the batch size for forwarding messages to ISB; set by WithBatchSize.
batchSize int
// flushDuration is the flush duration for forwarding messages to ISB; set by WithFlushDuration.
flushDuration time.Duration
}

// Option is a functional option: it receives the options value to modify and
// returns a non-nil error when the setting cannot be applied.
type Option func(options *options) error
Expand All @@ -43,3 +47,19 @@ func WithWindowType(windowType window.Type) Option {
return nil
}
}

// WithBatchSize returns an Option that stores batchSize as the batch size
// used when forwarding messages to ISB. The returned Option never fails.
func WithBatchSize(batchSize int) Option {
	apply := func(cfg *options) error {
		cfg.batchSize = batchSize
		return nil
	}
	return apply
}

// WithFlushDuration returns an Option that stores flushDuration as the flush
// duration used when forwarding messages to ISB. The returned Option never fails.
func WithFlushDuration(flushDuration time.Duration) Option {
	apply := func(cfg *options) error {
		cfg.flushDuration = flushDuration
		return nil
	}
	return apply
}
Loading

0 comments on commit 3dbba4f

Please sign in to comment.