From 0c79113c53619e30bc51b7687f9cdc313f752679 Mon Sep 17 00:00:00 2001 From: Juanlu Yu <19543684+chromevoid@users.noreply.github.com> Date: Mon, 27 Mar 2023 18:14:54 -0700 Subject: [PATCH] fix: IdleWatermark unit test (#640) Signed-off-by: jyu6 --- pkg/forward/forward_test.go | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/pkg/forward/forward_test.go b/pkg/forward/forward_test.go index bad829c86f..75faed4b43 100644 --- a/pkg/forward/forward_test.go +++ b/pkg/forward/forward_test.go @@ -247,31 +247,17 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) { assert.True(t, to1.IsEmpty()) stopped := f.Start() + assert.True(t, fromStep.IsEmpty()) // first batch: read message size is 1 with one ctrl message // the ctrl message should be acked // no message published to the next vertex // so the timeline should be empty - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for fromStep.IsEmpty() { - select { - case <-ctx.Done(): - if ctx.Err() == context.DeadlineExceeded { - err = ctx.Err() - } - default: - time.Sleep(1 * time.Millisecond) - } - } - }() _, errs := fromStep.Write(ctx, ctrlMessage) assert.Equal(t, make([]error, 1), errs) - wg.Wait() if err != nil { t.Fatal("expected the buffer not to be empty", err) } + // waiting for the ctrl message to be acked for !fromStep.IsEmpty() { select { case <-ctx.Done(): @@ -282,11 +268,16 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) { time.Sleep(1 * time.Millisecond) } } + // it should not publish any wm because + // 1. readLength != 0 + // 2. we only have one ctrl message + // 3. meaning dataMessage=0 otNil, _ := otStores["to1"].GetAllKeys(ctx) assert.Nil(t, otNil) // 2nd and 3rd batches: read message size is 0 // should send idle watermark + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done()