Skip to content
This repository has been archived by the owner on Feb 24, 2022. It is now read-only.

fix the concurrent problem of handlers in ResponsePromise #188

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 65 additions & 13 deletions src/main/java/mousio/client/promises/ResponsePromise.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* A Response promise
Expand All @@ -44,6 +46,7 @@ public class ResponsePromise<T> {
private List<IsSimplePromiseResponseHandler<T>> handlers;
private final GenericFutureListener<Promise<T>> promiseHandler;
private final ConnectionFailHandler connectionFailHandler;
private final Lock lock;

/**
* Constructor
Expand All @@ -56,6 +59,8 @@ public ResponsePromise(RetryPolicy retryPolicy, ConnectionState connectionState,
this.connectionState = connectionState;
this.retryHandler = retryHandler;
this.retryPolicy = retryPolicy;
// add a lock to deal with concurrent operations for handlers
this.lock = new ReentrantLock();

promiseHandler = new GenericFutureListener<Promise<T>>() {
@Override
Expand Down Expand Up @@ -95,13 +100,31 @@ public void attachNettyPromise(Promise<T> promise) {
* @param listener to add
*/
public void addListener(IsSimplePromiseResponseHandler<T> listener) {
if (handlers == null) {
handlers = new LinkedList<>();

if (listener == null) {
return;
}

handlers.add(listener);
boolean shouldInvoke = false;

try {
lock.lock();

if (handlers == null) {
handlers = new LinkedList<>();
}

handlers.add(listener);

if (response != null || exception != null) {
shouldInvoke = true;
}

} finally {
lock.unlock();
}

if (response != null || exception != null) {
if (shouldInvoke) {
listener.onResponse(this);
}
}
Expand All @@ -112,8 +135,15 @@ public void addListener(IsSimplePromiseResponseHandler<T> listener) {
* @param listener to remove
*/
public void removeListener(IsSimplePromiseResponseHandler<T> listener) {
if (handlers != null) {
handlers.remove(listener);
try {
lock.lock();

if (handlers != null) {
handlers.remove(listener);
}

} finally {
lock.unlock();
}
}

Expand All @@ -123,14 +153,25 @@ public void removeListener(IsSimplePromiseResponseHandler<T> listener) {
* @param promise to handle
*/
protected void handlePromise(Promise<T> promise) {

if (!promise.isSuccess()) {
this.setException(promise.cause());
} else {
this.response = promise.getNow();
if (handlers != null) {
for (IsSimplePromiseResponseHandler<T> h : handlers) {
h.onResponse(this);

List<IsSimplePromiseResponseHandler<T>> copy = null;

try {
lock.lock();
this.response = promise.getNow();
if (handlers != null) {
copy = new LinkedList<>(handlers);
}
} finally {
lock.unlock();
}

for (IsSimplePromiseResponseHandler<T> h : copy) {
h.onResponse(this);
}
}
}
Expand All @@ -141,10 +182,21 @@ protected void handlePromise(Promise<T> promise) {
* @param exception to set.
*/
public void setException(Throwable exception) {
this.exception = exception;

if (handlers != null) {
for (IsSimplePromiseResponseHandler<T> h : handlers) {
List<IsSimplePromiseResponseHandler<T>> copy = null;

try {
lock.lock();
this.exception = exception;
if (handlers != null) {
copy = new LinkedList<>(handlers);
}
} finally {
lock.unlock();
}

if (copy != null) {
for (IsSimplePromiseResponseHandler<T> h : copy) {
h.onResponse(this);
}
}
Expand Down