From 0fe6e55489aea9fd58ed1760b8fed206a8f842ca Mon Sep 17 00:00:00 2001 From: David Karnok Date: Fri, 23 Mar 2018 11:31:47 +0100 Subject: [PATCH] 2.x: Fix concatMapSingle & concatMapMaybe dispose-cleanup crash (#5928) --- .../mixed/FlowableConcatMapMaybe.java | 2 +- .../mixed/FlowableConcatMapSingle.java | 2 +- .../mixed/ObservableConcatMapMaybe.java | 2 +- .../mixed/ObservableConcatMapSingle.java | 2 +- .../mixed/FlowableConcatMapMaybeTest.java | 26 +++++++++++++++++++ .../mixed/FlowableConcatMapSingleTest.java | 26 +++++++++++++++++++ .../mixed/ObservableConcatMapMaybeTest.java | 26 +++++++++++++++++++ .../mixed/ObservableConcatMapSingleTest.java | 26 +++++++++++++++++++ 8 files changed, 108 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java index 8910e0c10a..0b7eb3bd7d 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java @@ -170,7 +170,7 @@ public void cancel() { cancelled = true; upstream.cancel(); inner.dispose(); - if (getAndIncrement() != 0) { + if (getAndIncrement() == 0) { queue.clear(); item = null; } diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java index 6d0548a733..3164d16a4b 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java @@ -170,7 +170,7 @@ public void cancel() { cancelled = true; upstream.cancel(); inner.dispose(); - if (getAndIncrement() != 0) { + if (getAndIncrement() == 0) { queue.clear(); item = null; } diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java index 24e0b45027..8331c38f23 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java @@ -148,7 +148,7 @@ public void dispose() { cancelled = true; upstream.dispose(); inner.dispose(); - if (getAndIncrement() != 0) { + if (getAndIncrement() == 0) { queue.clear(); item = null; } diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java index 96a3443ad9..6f3f5d9333 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java @@ -148,7 +148,7 @@ public void dispose() { cancelled = true; upstream.dispose(); inner.dispose(); - if (getAndIncrement() != 0) { + if (getAndIncrement() == 0) { queue.clear(); item = null; } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java index 6c393acebb..334b2e4d69 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java @@ -27,7 +27,9 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.mixed.FlowableConcatMapMaybe.ConcatMapMaybeSubscriber; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.internal.util.ErrorMode; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; @@ -368,4 +370,28 @@ public MaybeSource apply(Integer v) assertFalse(pp.hasSubscribers()); } + + @Test(timeout = 10000) + public void cancelNoConcurrentClean() { + TestSubscriber ts = new TestSubscriber(); + ConcatMapMaybeSubscriber operator = + new ConcatMapMaybeSubscriber( + ts, Functions.justFunction(Maybe.never()), 16, ErrorMode.IMMEDIATE); + + operator.onSubscribe(new BooleanSubscription()); + + operator.queue.offer(1); + + operator.getAndIncrement(); + + ts.cancel(); + + assertFalse(operator.queue.isEmpty()); + + operator.addAndGet(-2); + + operator.cancel(); + + assertTrue(operator.queue.isEmpty()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java index 92779f725f..c572e9ba30 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java @@ -26,7 +26,9 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.mixed.FlowableConcatMapSingle.ConcatMapSingleSubscriber; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.internal.util.ErrorMode; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.subjects.SingleSubject; @@ -283,4 +285,28 @@ public SingleSource apply(Integer v) assertFalse(pp.hasSubscribers()); } + + @Test(timeout = 10000) + public void cancelNoConcurrentClean() { + TestSubscriber ts = new TestSubscriber(); + ConcatMapSingleSubscriber operator = + new ConcatMapSingleSubscriber( + ts, Functions.justFunction(Single.never()), 16, ErrorMode.IMMEDIATE); + + operator.onSubscribe(new BooleanSubscription()); + + operator.queue.offer(1); + + operator.getAndIncrement(); + + ts.cancel(); + + assertFalse(operator.queue.isEmpty()); + + operator.addAndGet(-2); + + operator.cancel(); + + assertTrue(operator.queue.isEmpty()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java index c22c38db64..f374ea23a2 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java @@ -26,6 +26,8 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.mixed.ObservableConcatMapMaybe.ConcatMapMaybeMainObserver; +import io.reactivex.internal.util.ErrorMode; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; @@ -373,4 +375,28 @@ public void scalarEmptySource() { assertFalse(ms.hasObservers()); } + + @Test(timeout = 10000) + public void cancelNoConcurrentClean() { + TestObserver to = new TestObserver(); + ConcatMapMaybeMainObserver operator = + new ConcatMapMaybeMainObserver( + to, Functions.justFunction(Maybe.never()), 16, ErrorMode.IMMEDIATE); + + operator.onSubscribe(Disposables.empty()); + + operator.queue.offer(1); + + operator.getAndIncrement(); + + to.dispose(); + + assertFalse(operator.queue.isEmpty()); + + operator.addAndGet(-2); + + operator.dispose(); + + assertTrue(operator.queue.isEmpty()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java index 420cef2171..be216649bb 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java @@ -25,6 +25,8 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.mixed.ObservableConcatMapSingle.ConcatMapSingleMainObserver; +import io.reactivex.internal.util.ErrorMode; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.*; @@ -310,4 +312,28 @@ public void scalarEmptySource() { assertFalse(ss.hasObservers()); } + + @Test(timeout = 10000) + public void cancelNoConcurrentClean() { + TestObserver to = new TestObserver(); + ConcatMapSingleMainObserver operator = + new ConcatMapSingleMainObserver( + to, Functions.justFunction(Single.never()), 16, ErrorMode.IMMEDIATE); + + operator.onSubscribe(Disposables.empty()); + + operator.queue.offer(1); + + operator.getAndIncrement(); + + to.cancel(); + + assertFalse(operator.queue.isEmpty()); + + operator.addAndGet(-2); + + operator.dispose(); + + assertTrue(operator.queue.isEmpty()); + } }