Skip to content

Commit

Permalink
[ISSUE alibaba#1857] Refactor the duplicated code
Browse files Browse the repository at this point in the history
  • Loading branch information
Rookiexu authored Mar 16, 2020
1 parent 3974677 commit 1558e05
Showing 1 changed file with 16 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1367,16 +1367,7 @@ public void onException(Throwable e) {
}
}, timeout - cost);

Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
Expand Down Expand Up @@ -1432,16 +1423,7 @@ public void onException(Throwable e) {
}
}, timeout - cost);

Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
Expand Down Expand Up @@ -1498,21 +1480,25 @@ public void onException(Throwable e) {
}
}, null, timeout - cost);

Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}

private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
}

public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
Expand Down

0 comments on commit 1558e05

Please sign in to comment.