From e6b45db0b8a24184b3143db74afee9fbb6b74126 Mon Sep 17 00:00:00 2001 From: Juanlu Yu <19543684+chromevoid@users.noreply.github.com> Date: Mon, 27 Mar 2023 18:16:58 -0700 Subject: [PATCH] Revert "fix: IdleWatermark unit test (#640)" This reverts commit 0c79113c53619e30bc51b7687f9cdc313f752679. --- pkg/forward/forward_test.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/forward/forward_test.go b/pkg/forward/forward_test.go index 75faed4b43..bad829c86f 100644 --- a/pkg/forward/forward_test.go +++ b/pkg/forward/forward_test.go @@ -247,17 +247,31 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) { assert.True(t, to1.IsEmpty()) stopped := f.Start() - assert.True(t, fromStep.IsEmpty()) // first batch: read message size is 1 with one ctrl message // the ctrl message should be acked // no message published to the next vertex // so the timeline should be empty + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for fromStep.IsEmpty() { + select { + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + err = ctx.Err() + } + default: + time.Sleep(1 * time.Millisecond) + } + } + }() _, errs := fromStep.Write(ctx, ctrlMessage) assert.Equal(t, make([]error, 1), errs) + wg.Wait() if err != nil { t.Fatal("expected the buffer not to be empty", err) } - // waiting for the ctrl message to be acked for !fromStep.IsEmpty() { select { case <-ctx.Done(): @@ -268,16 +282,11 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) { time.Sleep(1 * time.Millisecond) } } - // it should not publish any wm because - // 1. readLength != 0 - // 2. we only have one ctrl message - // 3. meaning dataMessage=0 otNil, _ := otStores["to1"].GetAllKeys(ctx) assert.Nil(t, otNil) // 2nd and 3rd batches: read message size is 0 // should send idle watermark - var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done()