Skip to content

Commit

Permalink
fix: IdleWatermark unit test (#640)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 authored Mar 28, 2023
1 parent 924ad33 commit 0c79113
Showing 1 changed file with 7 additions and 16 deletions.
23 changes: 7 additions & 16 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,31 +247,17 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) {
assert.True(t, to1.IsEmpty())

stopped := f.Start()
assert.True(t, fromStep.IsEmpty())
// first batch: read message size is 1 with one ctrl message
// the ctrl message should be acked
// no message published to the next vertex
// so the timeline should be empty
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for fromStep.IsEmpty() {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
err = ctx.Err()
}
default:
time.Sleep(1 * time.Millisecond)
}
}
}()
_, errs := fromStep.Write(ctx, ctrlMessage)
assert.Equal(t, make([]error, 1), errs)
wg.Wait()
if err != nil {
t.Fatal("expected the buffer not to be empty", err)
}
// waiting for the ctrl message to be acked
for !fromStep.IsEmpty() {
select {
case <-ctx.Done():
Expand All @@ -282,11 +268,16 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
}
// it should not publish any wm because
// 1. readLength != 0
// 2. we only have one ctrl message
// 3. meaning dataMessage=0
otNil, _ := otStores["to1"].GetAllKeys(ctx)
assert.Nil(t, otNil)

// 2nd and 3rd batches: read message size is 0
// should send idle watermark
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Expand Down

0 comments on commit 0c79113

Please sign in to comment.