Skip to content

Commit

Permalink
2.x: Make CompositeExcpetion thread-safe like 1.x and also fix some i…
Browse files Browse the repository at this point in the history
…ssues. (#4619)

Right now CompositeExcpetion has several issues:

- `CompositeException(Throwable... exceptions)` doesn't deduplicate exceptions and flatten CompositeExceptions like `CompositeException(Iterable<? extends Throwable> errors)`
- If using `CompositeException(Iterable<? extends Throwable> errors)` to create CompositeException, `suppress` cannot be used.
- `suppress` doesn't update `cause`.
- `suppress` doesn't deduplicate exceptions and flatten CompositeExceptions.
- `suppress` and `Throwable.addSuppressed` are pretty confusing for Java 7+ users. Without looking at the implementation, it's hard to figure out the differences.

This PR made the following changes:

- Remove `CompositeException.suppress` so that it's easy to make CompositeException thread-safe.
  - This may cause some performance lost in some path rarely happening, e.g., an excpetion is thrown from `onError`, but that's not a big deal.
  - Since `suppress` is removed, it doesn't make sense to create an empty CompositeException, so `isEmpty` is removed and defense codes are added.
- Defense codes for bad exceptions.
- Deduplicate excepctions and flatten CompositeExceptions for `CompositeException(Throwable... exceptions)`.
  • Loading branch information
zsxwing authored and akarnokd committed Sep 28, 2016
1 parent 534fc67 commit c3a1d91
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 134 deletions.
67 changes: 18 additions & 49 deletions src/main/java/io/reactivex/exceptions/CompositeException.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,24 @@ public final class CompositeException extends RuntimeException {
private final String message;
private Throwable cause;

/**
* Constructs an empty CompositeException.
*/
public CompositeException() {
this.exceptions = new ArrayList<Throwable>();
this.message = null;
}

/**
* Constructs a CompositeException with the given array of Throwables as the
* list of suppressed exceptions.
* @param exceptions the Throwables to have as initially suppressed exceptions
*
* @throws IllegalArgumentException if <code>exceptions</code> is empty.
*/
public CompositeException(Throwable... exceptions) {
this.exceptions = new ArrayList<Throwable>();
if (exceptions == null) {
this.message = "1 exception occurred. ";
this.exceptions.add(new NullPointerException("exceptions is null"));
} else {
this.message = exceptions.length + " exceptions occurred. ";
for (Throwable t : exceptions) {
this.exceptions.add(t != null ? t : new NullPointerException("One of the exceptions is null"));
}
}
this(exceptions == null ?
Arrays.asList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions));
}


/**
* Constructs a CompositeException with the given array of Throwables as the
* list of suppressed exceptions.
* @param errors the Throwables to have as initially suppressed exceptions
*
* @throws IllegalArgumentException if <code>errors</code> is empty.
*/
public CompositeException(Iterable<? extends Throwable> errors) {
Set<Throwable> deDupedExceptions = new LinkedHashSet<Throwable>();
Expand All @@ -83,13 +70,15 @@ public CompositeException(Iterable<? extends Throwable> errors) {
if (ex != null) {
deDupedExceptions.add(ex);
} else {
deDupedExceptions.add(new NullPointerException());
deDupedExceptions.add(new NullPointerException("Throwable was null!"));
}
}
} else {
deDupedExceptions.add(new NullPointerException());
deDupedExceptions.add(new NullPointerException("errors was null"));
}
if (deDupedExceptions.isEmpty()) {
throw new IllegalArgumentException("errors is empty");
}

localExceptions.addAll(deDupedExceptions);
this.exceptions = Collections.unmodifiableList(localExceptions);
this.message = exceptions.size() + " exceptions occurred. ";
Expand All @@ -109,17 +98,6 @@ public String getMessage() {
return message;
}

/**
* Adds a suppressed exception to this composite.
* <p>The method is named this way to avoid conflicts with Java 7 environments
* and its addSuppressed() method.
* @param e the exception to suppress, nulls are converted to NullPointerExceptions
*/
public void suppress(Throwable e) {
exceptions.add(e != null ? e : new NullPointerException("null exception"));
}


@Override
public synchronized Throwable getCause() { // NOPMD
if (cause == null) {
Expand Down Expand Up @@ -266,15 +244,16 @@ public String getMessage() {
private List<Throwable> getListOfCauses(Throwable ex) {
List<Throwable> list = new ArrayList<Throwable>();
Throwable root = ex.getCause();
if (root == null) {
if (root == null || root == ex) {
return list;
} else {
while (true) {
list.add(root);
if (root.getCause() == null) {
Throwable cause = root.getCause();
if (cause == null || cause == root) {
return list;
} else {
root = root.getCause();
root = cause;
}
}
}
Expand All @@ -288,16 +267,6 @@ public int size() {
return exceptions.size();
}

/**
* Returns true if this CompositeException doesn't have a cause or
* any suppressed exceptions.
* @return true if this CompositeException doesn't have a cause or
* any suppressed exceptions.
*/
public boolean isEmpty() {
return exceptions.isEmpty();
}

/**
* Returns the root cause of {@code e}. If {@code e.getCause()} returns {@code null} or {@code e}, just return {@code e} itself.
*
Expand All @@ -306,15 +275,15 @@ public boolean isEmpty() {
*/
private Throwable getRootCause(Throwable e) {
Throwable root = e.getCause();
if (root == null /* || cause == root */) { // case might not be possible
if (root == null || cause == root) {
return e;
}
while (true) {
Throwable cause = root.getCause();
if (cause == null /* || cause == root */) { // case might not be possible
if (cause == null || cause == root) {
return root;
}
root = root.getCause();
root = cause;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package io.reactivex.internal.operators.flowable;

import io.reactivex.plugins.RxJavaPlugins;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;

Expand Down Expand Up @@ -550,39 +553,42 @@ boolean checkTerminate() {
}

void reportError(SimpleQueue<Throwable> q) {
CompositeException composite = null;
List<Throwable> composite = null;
Throwable ex = null;

Throwable t;
int count = 0;
for (;;) {
Throwable t;
try {
t = q.poll();
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
if (composite == null) {
composite = new CompositeException(ex);
if (ex == null) {
ex = exc;
} else {
if (composite == null) {
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.add(exc);
}
composite.suppress(exc);
break;
}

if (t == null) {
break;
}
if (count == 0) {
if (ex == null) {
ex = t;
} else {
if (composite == null) {
composite = new CompositeException(ex);
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.suppress(t);
composite.add(t);
}

count++;
}
if (composite != null) {
actual.onError(composite);
actual.onError(new CompositeException(composite));
} else {
actual.onError(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,8 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
void onError(Throwable e) {
for (;;) {
Throwable curr = error.get();
if (curr instanceof CompositeException) {
CompositeException ce = new CompositeException((CompositeException)curr);
ce.suppress(e);
if (curr != null) {
CompositeException ce = new CompositeException(curr, e);
e = ce;
}
Throwable next = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,39 +486,42 @@ boolean checkTerminate() {
}

void reportError(SimpleQueue<Throwable> q) {
CompositeException composite = null;
List<Throwable> composite = null;
Throwable ex = null;

Throwable t;
int count = 0;
for (;;) {
Throwable t;
try {
t = q.poll();
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
if (composite == null) {
composite = new CompositeException(exc);
if (ex == null) {
ex = exc;
} else {
if (composite == null) {
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.add(exc);
}
composite.suppress(exc);
break;
}

if (t == null) {
break;
}
if (count == 0) {
if (ex == null) {
ex = t;
} else {
if (composite == null) {
composite = new CompositeException(ex);
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.suppress(t);
composite.add(t);
}

count++;
}
if (composite != null) {
actual.onError(composite);
actual.onError(new CompositeException(composite));
} else {
actual.onError(ex);
}
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/reactivex/observers/BaseTestConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,8 @@ protected final AssertionError fail(String message) {
;

AssertionError ae = new AssertionError(b.toString());
CompositeException ce = new CompositeException();
for (Throwable e : errors) {
ce.suppress(e);
}
if (!ce.isEmpty()) {
if (!errors.isEmpty()) {
CompositeException ce = new CompositeException(errors);
ae.initCause(ce);
}
return ae;
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/io/reactivex/observers/SafeObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,22 @@ public void onError(Throwable t) {
done = true;

if (s == null) {
CompositeException t2 = new CompositeException(t, new NullPointerException("Subscription not set!"));
Throwable npe = new NullPointerException("Subscription not set!");

try {
actual.onSubscribe(EmptyDisposable.INSTANCE);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because the actual's state may be corrupt at this point
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
return;
}
try {
actual.onError(t2);
actual.onError(new CompositeException(t, npe));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if onError failed, all that's left is to report the error to plugins
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
}
return;
}
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/io/reactivex/subscribers/SafeSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,26 +130,22 @@ public void onError(Throwable t) {
done = true;

if (s == null) {
CompositeException t2 = new CompositeException(t, new NullPointerException("Subscription not set!"));
Throwable npe = new NullPointerException("Subscription not set!");

try {
actual.onSubscribe(EmptySubscription.INSTANCE);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because the actual's state may be corrupt at this point
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
return;
}
try {
actual.onError(t2);
actual.onError(new CompositeException(t, npe));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if onError failed, all that's left is to report the error to plugins
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
}
return;
}
Expand Down
Loading

0 comments on commit c3a1d91

Please sign in to comment.