Skip to content

Commit

Permalink
feat: using one bucket for partitioned reduce watermark propagation (#…
Browse files Browse the repository at this point in the history
…742)

Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored May 25, 2023
1 parent ba1f493 commit e383ee2
Show file tree
Hide file tree
Showing 27 changed files with 1,137 additions and 771 deletions.
2 changes: 1 addition & 1 deletion examples/6-reduce-fixed-window.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
http: {}
- name: atoi
scale:
min: 1
min: 3
udf:
container:
# Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/even_odd
Expand Down
5 changes: 4 additions & 1 deletion pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/jetstream"
"github.com/spf13/viper"
Expand Down Expand Up @@ -289,7 +290,9 @@ func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName
if err != nil {
return nil, err
}
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, store.BuildWatermarkStoreWatcher(hbWatch, otWatch))
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch)
pm := processor.NewProcessorManager(ctx, storeWatcher)
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm)
return watermarkFetcher, nil
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

redis2 "github.com/numaproj/numaflow/pkg/isb/stores/redis"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"go.uber.org/multierr"
Expand Down Expand Up @@ -142,6 +143,8 @@ func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName st
// Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher.
hbWatcher := noop.NewKVOpWatch()
otWatcher := noop.NewKVOpWatch()
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher))
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
pm := processor.NewProcessorManager(ctx, storeWatcher)
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm)
return watermarkFetcher, nil
}
9 changes: 6 additions & 3 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,8 +1122,9 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB

hbWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_PROCESSORS", hbWatcherCh)
otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh)

var f = fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher))
storeWatcher := wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
pm := processor.NewProcessorManager(ctx, storeWatcher, processor.WithVertexReplica(0), processor.WithIsReduce(true))
f := fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), storeWatcher, pm)
return f, sourcePublisher
}

Expand All @@ -1132,12 +1133,13 @@ func buildPublisherMapAndOTStore(ctx context.Context, toBuffers map[string]isb.B
otStores := make(map[string]wmstore.WatermarkKVStorer)

// create publisher for to Buffers
index := int32(0)
for key := range toBuffers {
publishEntity := processor.NewProcessorEntity(key)
hb, hbKVEntry, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, key+"_PROCESSORS")
ot, otKVEntry, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, key+"_OT")
otStores[key] = ot
p := publish.NewPublish(ctx, publishEntity, wmstore.BuildWatermarkStore(hb, ot), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
p := publish.NewPublish(ctx, publishEntity, wmstore.BuildWatermarkStore(hb, ot), publish.WithToVertexPartition(index), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
publishers[key] = p

go func() {
Expand All @@ -1152,6 +1154,7 @@ func buildPublisherMapAndOTStore(ctx context.Context, toBuffers map[string]isb.B
}
}
}()
index++
}
return publishers, otStores
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reduce/pnf/processandforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func buildPublisherMapAndOTStore(toBuffers map[string]isb.BufferWriter) (map[str
heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key))
otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key))
otStores[key] = otKV
p := publish.NewPublish(ctx, processorEntity, wmstore.BuildWatermarkStore(heartbeatKV, otKV), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
p := publish.NewPublish(ctx, processorEntity, wmstore.BuildWatermarkStore(heartbeatKV, otKV), publish.WithToVertexPartition(0), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
publishers[key] = p
}
return publishers, otStores
Expand Down
25 changes: 16 additions & 9 deletions pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// package fetch contains the logic to fetch the watermark for an offset.
// we iterate over all the active processors and get the smallest watermark.
// if the processor is not active, and if the current offset is greater than the last offset of the processor,
// we delete the processor using processor manager.

package fetch

import (
Expand All @@ -25,6 +30,8 @@ import (

"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/watermark/processor"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/store"
Expand All @@ -36,24 +43,24 @@ type edgeFetcher struct {
ctx context.Context
bufferName string
storeWatcher store.WatermarkStoreWatcher
processorManager *ProcessorManager
processorManager *processor.ProcessorManager
log *zap.SugaredLogger
}

// NewEdgeFetcher returns a new edge fetcher.
func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.WatermarkStoreWatcher) Fetcher {
func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.WatermarkStoreWatcher, manager *processor.ProcessorManager) Fetcher {
log := logging.FromContext(ctx).With("bufferName", bufferName)
log.Info("Creating a new edge watermark fetcher")
return &edgeFetcher{
ctx: ctx,
bufferName: bufferName,
storeWatcher: storeWatcher,
processorManager: NewProcessorManager(ctx, storeWatcher),
processorManager: manager,
log: log,
}
}

// GetWatermark gets the smallest timestamp for the given offset
// GetWatermark gets the smallest watermark for the given offset
func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) wmb.Watermark {
var offset, err = inputOffset.Sequence()
if err != nil {
Expand All @@ -65,15 +72,15 @@ func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) wmb.Watermark {
var allProcessors = e.processorManager.GetAllProcessors()
for _, p := range allProcessors {
debugString.WriteString(fmt.Sprintf("[Processor: %v] \n", p))
var t = p.offsetTimeline.GetEventTime(inputOffset)
var t = p.GetOffsetTimeline().GetEventTime(inputOffset)
if t == -1 { // watermark cannot be computed, perhaps a new processing unit was added or offset fell off the timeline
epoch = t
} else if t < epoch {
epoch = t
}
if p.IsDeleted() && (offset > p.offsetTimeline.GetHeadOffset()) {
if p.IsDeleted() && (offset > p.GetOffsetTimeline().GetHeadOffset()) {
// if the pod is not active and the current offset is ahead of all offsets in Timeline
e.processorManager.DeleteProcessor(p.entity.GetName())
e.processorManager.DeleteProcessor(p.GetEntity().GetName())
}
}
// if there are no processors
Expand All @@ -100,7 +107,7 @@ func (e *edgeFetcher) GetHeadWatermark() wmb.Watermark {
if !p.IsActive() {
continue
}
var w = p.offsetTimeline.GetHeadWMB()
var w = p.GetOffsetTimeline().GetHeadWMB()
e.log.Debugf("Processor: %v (headOffset:%d) (headWatermark:%d) (headIdle:%t)", p, w.Offset, w.Watermark, w.Idle)
debugString.WriteString(fmt.Sprintf("[Processor:%v] (headOffset:%d) (headWatermark:%d) (headIdle:%t) \n", p, w.Offset, w.Watermark, w.Idle))
if w.Offset != -1 {
Expand Down Expand Up @@ -135,7 +142,7 @@ func (e *edgeFetcher) GetHeadWMB() wmb.WMB {
continue
}
// we only consider the latest wmb in the offset timeline
var curHeadWMB = p.offsetTimeline.GetHeadWMB()
var curHeadWMB = p.GetOffsetTimeline().GetHeadWMB()
if !curHeadWMB.Idle {
e.log.Debugf("[%s] GetHeadWMB finds an active head wmb for offset, return early", e.bufferName)
return wmb.WMB{}
Expand Down
Loading

0 comments on commit e383ee2

Please sign in to comment.