Skip to content

Commit

Permalink
Simple reduce pipeline. Fixes #289 (#317)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith and yhl25 authored Nov 10, 2022
1 parent 7f5d86c commit 5c43f5a
Show file tree
Hide file tree
Showing 6 changed files with 452 additions and 169 deletions.
39 changes: 39 additions & 0 deletions examples/6-simple-reduce-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: even-odd-sum
spec:
vertices:
- name: in
source:
http: {}
- name: atoi
scale:
min: 1
udf:
container:
# Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/evenodd
image: quay.io/numaio/go-even-odd-example
- name: compute-sum
udf:
container:
# compute the sum
image: quay.io/numaio/go-integer-sum-example
groupBy:
window:
fixed:
length: 60s
keyed: true
- name: sink
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: atoi
- from: atoi
to: compute-sum
parallelism: 2
- from: compute-sum
to: sink
70 changes: 36 additions & 34 deletions pkg/reduce/readloop/readloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import (
"math"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/numaproj/numaflow/pkg/isb/forward"
"github.com/numaproj/numaflow/pkg/shared/logging"
udfReducer "github.com/numaproj/numaflow/pkg/udf/reducer"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/pbq"
Expand Down Expand Up @@ -96,6 +97,7 @@ func (rl *ReadLoop) Startup(ctx context.Context) {
rl.pbqManager.StartUp(ctx)
// gets the partitions from the state
partitions := rl.pbqManager.ListPartitions()
rl.log.Infow("Partitions to be replayed ", zap.Int("count", len(partitions)), zap.Any("partitions", partitions))

for _, p := range partitions {
// Create keyed window for a given partition
Expand Down Expand Up @@ -145,55 +147,53 @@ func (rl *ReadLoop) Process(ctx context.Context, messages []*isb.ReadMessage) {
q := rl.associatePBQAndPnF(ctx, partitionID)

// write the message to PBQ
writeFn := func(ctx context.Context, m *isb.ReadMessage) error {
return q.Write(ctx, m)
}
ctxClosedErr = rl.executeWithBackOff(ctx, writeFn, "failed to Write Message", pbqWriteBackoff, m, partitionID)
attempt := 0
ctxClosedErr = wait.ExponentialBackoffWithContext(ctx, pbqWriteBackoff, func() (done bool, err error) {
rErr := q.Write(ctx, m)
attempt += 1
if rErr != nil {
rl.log.Errorw("Failed to write message", zap.Any("msgOffSet", m.ReadOffset.String()), zap.String("partitionID", partitionID.String()), zap.Any("attempt", attempt), zap.Error(rErr))
return false, nil
}
return true, nil
})

if ctxClosedErr != nil {
rl.log.Errorw("Error while writing the message to PBQ", zap.Error(ctxClosedErr))
return
}

// Ack the message to ISB
ackFn := func(_ context.Context, m *isb.ReadMessage) error {
return m.ReadOffset.AckIt()
}
ctxClosedErr = rl.executeWithBackOff(ctx, ackFn, "failed to Ack Message", pbqWriteBackoff, m, partitionID)
attempt = 0
ctxClosedErr = wait.ExponentialBackoffWithContext(ctx, pbqWriteBackoff, func() (done bool, err error) {
rErr := m.ReadOffset.AckIt()
attempt += 1
if rErr != nil {
rl.log.Errorw("Failed to ack message", zap.String("msgOffSet", m.ReadOffset.String()), zap.Int("attempt", attempt), zap.Error(rErr))
return false, nil
}
return true, nil
})

if ctxClosedErr != nil {
rl.log.Errorw("Error while acknowledging the message", zap.Error(ctxClosedErr))
return
}
}

// close any windows that need to be closed.
// FIXME(p0): why are we re-reading the watermark? isb.ReadMessage contains watermark.
wm := rl.waterMark(m)
wm := processor.Watermark(m.Watermark)
closedWindows := rl.aw.RemoveWindow(time.Time(wm))
rl.log.Debugw("closing windows", zap.Int("length", len(closedWindows)), zap.Time("watermark", time.Time(wm)))

for _, cw := range closedWindows {
partitions := cw.Partitions()
rl.closePartitions(partitions)
rl.log.Debugw("Closing Window", zap.Time("windowStart", cw.Start), zap.Time("windowEnd", cw.End))
}
}
}

// executeWithBackOff executes a function infinitely until it succeeds using ExponentialBackoffWithContext.
func (rl *ReadLoop) executeWithBackOff(ctx context.Context, retryableFn func(ctx context.Context, message *isb.ReadMessage) error, errMsg string, pbqWriteBackoff wait.Backoff, m *isb.ReadMessage, partitionID partition.ID) error {
attempt := 0
ctxClosedErr := wait.ExponentialBackoffWithContext(ctx, pbqWriteBackoff, func() (done bool, err error) {
rErr := retryableFn(ctx, m)
attempt += 1
if rErr != nil {
rl.log.Errorw(errMsg, zap.Any("msgOffSet", m.ReadOffset.String()), zap.Any("partitionID", partitionID.String()), zap.Any("attempt", attempt), zap.Error(rErr))
return false, nil
}
return true, nil
})

return ctxClosedErr
}

// associatePBQAndPnF associates a PBQ with the partition if a PBQ exists, else creates a new one and then associates
// it to the partition.
func (rl *ReadLoop) associatePBQAndPnF(ctx context.Context, partitionID partition.ID) pbq.ReadWriteCloser {
Expand All @@ -214,7 +214,7 @@ func (rl *ReadLoop) associatePBQAndPnF(ctx context.Context, partitionID partitio
q, pbqErr = rl.pbqManager.CreateNewPBQ(ctx, partitionID)
if pbqErr != nil {
attempt += 1
rl.log.Warnw("Failed to create pbq during startup, retrying", zap.Any("attempt", attempt), zap.Any("partitionID", partitionID.String()), zap.Error(pbqErr))
rl.log.Warnw("Failed to create pbq during startup, retrying", zap.Any("attempt", attempt), zap.String("partitionID", partitionID.String()), zap.Error(pbqErr))
return false, nil
}
return true, nil
Expand All @@ -235,12 +235,19 @@ func (rl *ReadLoop) ShutDown(ctx context.Context) {
// upsertWindowsAndKeys will create or assigns (if already present) a window to the message. It is an upsert operation
// because windows are created out of order, but they will be closed in-order.
func (rl *ReadLoop) upsertWindowsAndKeys(m *isb.ReadMessage) []*keyed.KeyedWindow {
// drop the late messages
if m.IsLate {
rl.log.Warnw("Dropping the late message", zap.Time("eventTime", m.EventTime), zap.Time("watermark", m.Watermark))
return []*keyed.KeyedWindow{}
}

processingWindows := rl.windowingStrategy.AssignWindow(m.EventTime)
var kWindows []*keyed.KeyedWindow
for _, win := range processingWindows {
kw := rl.aw.GetKeyedWindow(win)
if kw == nil {
kw = rl.aw.CreateKeyedWindow(win)
rl.log.Debugw("Creating new keyed window", zap.Any("key", kw.Keys), zap.Int64("startTime", kw.Start.UnixMilli()), zap.Int64("endTime", kw.End.UnixMilli()))
}
// track the key to window relationship
kw.AddKey(m.Key)
Expand All @@ -249,11 +256,6 @@ func (rl *ReadLoop) upsertWindowsAndKeys(m *isb.ReadMessage) []*keyed.KeyedWindo
return kWindows
}

// FIXME: this shouldn't be required.
func (rl *ReadLoop) waterMark(message *isb.ReadMessage) processor.Watermark {
return processor.Watermark(message.Watermark)
}

// closePartitions closes the partitions by invoking close-of-book (COB).
func (rl *ReadLoop) closePartitions(partitions []partition.ID) {
for _, p := range partitions {
Expand Down
11 changes: 6 additions & 5 deletions pkg/reduce/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"context"
"time"

"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/forward"
"github.com/numaproj/numaflow/pkg/pbq"
Expand All @@ -32,7 +34,6 @@ import (
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/window"
"go.uber.org/zap"
)

// DataForward reads data from isb and forwards them to readloop
Expand Down Expand Up @@ -79,6 +80,7 @@ func (d *DataForward) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
d.log.Infow("Stopping reduce data forwarder... ", zap.Error(ctx.Err()))
return
default:
d.forwardAChunk(ctx)
Expand All @@ -90,7 +92,6 @@ func (d *DataForward) Start(ctx context.Context) {
// and forwards the messages to readloop
func (d *DataForward) forwardAChunk(ctx context.Context) {
readMessages, err := d.fromBuffer.Read(ctx, d.opts.readBatchSize)

if err != nil {
d.log.Errorw("Failed to read from isb", zap.Error(err))
}
Expand All @@ -99,9 +100,9 @@ func (d *DataForward) forwardAChunk(ctx context.Context) {
return
}

// fetch watermark if available
// let's track only the last element's watermark
processorWM := d.fetchWatermark.GetWatermark(readMessages[len(readMessages)-1].ReadOffset)
// fetch watermark using the first element's watermark, because we assign the watermark to all other
// elements in the batch based on the watermark we fetch from 0th offset.
processorWM := d.fetchWatermark.GetWatermark(readMessages[0].ReadOffset)
for _, m := range readMessages {
m.Watermark = time.Time(processorWM)
}
Expand Down
Loading

0 comments on commit 5c43f5a

Please sign in to comment.