Skip to content

Commit

Permalink
[ISSUE #apache#6805] Add peek API support
Browse files Browse the repository at this point in the history
  • Loading branch information
HScarb committed Aug 9, 2023
1 parent aa75ca0 commit d52fca2
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PeekMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
Expand Down Expand Up @@ -849,6 +850,38 @@ public PullResult pullMessage(
return null;
}

public void peekMessageAsync(
final String brokerName, final String addr, final PeekMessageRequestHeader requestHeader,
final long timeoutMillis, final PopCallback peekCallback
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingTooMuchRequestException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PEEK_MESSAGE, requestHeader);
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) {
@Override
public void onComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PopResult
popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader);
assert popResult != null;
peekCallback.onSuccess(popResult);
} catch (Exception e) {
peekCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
peekCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
peekCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
peekCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}

public void popMessageAsync(
final String brokerName, final String addr, final PopMessageRequestHeader requestHeader,
final long timeoutMillis, final PopCallback popCallback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PeekMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
Expand Down Expand Up @@ -257,6 +258,31 @@ public CompletableFuture<RemotingCommand> sendMessageBackAsync(
return future;
}

public CompletableFuture<PopResult> peekMessageAsync(
String brokerAddr,
String brokerName,
PeekMessageRequestHeader requestHeader,
long timeoutMillis
) {
CompletableFuture<PopResult> future = new CompletableFuture<>();
try {
this.peekMessageAsync(brokerName, brokerAddr, requestHeader, timeoutMillis, new PopCallback() {
@Override
public void onSuccess(PopResult popResult) {
future.complete(popResult);
}

@Override
public void onException(Throwable t) {
future.completeExceptionally(t);
}
});
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
}

public CompletableFuture<PopResult> popMessageAsync(
String brokerAddr,
String brokerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PeekMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
Expand Down Expand Up @@ -429,6 +430,68 @@ public RemotingCommand answer(InvocationOnMock mock) {
assertThat(assignments).size().isEqualTo(1);
}

@Test
public void testPeekMessageAsync_Success() throws Exception {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());

PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
responseHeader.setReviveQid(0);
responseHeader.setRestNum(1);
StringBuilder startOffsetInfo = new StringBuilder(64);
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 0L);
responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
StringBuilder msgOffsetInfo = new StringBuilder(64);
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, Collections.singletonList(0L));
responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
response.setRemark("FOUND");
response.makeCustomHeaderToNet();

MessageExt message = new MessageExt();
message.setQueueId(0);
message.setFlag(12);
message.setQueueOffset(0L);
message.setCommitLogOffset(100L);
message.setSysFlag(0);
message.setBornTimestamp(System.currentTimeMillis());
message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
message.setStoreTimestamp(System.currentTimeMillis());
message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
message.setBody("body".getBytes());
message.setTopic(topic);
message.putUserProperty("key", "value");
response.setBody(MessageDecoder.encode(message, false));
responseFuture.setResponseCommand(response);
callback.operationComplete(responseFuture);
return null;
}
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
final CountDownLatch done = new CountDownLatch(1);
mqClientAPI.peekMessageAsync(brokerName, brokerAddr, new PeekMessageRequestHeader(), 10 * 1000, new PopCallback() {
@Override
public void onSuccess(PopResult popResult) {
assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
assertThat(popResult.getRestNum()).isEqualTo(1);
assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
done.countDown();
}

@Override
public void onException(Throwable e) {
Assertions.fail("want no exception but got one", e);
done.countDown();
}
});
done.await();
}

@Test
public void testPopMessageAsync_Success() throws Exception {
final long popTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PeekMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader;
Expand Down Expand Up @@ -196,6 +197,18 @@ public void testSendMessageBackAsync() throws Exception {
assertEquals(ResponseCode.SUCCESS, remotingCommand.getCode());
}

@Test
public void testPeekMessageAsync() throws Exception {
PopResult popResult = new PopResult(PopStatus.POLLING_NOT_FOUND, null);
doAnswer((Answer<Void>) mock -> {
PopCallback popCallback = mock.getArgument(4);
popCallback.onSuccess(popResult);
return null;
}).when(mqClientAPI).peekMessageAsync(anyString(), anyString(), any(), anyLong(), any());

assertSame(popResult, mqClientAPI.peekMessageAsync(BROKER_ADDR, BROKER_NAME, new PeekMessageRequestHeader(), TIMEOUT).get());
}

@Test
public void testPopMessageAsync() throws Exception {
PopResult popResult = new PopResult(PopStatus.POLLING_NOT_FOUND, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PeekMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.test.clientinterface.MQConsumer;
import org.apache.rocketmq.test.util.RandomUtil;
Expand Down Expand Up @@ -67,6 +68,32 @@ public void shutdown() {
this.mqClientAPI.shutdown();
}

public CompletableFuture<PopResult> peekMessageAsync(String brokerAddr, MessageQueue mq, int maxNums,
String consumerGroup, long timeout) {
PeekMessageRequestHeader requestHeader = new PeekMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setMaxMsgNums(maxNums);
CompletableFuture<PopResult> future = new CompletableFuture<>();
try {
this.mqClientAPI.peekMessageAsync(mq.getBrokerName(), brokerAddr, requestHeader, timeout, new PopCallback() {
@Override
public void onSuccess(PopResult popResult) {
future.complete(popResult);
}

@Override
public void onException(Throwable e) {
future.completeExceptionally(e);
}
});
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
}

public CompletableFuture<PopResult> popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime,
int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order,
String expressionType, String expression) {
Expand Down

0 comments on commit d52fca2

Please sign in to comment.