feat: watermark - remove non-share OT bucket option (#302)
Signed-off-by: jyu6 <[email protected]>
jy4096 authored Nov 2, 2022
1 parent cc44875 commit 0548d4d
Showing 11 changed files with 27 additions and 415 deletions.
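In short, this commit removes the WithSeparateOTBuckets option from the watermark fetch package (and, per the call sites below, the matching processor.WithSeparateOTBuckets entity option), leaving a single shared offset-timeline (OT) bucket per edge as the only supported mode. The call-site change is mechanical, as in this condensed before/after sketch distilled from the diffs below; ctx and watcher stand in for the context and the store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) value the tests construct:

// Before: callers had to opt out of separate OT buckets explicitly.
pm := fetch.NewProcessorManager(ctx, watcher,
	fetch.WithPodHeartbeatRate(1),
	fetch.WithRefreshingProcessorsRate(1),
	fetch.WithSeparateOTBuckets(false)) // option removed by this commit

// After: the shared OT bucket is implicit; only the rate options remain.
pm = fetch.NewProcessorManager(ctx, watcher,
	fetch.WithPodHeartbeatRate(1),
	fetch.WithRefreshingProcessorsRate(1))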
10 changes: 5 additions & 5 deletions pkg/reduce/reduce_test.go
@@ -219,12 +219,12 @@ func TestDataForward_StartWithInMemoryWMStore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

- //create from buffers for tests
+ // create from buffers for tests
fromBuffer1 := simplebuffer.NewInMemoryBuffer("from1", fromBufferSize)
fromBuffer2 := simplebuffer.NewInMemoryBuffer("from2", fromBufferSize)
fromBuffer3 := simplebuffer.NewInMemoryBuffer("from3", fromBufferSize)

- //create to buffers for tests
+ // create to buffers for tests
buffer1 := simplebuffer.NewInMemoryBuffer("to", toBufferSize)
buffer2 := simplebuffer.NewInMemoryBuffer("to", toBufferSize)
buffer3 := simplebuffer.NewInMemoryBuffer("to", toBufferSize)
@@ -263,12 +263,12 @@ func TestDataForward_StartWithInMemoryWMStore(t *testing.T) {
go writeMessages(ctx, 100, "test-2", fromBuffer2, p2["from2"], time.Minute*1)
go writeMessages(ctx, 1000, "test-3", fromBuffer3, p3["from3"], time.Minute*10)

- //create window for tests
+ // create window for tests
window1 := fixed.NewFixed(2 * time.Second)
window2 := fixed.NewFixed(2 * time.Minute)
window3 := fixed.NewFixed(20 * time.Minute)

- //create forwarder for tests
+ // create forwarder for tests
var reduceDataForwarder1, reduceDataForwarder2, reduceDataForwarder3 *DataForward
reduceDataForwarder1, err = NewDataForward(ctx, CounterReduceTest{}, fromBuffer1, toBuffer1, pbqManager1, CounterReduceTest{}, f1, p1, window1, WithReadBatchSize(10))
assert.NoError(t, err)
@@ -385,7 +385,7 @@ func fetcherAndPublisher(ctx context.Context, toBuffers map[string]isb.BufferWri
hbWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_PROCESSORS", hbWatcherCh)
otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh)

- var pm = fetch.NewProcessorManager(ctx, wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), fetch.WithPodHeartbeatRate(1), fetch.WithRefreshingProcessorsRate(1), fetch.WithSeparateOTBuckets(false))
+ var pm = fetch.NewProcessorManager(ctx, wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), fetch.WithPodHeartbeatRate(1), fetch.WithRefreshingProcessorsRate(1))
var f = fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), pm)
return f, publishers
}
8 changes: 0 additions & 8 deletions pkg/watermark/fetch/options.go
@@ -5,7 +5,6 @@ type processorManagerOptions struct {
podHeartbeatRate int64
// refreshingProcessorsRate uses second as time unit
refreshingProcessorsRate int64
- separateOTBucket bool
}

// ProcessorManagerOption set options for FromVertex.
@@ -24,10 +23,3 @@ func WithRefreshingProcessorsRate(rate int64) ProcessorManagerOption {
opts.refreshingProcessorsRate = rate
}
}
-
- // WithSeparateOTBuckets creates a different bucket for maintaining each processor offset-timeline.
- func WithSeparateOTBuckets(separate bool) ProcessorManagerOption {
- return func(opts *processorManagerOptions) {
- opts.separateOTBucket = separate
- }
- }
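options.go follows Go's functional options pattern, and after this commit only the two rate options survive. A minimal, self-contained sketch of the pattern as it now stands (mirroring the struct above and the NewProcessorManager defaults in the next file; the newOptions helper name is illustrative, not the verbatim numaflow source):

package fetch

// processorManagerOptions keeps the remaining tunables; both rates use seconds.
type processorManagerOptions struct {
	podHeartbeatRate         int64
	refreshingProcessorsRate int64
}

// ProcessorManagerOption mutates the option set before use.
type ProcessorManagerOption func(*processorManagerOptions)

// WithPodHeartbeatRate overrides the default heartbeat rate.
func WithPodHeartbeatRate(rate int64) ProcessorManagerOption {
	return func(opts *processorManagerOptions) { opts.podHeartbeatRate = rate }
}

// newOptions folds caller-supplied options over the defaults, the same shape
// NewProcessorManager uses in the diff below.
func newOptions(inputOpts ...ProcessorManagerOption) *processorManagerOptions {
	opts := &processorManagerOptions{
		podHeartbeatRate:         5,
		refreshingProcessorsRate: 5,
	}
	for _, opt := range inputOpts {
		opt(opts)
	}
	return opts
}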
3 changes: 1 addition & 2 deletions pkg/watermark/fetch/processor_manager.go
@@ -38,7 +38,6 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm
opts := &processorManagerOptions{
podHeartbeatRate: 5,
refreshingProcessorsRate: 5,
- separateOTBucket: false,
}
for _, opt := range inputOpts {
opt(opts)
@@ -166,7 +165,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() {
// A fromProcessor needs to be added to v.processors
// The fromProcessor may have been deleted
// TODO: make capacity configurable
- var entity = processor.NewProcessorEntity(value.Key(), processor.WithSeparateOTBuckets(v.opts.separateOTBucket))
+ var entity = processor.NewProcessorEntity(value.Key())
var fromProcessor = NewProcessorToFetch(v.ctx, entity, 10, v.otWatcher)
v.addProcessor(value.Key(), fromProcessor)
v.log.Infow("v.AddProcessor successfully added a new fromProcessor", zap.String("fromProcessor", value.Key()))
2 changes: 1 addition & 1 deletion pkg/watermark/fetch/processor_manager_inmem_test.go
@@ -52,7 +52,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.NoError(t, err)
otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh)
assert.NoError(t, err)
- var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(false))
+ var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1))
var testBuffer = NewEdgeFetcher(ctx, "testBuffer", testVertex).(*edgeFetcher)

// start p1 heartbeat for 3 loops
226 changes: 1 addition & 225 deletions pkg/watermark/fetch/processor_manager_test.go
@@ -97,7 +97,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
assert.NoError(t, err)
otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
assert.NoError(t, err)
- var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(false))
+ var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1))
var testBuffer = NewEdgeFetcher(ctx, "testBuffer", testVertex).(*edgeFetcher)

wg.Add(1)
@@ -271,227 +271,3 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
wg.Wait()
cancel()
}
-
- func TestFetcherWithSeparateOTBucket(t *testing.T) {
- // FIXME: Check for separate buckets implementation for single watcher or multiple watcher
- // Maybe we should not support separate OT because controller does not support it
- t.Skip()
- // uncomment to debug
- // os.Setenv("NUMAFLOW_DEBUG", "true")
-
- var ctx = context.Background()
-
- // Connect to NATS
- nc, err := jsclient.NewDefaultJetStreamClient(nats.DefaultURL).Connect(context.TODO())
- assert.Nil(t, err)
-
- // Create JetStream Context
- js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
- assert.Nil(t, err)
-
- // create heartbeat bucket
- var keyspace = "fetcherTest"
-
- heartbeatBucket, err := js.CreateKeyValue(&nats.KeyValueConfig{
- Bucket: keyspace + "_PROCESSORS",
- Description: fmt.Sprintf("[%s] heartbeat bucket", keyspace),
- MaxValueSize: 0,
- History: 0,
- TTL: 0,
- MaxBytes: 0,
- Storage: nats.MemoryStorage,
- Replicas: 0,
- Placement: nil,
- })
- defer func() { _ = js.DeleteKeyValue(keyspace + "_PROCESSORS") }()
- assert.Nil(t, err)
-
- var epoch int64 = 1651161600000
- var testOffset int64 = 100
- p1OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{
- Bucket: keyspace + "_OT_" + "p1",
- Description: "",
- MaxValueSize: 0,
- History: 0,
- TTL: 0,
- MaxBytes: 0,
- Storage: nats.MemoryStorage,
- Replicas: 0,
- Placement: nil,
- })
- defer func() { _ = js.DeleteKeyValue(keyspace + "_OT_" + "p1") }()
- b := make([]byte, 8)
- binary.LittleEndian.PutUint64(b, uint64(testOffset))
- _, err = p1OT.Put(fmt.Sprintf("%d", epoch), b)
- assert.NoError(t, err)
-
- epoch += 60000
- binary.LittleEndian.PutUint64(b, uint64(testOffset+5))
- p2OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{
- Bucket: keyspace + "_OT_" + "p2",
- Description: "",
- MaxValueSize: 0,
- History: 0,
- TTL: 0,
- MaxBytes: 0,
- Storage: nats.MemoryStorage,
- Replicas: 0,
- Placement: nil,
- })
- defer func() { _ = js.DeleteKeyValue(keyspace + "_OT_" + "p2") }()
- _, err = p2OT.Put(fmt.Sprintf("%d", epoch), b)
- assert.NoError(t, err)
-
- defaultJetStreamClient := jsclient.NewDefaultJetStreamClient(nats.DefaultURL)
-
- hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
- assert.NoError(t, err)
- otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
- assert.NoError(t, err)
- var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(true))
- var testBuffer = NewEdgeFetcher(ctx, "testBuffer", testVertex).(*edgeFetcher)
-
- // var location, _ = time.LoadLocation("UTC")
- go func() {
- var err error
- for i := 0; i < 3; i++ {
- _, err = heartbeatBucket.Put("p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
- assert.NoError(t, err)
- time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
- }
- err = heartbeatBucket.Delete("p1")
- assert.NoError(t, err)
- }()
-
- go func() {
- for i := 0; i < 100; i++ {
- _, err := heartbeatBucket.Put("p2", []byte(fmt.Sprintf("%d", time.Now().Unix())))
- assert.NoError(t, err)
- time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
- }
- }()
-
- ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
- defer cancel()
-
- allProcessors := testBuffer.processorManager.GetAllProcessors()
- for len(allProcessors) != 2 {
- select {
- case <-ctx.Done():
- t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
- default:
- time.Sleep(1 * time.Millisecond)
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- }
- }
-
- assert.True(t, allProcessors["p1"].IsActive())
- assert.True(t, allProcessors["p2"].IsActive())
-
- // "p1" is deleted after 5 loops
- for !allProcessors["p1"].IsDeleted() {
- select {
- case <-ctx.Done():
- t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
- default:
- time.Sleep(1 * time.Millisecond)
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- }
- }
-
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- assert.Equal(t, 2, len(allProcessors))
- assert.True(t, allProcessors["p1"].IsDeleted())
- assert.True(t, allProcessors["p2"].IsActive())
- _ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }))
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- assert.Equal(t, 2, len(allProcessors))
- assert.True(t, allProcessors["p1"].IsDeleted())
- assert.True(t, allProcessors["p2"].IsActive())
- // "p1" should be deleted after this GetWatermark offset=103
- // because "p1" offsetTimeline's head offset=100, which is < inputOffset 103
- _ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }))
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- assert.Equal(t, 1, len(allProcessors))
- assert.True(t, allProcessors["p2"].IsActive())
-
- time.Sleep(time.Second)
-
- // resume after one second
- go func() {
- var err error
- for i := 0; i < 5; i++ {
- _, err = heartbeatBucket.Put("p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
- assert.NoError(t, err)
- time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
- }
- }()
-
- // wait until p1 becomes active
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- for len(allProcessors) != 2 {
- select {
- case <-ctx.Done():
- t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
- default:
- time.Sleep(1 * time.Millisecond)
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- }
- }
-
- assert.True(t, allProcessors["p1"].IsActive())
- assert.True(t, allProcessors["p2"].IsActive())
-
- // "p1" has been deleted from vertex.Processors
- // so "p1" will be considered as a new processor and a new offsetTimeline watcher for "p1" will be created
- _ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }))
- newP1 := testBuffer.processorManager.GetProcessor("p1")
- assert.NotNil(t, newP1)
- assert.True(t, newP1.IsActive())
- assert.NotNil(t, newP1.offsetTimeline)
- // because the bucket hasn't been cleaned up, the new watcher will read all the history data to create this new offsetTimeline
- assert.Equal(t, int64(100), newP1.offsetTimeline.GetHeadOffset())
-
- // publish a new watermark 101
- binary.LittleEndian.PutUint64(b, uint64(testOffset+1))
- _, err = p1OT.Put(fmt.Sprintf("%d", epoch), b)
- assert.NoError(t, err)
-
- // "p1" becomes inactive after 5 loops
- for !allProcessors["p1"].IsInactive() {
- select {
- case <-ctx.Done():
- t.Fatalf("expected p1 to be inactive: %s", ctx.Err())
- default:
- time.Sleep(1 * time.Millisecond)
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- }
- }
-
- time.Sleep(time.Second)
-
- // resume after one second
- go func() {
- var err error
- for i := 0; i < 5; i++ {
- _, err = heartbeatBucket.Put("p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
- assert.NoError(t, err)
- time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
- }
- }()
-
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- for len(allProcessors) != 2 {
- select {
- case <-ctx.Done():
- t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
- default:
- time.Sleep(1 * time.Millisecond)
- allProcessors = testBuffer.processorManager.GetAllProcessors()
- }
- }
-
- // added 101 in the previous steps for newP1, so the head should be 101 after resume
- assert.Equal(t, int64(101), newP1.offsetTimeline.GetHeadOffset())
-
- }
2 changes: 1 addition & 1 deletion pkg/watermark/fetch/processor_to_fetch.go
@@ -123,7 +123,7 @@ func (p *ProcessorToFetch) startTimeLineWatcher() {
p.log.Errorw("Unable to convert value.PartitionID() to int64", zap.String("received", value.Key()), zap.Error(err))
continue
}
- // if skip is set to true, it means the key update we received is for a different processor (sharing of bucket)
+ // if skip is set to true, it means the key update we received is for a different processor
if skip {
continue
}
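The skip flag above matters because, with one shared OT bucket per edge, a single watcher receives key updates for every processor, and each ProcessorToFetch must ignore updates addressed to its peers. The sketch below illustrates that filtering; the processorName_epoch key layout and the parseOTKey helper name are illustrative assumptions, not necessarily the exact numaflow encoding:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseOTKey decides whether a shared-bucket key update belongs to this
// processor. Assumed key layout: "<processorName>_<epochMillis>". A true
// skip mirrors the "if skip { continue }" in startTimeLineWatcher above.
func parseOTKey(key, me string) (epoch int64, skip bool, err error) {
	idx := strings.LastIndex(key, "_")
	if idx < 0 {
		return 0, false, fmt.Errorf("malformed OT key %q", key)
	}
	if key[:idx] != me {
		return 0, true, nil // another processor's timeline entry
	}
	epoch, err = strconv.ParseInt(key[idx+1:], 10, 64)
	return epoch, false, err
}

func main() {
	for _, key := range []string{"p1_1651161600000", "p2_1651161660000"} {
		epoch, skip, err := parseOTKey(key, "p1")
		fmt.Printf("key=%s epoch=%d skip=%v err=%v\n", key, epoch, skip, err)
	}
}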