From 06cae331f6a9c093917ce3977c4ab5bab043969e Mon Sep 17 00:00:00 2001 From: David Weitzman Date: Mon, 4 Feb 2019 15:20:34 -0800 Subject: [PATCH] Eliminate flakiness for sync2.Batcher's unit test"> Signed-off-by: David Weitzman --- go/sync2/batcher.go | 16 +++++- go/sync2/batcher_flaky_test.go | 69 ---------------------- go/sync2/batcher_test.go | 101 +++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 70 deletions(-) delete mode 100644 go/sync2/batcher_flaky_test.go create mode 100644 go/sync2/batcher_test.go diff --git a/go/sync2/batcher.go b/go/sync2/batcher.go index 161531d927e..8e9e14434df 100644 --- a/go/sync2/batcher.go +++ b/go/sync2/batcher.go @@ -32,6 +32,7 @@ type Batcher struct { queue chan int waiters AtomicInt32 nextID AtomicInt32 + after func(time.Duration) <-chan time.Time } // NewBatcher returns a new Batcher @@ -41,6 +42,19 @@ func NewBatcher(interval time.Duration) *Batcher { queue: make(chan int), waiters: NewAtomicInt32(0), nextID: NewAtomicInt32(0), + after: time.After, + } +} + +// newBatcherForTest returns a Batcher for testing where time.After can +// be replaced by a fake alternative. +func newBatcherForTest(interval time.Duration, after func(time.Duration) <-chan time.Time) *Batcher { + return &Batcher{ + interval: interval, + queue: make(chan int), + waiters: NewAtomicInt32(0), + nextID: NewAtomicInt32(0), + after: after, } } @@ -56,7 +70,7 @@ func (b *Batcher) Wait() int { // newBatch starts a new batch func (b *Batcher) newBatch() { go func() { - time.Sleep(b.interval) + <-b.after(b.interval) id := b.nextID.Add(1) diff --git a/go/sync2/batcher_flaky_test.go b/go/sync2/batcher_flaky_test.go deleted file mode 100644 index 5fe89989c54..00000000000 --- a/go/sync2/batcher_flaky_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreedto in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sync2 - -import ( - "testing" - "time" -) - -func expectBatch(testcase string, b *Batcher, want int, t *testing.T) { - id := b.Wait() - if id != want { - t.Errorf("%s: got %d, want %d", testcase, id, want) - } -} - -func TestBatcher(t *testing.T) { - interval := time.Duration(50 * time.Millisecond) - b := NewBatcher(interval) - - // test single waiter - go expectBatch("single waiter", b, 1, t) - time.Sleep(interval * 2) - - // multiple waiters all at once - go expectBatch("concurrent waiter", b, 2, t) - go expectBatch("concurrent waiter", b, 2, t) - go expectBatch("concurrent waiter", b, 2, t) - time.Sleep(interval * 2) - - // stagger the waiters out in time but cross two intervals - go expectBatch("staggered waiter", b, 3, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter", b, 3, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter", b, 3, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter", b, 3, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter", b, 3, t) - time.Sleep(interval / 5) - - go expectBatch("staggered waiter 2", b, 4, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter 2", b, 4, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter 2", b, 4, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter 2", b, 4, t) - time.Sleep(interval / 5) - go expectBatch("staggered waiter 2", b, 4, t) - time.Sleep(interval / 5) - - time.Sleep(interval * 2) -} diff --git a/go/sync2/batcher_test.go b/go/sync2/batcher_test.go new file mode 100644 index 00000000000..6e90c338507 --- /dev/null +++ b/go/sync2/batcher_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync2 + +import ( + "testing" + "time" +) + +// makeAfterFnWithLatch returns a fake alternative to time.After that blocks until +// the release function is called. The fake doesn't support having multiple concurrent +// calls to the After function, which is ok because Batcher should never do that. +func makeAfterFnWithLatch(t *testing.T) (func(time.Duration) <-chan time.Time, func()) { + latch := make(chan time.Time, 1) + afterFn := func(d time.Duration) <-chan time.Time { + return latch + } + + releaseFn := func() { + select { + case latch <- time.Now(): + default: + t.Errorf("Previous batch still hasn't been released") + } + } + return afterFn, releaseFn +} + +func TestBatcher(t *testing.T) { + interval := time.Duration(50 * time.Millisecond) + + afterFn, releaseBatch := makeAfterFnWithLatch(t) + b := newBatcherForTest(interval, afterFn) + + waitersFinished := NewAtomicInt32(0) + + startWaiter := func(testcase string, want int) { + go func() { + id := b.Wait() + if id != want { + t.Errorf("%s: got %d, want %d", testcase, id, want) + } + waitersFinished.Add(1) + }() + } + + awaitVal := func(name string, val *AtomicInt32, expected int32) { + for count := 0; val.Get() != expected; count++ { + time.Sleep(50 * time.Millisecond) + if count > 5 { + t.Errorf("Timed out waiting for %s to be %v", name, expected) + return + } + } + } + + awaitBatch := func(name string, n int32) { + // Wait for all the waiters to register + awaitVal("Batcher.waiters for "+name, &b.waiters, n) + // Release the batch and wait for the batcher to catch up. + if waitersFinished.Get() != 0 { + t.Errorf("Waiters finished before being released") + } + releaseBatch() + awaitVal("Batcher.waiters for "+name, &b.waiters, 0) + // Make sure the waiters actually run so they can verify their batch number. + awaitVal("waitersFinshed for "+name, &waitersFinished, n) + waitersFinished.Set(0) + } + + // test single waiter + startWaiter("single waiter", 1) + awaitBatch("single waiter", 1) + + // multiple waiters all at once + startWaiter("concurrent waiter", 2) + startWaiter("concurrent waiter", 2) + startWaiter("concurrent waiter", 2) + awaitBatch("concurrent waiter", 3) + + startWaiter("more waiters", 3) + startWaiter("more waiters", 3) + startWaiter("more waiters", 3) + startWaiter("more waiters", 3) + startWaiter("more waiters", 3) + awaitBatch("more waiters", 5) +}