Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed May 10, 2018
2 parents 0ecd7d3 + 97f5c10 commit 4de5e2d
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ void terminated(RefConnection rc) {
synchronized (this) {
if (!rc.terminated) {
rc.terminated = true;
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
connection = null;
}
}
Expand Down Expand Up @@ -200,18 +203,18 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
actual.onError(t);
if (compareAndSet(false, true)) {
parent.terminated(connection);
}
actual.onError(t);
}

@Override
public void onComplete() {
actual.onComplete();
if (compareAndSet(false, true)) {
parent.terminated(connection);
}
actual.onComplete();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package hu.akarnokd.rxjava2.operators;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.lang.management.ManagementFactory;
import java.util.concurrent.*;

import org.junit.*;
import org.reactivestreams.Subscription;

import hu.akarnokd.rxjava2.test.TestHelper;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
Expand Down Expand Up @@ -205,4 +209,67 @@ public void run() {
.assertResult(1, 2, 3, 4, 5);
}
}

Flowable<Object> source;

@Test
public void replayNoLeak() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
}
})
.replay(1)
.compose(FlowableTransformers.<Object>refCount(1));

source.subscribe();

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

@Test
public void replayNoLeak2() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
}
}).concatWith(Flowable.never())
.replay(1)
.compose(FlowableTransformers.<Object>refCount(1));

Disposable s1 = source.subscribe();
Disposable s2 = source.subscribe();

s1.dispose();
s2.dispose();

s1 = null;
s2 = null;

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}
}

0 comments on commit 4de5e2d

Please sign in to comment.