Skip to content

Commit

Permalink
Chore: Windower interface. closes #234 (#340)
Browse files Browse the repository at this point in the history
Signed-off-by: ashwinidulams <[email protected]>
  • Loading branch information
ashwinidulams authored and whynowy committed Nov 18, 2022
1 parent 5d7b9d8 commit 6f5e83a
Show file tree
Hide file tree
Showing 10 changed files with 634 additions and 791 deletions.
59 changes: 0 additions & 59 deletions pkg/messages/interfaces.go

This file was deleted.

57 changes: 27 additions & 30 deletions pkg/reduce/readloop/readloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,18 @@ import (
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/window"
"github.com/numaproj/numaflow/pkg/window/keyed"
"github.com/numaproj/numaflow/pkg/window/strategy/fixed"
)

// ReadLoop is responsible for reading and forwarding the message from ISB to PBQ.
type ReadLoop struct {
UDF applier.ReduceApplier
pbqManager *pbq.Manager
windowingStrategy window.Windower
aw *fixed.ActiveWindows
op *orderedForwarder
log *zap.SugaredLogger
toBuffers map[string]isb.BufferWriter
whereToDecider forward.ToWhichStepDecider
publishWatermark map[string]publish.Publisher
UDF applier.ReduceApplier
pbqManager *pbq.Manager
windower window.Windower
op *orderedForwarder
log *zap.SugaredLogger
toBuffers map[string]isb.BufferWriter
whereToDecider forward.ToWhichStepDecider
publishWatermark map[string]publish.Publisher
}

// NewReadLoop initializes and returns ReadLoop.
Expand All @@ -74,10 +72,8 @@ func NewReadLoop(ctx context.Context,
op := newOrderedForwarder(ctx)

rl := &ReadLoop{
UDF: udf,
windowingStrategy: windowingStrategy,
// TODO: pass window type
aw: fixed.NewWindows(),
UDF: udf,
windower: windowingStrategy,
pbqManager: pbqManager,
op: op,
log: logging.FromContext(ctx),
Expand All @@ -103,12 +99,12 @@ func (rl *ReadLoop) Startup(ctx context.Context) {
// so that the window can be closed when the watermark
// crosses the window.
id := p.PartitionID
intervalWindow := &window.IntervalWindow{
intervalWindow := &keyed.KeyedWindow{
Start: id.Start,
End: id.End,
}
// These windows have to be recreated as they are completely in-memory
rl.aw.CreateKeyedWindow(intervalWindow)
rl.windower.CreateWindow(intervalWindow)

// create and invoke process and forward for the partition
rl.associatePBQAndPnF(ctx, p.PartitionID)
Expand Down Expand Up @@ -138,8 +134,8 @@ func (rl *ReadLoop) Process(ctx context.Context, messages []*isb.ReadMessage) {
for _, kw := range windows {
// identify partition for message
partitionID := partition.ID{
Start: kw.IntervalWindow.Start,
End: kw.IntervalWindow.End,
Start: kw.StartTime(),
End: kw.EndTime(),
Key: m.Key,
}

Expand Down Expand Up @@ -182,13 +178,13 @@ func (rl *ReadLoop) Process(ctx context.Context, messages []*isb.ReadMessage) {

// close any windows that need to be closed.
wm := processor.Watermark(m.Watermark)
closedWindows := rl.aw.RemoveWindow(time.Time(wm))
closedWindows := rl.windower.RemoveWindows(time.Time(wm))
rl.log.Debugw("closing windows", zap.Int("length", len(closedWindows)), zap.Time("watermark", time.Time(wm)))

for _, cw := range closedWindows {
partitions := cw.Partitions()
rl.closePartitions(partitions)
rl.log.Debugw("Closing Window", zap.Time("windowStart", cw.Start), zap.Time("windowEnd", cw.End))
rl.log.Debugw("Closing Window", zap.Time("windowStart", cw.StartTime()), zap.Time("windowEnd", cw.EndTime()))
}
}
}
Expand Down Expand Up @@ -233,24 +229,25 @@ func (rl *ReadLoop) ShutDown(ctx context.Context) {

// upsertWindowsAndKeys will create or assigns (if already present) a window to the message. It is an upsert operation
// because windows are created out of order, but they will be closed in-order.
func (rl *ReadLoop) upsertWindowsAndKeys(m *isb.ReadMessage) []*keyed.KeyedWindow {
func (rl *ReadLoop) upsertWindowsAndKeys(m *isb.ReadMessage) []window.AlignedWindow {
// drop the late messages
if m.IsLate {
rl.log.Warnw("Dropping the late message", zap.Time("eventTime", m.EventTime), zap.Time("watermark", m.Watermark))
return []*keyed.KeyedWindow{}
return []window.AlignedWindow{}
}

processingWindows := rl.windowingStrategy.AssignWindow(m.EventTime)
var kWindows []*keyed.KeyedWindow
processingWindows := rl.windower.AssignWindow(m.EventTime)
var kWindows []window.AlignedWindow
for _, win := range processingWindows {
kw := rl.aw.GetKeyedWindow(win)
if kw == nil {
kw = rl.aw.CreateKeyedWindow(win)
rl.log.Debugw("Creating new keyed window", zap.Any("key", kw.Keys), zap.Int64("startTime", kw.Start.UnixMilli()), zap.Int64("endTime", kw.End.UnixMilli()))
w := rl.windower.GetWindow(win)
if w == nil {
w = rl.windower.CreateWindow(win)
}
// track the key to window relationship
kw.AddKey(m.Key)
kWindows = append(kWindows, kw)
w.AddKey(m.Key)
rl.log.Debugw("Creating new keyed window", zap.Any("key", w.Keys()), zap.Int64("startTime", w.StartTime().UnixMilli()), zap.Int64("endTime", w.EndTime().UnixMilli()))

kWindows = append(kWindows, w)
}
return kWindows
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/reduce/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/window"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/forward"
"github.com/numaproj/numaflow/pkg/pbq"
Expand All @@ -33,7 +35,6 @@ import (
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/window"
)

// DataForward reads data from isb and forwards them to readloop
Expand Down
57 changes: 42 additions & 15 deletions pkg/window/keyed/keyed.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,56 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package keyed ....
// TODO: document
// Package keyed implements KeyedWindows. A keyed window associates key(s) with a window.
// A key uniquely identifies a partitioned set of events in a given time window.
package keyed

import (
"sync"
"time"

"github.com/numaproj/numaflow/pkg/pbq/partition"
"github.com/numaproj/numaflow/pkg/window"
)

// KeyedWindow maintains association between keys and a window.
// In a keyed stream, we need to close all the partitions when the watermark is past the window.
type KeyedWindow struct {
*window.IntervalWindow
// TODO: can this be map[string]struct{} ?
Keys map[string]string
// Start start time of the window
Start time.Time
// End end time of the window
End time.Time
// keys map of keys
keys map[string]struct{}
lock sync.RWMutex
}

// NewKeyedWindow creates a new keyed window
func NewKeyedWindow(window *window.IntervalWindow) *KeyedWindow {
func NewKeyedWindow(start time.Time, end time.Time) *KeyedWindow {
kw := &KeyedWindow{
IntervalWindow: window,
Keys: make(map[string]string),
lock: sync.RWMutex{},
Start: start,
End: end,
keys: make(map[string]struct{}),
lock: sync.RWMutex{},
}
return kw
}

// StartTime returns start of the window.
func (kw *KeyedWindow) StartTime() time.Time {
return kw.Start
}

// EndTime returns end of the window.
func (kw *KeyedWindow) EndTime() time.Time {
return kw.End
}

// AddKey adds a key to an existing window
func (kw *KeyedWindow) AddKey(key string) {
kw.lock.Lock()
defer kw.lock.Unlock()
if _, ok := kw.Keys[key]; !ok {
kw.Keys[key] = key
if _, ok := kw.keys[key]; !ok {
kw.keys[key] = struct{}{}
}
}

Expand All @@ -58,12 +72,25 @@ func (kw *KeyedWindow) Partitions() []partition.ID {
kw.lock.RLock()
defer kw.lock.RUnlock()

partitions := make([]partition.ID, len(kw.Keys))
partitions := make([]partition.ID, len(kw.keys))
idx := 0
for k := range kw.Keys {
partitions[idx] = partition.ID{Start: kw.IntervalWindow.Start, End: kw.IntervalWindow.End, Key: k}
for k := range kw.keys {
partitions[idx] = partition.ID{Start: kw.StartTime(), End: kw.EndTime(), Key: k}
idx++
}

return partitions
}

func (kw *KeyedWindow) Keys() []string {
kw.lock.RLock()
defer kw.lock.RUnlock()

keys := make([]string, len(kw.keys))
idx := 0
for k := range kw.keys {
keys[idx] = k
}

return keys
}
35 changes: 13 additions & 22 deletions pkg/window/keyed/keyed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,60 +23,51 @@ import (

"github.com/numaproj/numaflow/pkg/pbq/partition"

"github.com/numaproj/numaflow/pkg/window"
"github.com/stretchr/testify/assert"
)

func TestKeyedWindow_AddKey(t *testing.T) {
iw := &window.IntervalWindow{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
}
kw := NewKeyedWindow(iw)
kw := NewKeyedWindow(time.Unix(60, 0), time.Unix(120, 0))
tests := []struct {
name string
given *KeyedWindow
input string
expectedKeys map[string]string
expectedKeys map[string]struct{}
}{
{
name: "no_keys",
given: &KeyedWindow{},
input: "key1",
expectedKeys: map[string]string{"key1": "key1"},
expectedKeys: map[string]struct{}{"key1": {}},
},
{
name: "with_some_existing_keys",
given: &KeyedWindow{
Keys: map[string]string{"key2": "key2", "key3": "key3"},
keys: map[string]struct{}{"key2": {}, "key3": {}},
},
input: "key4",
expectedKeys: map[string]string{"key2": "key2", "key3": "key3", "key4": "key4"},
expectedKeys: map[string]struct{}{"key2": {}, "key3": {}, "key4": {}},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kw = NewKeyedWindow(iw)
for k := range tt.given.Keys {
kw = NewKeyedWindow(time.Unix(60, 0), time.Unix(120, 0))
for k := range tt.given.keys {
kw.AddKey(k)
}
kw.AddKey(tt.input)
assert.Equal(t, len(tt.expectedKeys), len(kw.Keys))
assert.Equal(t, len(tt.expectedKeys), len(kw.keys))
for k := range tt.expectedKeys {
_, ok := kw.Keys[k]
_, ok := kw.keys[k]
assert.True(t, ok)
}
})
}
}

func TestKeyedWindow_Partitions(t *testing.T) {
iw := &window.IntervalWindow{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
}
kw := NewKeyedWindow(iw)
kw := NewKeyedWindow(time.Unix(60, 0), time.Unix(120, 0))
tests := []struct {
name string
given *KeyedWindow
Expand All @@ -91,7 +82,7 @@ func TestKeyedWindow_Partitions(t *testing.T) {
{
name: "with_some_existing_keys",
given: &KeyedWindow{
Keys: map[string]string{"key2": "key2", "key3": "key3", "key4": "key4"},
keys: map[string]struct{}{"key2": {}, "key3": {}, "key4": {}},
},
expected: []partition.ID{
{
Expand All @@ -115,9 +106,9 @@ func TestKeyedWindow_Partitions(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kw.Keys = tt.given.Keys
kw.keys = tt.given.keys
ret := kw.Partitions()
// the kw.Keys is a map so the order of the output is random
// the kw.keys is a map so the order of the output is random
// use sort to sort the ret array by key
sort.Slice(ret, func(i int, j int) bool {
return ret[i].Key < ret[j].Key
Expand Down
Loading

0 comments on commit 6f5e83a

Please sign in to comment.