diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index 204c09e1d3..f955dc524a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -496,6 +496,8 @@ void drainLoop() { if (isHolder) { ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o; if (producerIndex == consumerIndexHolder.index) { + w.onComplete(); + w = UnicastProcessor.create(bufferSize); window = w; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java index fb4e219b5d..5edc60688e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java @@ -439,6 +439,8 @@ void drainLoop() { if (isHolder) { ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o; if (producerIndex == consumerIndexHolder.index) { + w.onComplete(); + w = UnicastSubject.create(bufferSize); window = w; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java index 270e4e13e6..3e3e1e3dc6 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java @@ -686,4 +686,22 @@ public Flowable apply(Flowable v) throws Exception { .awaitDone(1, TimeUnit.SECONDS) .assertResult(1, 2); } + + @Test + public void sizeTimeTimeout() { + TestScheduler scheduler = new TestScheduler(); + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 100) + .test() + .assertValueCount(1); + + scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS); + + ts.assertValueCount(2) + .assertNoErrors() + .assertNotComplete(); + + ts.values().get(0).test().assertResult(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java index f00a85f807..5a6b326951 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java @@ -585,4 +585,22 @@ public ObservableSource apply(Observable v) throws Exception { .awaitDone(1, TimeUnit.SECONDS) .assertResult(1, 2); } + + @Test + public void sizeTimeTimeout() { + TestScheduler scheduler = new TestScheduler(); + Subject ps = PublishSubject.create(); + + TestObserver> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 100) + .test() + .assertValueCount(1); + + scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS); + + ts.assertValueCount(2) + .assertNoErrors() + .assertNotComplete(); + + ts.values().get(0).test().assertResult(); + } }