From 48a13b402dcb461211f7cea48663e9d84c468611 Mon Sep 17 00:00:00 2001 From: Quan Date: Mon, 5 Jun 2023 11:05:36 +0800 Subject: [PATCH] new feature: pop batch ack implementation broker-side --- .../rocketmq/broker/BrokerController.java | 3 + .../broker/processor/AckMessageProcessor.java | 288 +++++++++++------- .../processor/PopBufferMergeService.java | 21 +- .../processor/PopInflightMessageCounter.java | 22 +- .../processor/AckMessageProcessorTest.java | 252 ++++++++++++++- .../PopInflightMessageCounterTest.java | 13 +- .../BitSetSerializerDeserializer.java | 52 ++++ .../remoting/protocol/RequestCode.java | 1 + .../remoting/protocol/body/BatchAck.java | 131 ++++++++ .../body/BatchAckMessageRequestBody.java | 43 +++ .../protocol/header/ExtraInfoUtil.java | 13 +- .../remoting/protocol/body/BatchAckTest.java | 112 +++++++ 12 files changed, 809 insertions(+), 142 deletions(-) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 8560bde9faa..f3a832e8c5f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1003,6 +1003,9 @@ public void registerProcessor() { */ this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); + + this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); /** * ChangeInvisibleTimeProcessor */ diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index fa1c0793e42..2140aa881cd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -36,18 +36,22 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.BatchAck; +import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody; import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.pop.AckMsg; +import org.apache.rocketmq.store.pop.BatchAckMsg; public class AckMessageProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; - private String reviveTopic; - private PopReviveService[] popReviveServices; + private final String reviveTopic; + private final PopReviveService[] popReviveServices; public AckMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; @@ -93,7 +97,7 @@ public boolean isPopReviveServiceRunning() { @Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { return this.processRequest(ctx.channel(), request, true); } @@ -103,135 +107,209 @@ public boolean rejectRequest() { } private RemotingCommand processRequest(final Channel channel, RemotingCommand request, - boolean brokerAllowSuspend) throws RemotingCommandException { - final AckMessageRequestHeader requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class); - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - AckMsg ackMsg = new AckMsg(); - RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null); + boolean brokerAllowSuspend) throws RemotingCommandException { + AckMessageRequestHeader requestHeader; + BatchAckMessageRequestBody reqBody = null; + final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null); response.setOpaque(request.getOpaque()); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); - if (null == topicConfig) { - POP_LOGGER.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); - response.setCode(ResponseCode.TOPIC_NOT_EXIST); - response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); - return response; - } + if (request.getCode() == RequestCode.ACK_MESSAGE) { + requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class); - if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() || requestHeader.getQueueId() < 0) { - String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", - requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); - POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark(errorInfo); - return response; - } - long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); - long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); - if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() > maxOffset) { - String errorInfo = String.format("offset is illegal, key:%s@%d, commit:%d, store:%d~%d", - requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getOffset(), minOffset, maxOffset); - POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.NO_MESSAGE); - response.setRemark(errorInfo); - return response; - } - String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo()); - - ackMsg.setAckOffset(requestHeader.getOffset()); - ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo)); - ackMsg.setConsumerGroup(requestHeader.getConsumerGroup()); - ackMsg.setTopic(requestHeader.getTopic()); - ackMsg.setQueueId(requestHeader.getQueueId()); - ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo)); - ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo)); - - int rqId = ExtraInfoUtil.getReviveQid(extraInfo); - long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); - - this.brokerController.getBrokerStatsManager().incBrokerAckNums(1); - this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); - - if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { - // order - String lockKey = requestHeader.getTopic() + PopAckConstants.SPLIT - + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + requestHeader.getQueueId(); - long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), requestHeader.getQueueId()); - if (requestHeader.getOffset() < oldOffset) { + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + if (null == topicConfig) { + POP_LOGGER.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); return response; } - while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { + + if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() || requestHeader.getQueueId() < 0) { + String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", + requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); + POP_LOGGER.warn(errorInfo); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark(errorInfo); + return response; } - try { - oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), requestHeader.getQueueId()); - if (requestHeader.getOffset() < oldOffset) { - return response; + + long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); + long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); + if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() > maxOffset) { + String errorInfo = String.format("offset is illegal, key:%s@%d, commit:%d, store:%d~%d", + requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getOffset(), minOffset, maxOffset); + POP_LOGGER.warn(errorInfo); + response.setCode(ResponseCode.NO_MESSAGE); + response.setRemark(errorInfo); + return response; + } + + appendAck(requestHeader, null, response, channel, null); + } else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) { + if (request.getBody() != null) { + reqBody = BatchAckMessageRequestBody.decode(request.getBody(), BatchAckMessageRequestBody.class); + } + if (reqBody == null || reqBody.getAcks() == null || reqBody.getAcks().isEmpty()) { + response.setCode(ResponseCode.NO_MESSAGE); + return response; + } + for (BatchAck bAck : reqBody.getAcks()) { + appendAck(null, bAck, response, channel, reqBody.getBrokerName()); + } + } else { + POP_LOGGER.error("AckMessageProcessor failed to process RequestCode: {}, consumer: {} ", request.getCode(), RemotingHelper.parseChannelRemoteAddr(channel)); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark(String.format("AckMessageProcessor failed to process RequestCode: %d", request.getCode())); + return response; + } + return response; + } + + private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck, final RemotingCommand response, final Channel channel, String brokerName) { + String[] extraInfo; + String consumeGroup, topic; + int qId, rqId; + long startOffset, ackOffset; + long popTime, invisibleTime; + AckMsg ackMsg; + int ackCount = 0; + if (batchAck == null) { + // single ack + extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo()); + brokerName = ExtraInfoUtil.getBrokerName(extraInfo); + consumeGroup = requestHeader.getConsumerGroup(); + topic = requestHeader.getTopic(); + qId = requestHeader.getQueueId(); + rqId = ExtraInfoUtil.getReviveQid(extraInfo); + startOffset = ExtraInfoUtil.getCkQueueOffset(extraInfo); + ackOffset = requestHeader.getOffset(); + popTime = ExtraInfoUtil.getPopTime(extraInfo); + invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); + + if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { + // order + String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId; + long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; + } + while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { } - long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( - requestHeader.getTopic(), requestHeader.getConsumerGroup(), - requestHeader.getQueueId(), requestHeader.getOffset(), - ExtraInfoUtil.getPopTime(extraInfo)); - if (nextOffset > -1) { - if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId())) { - this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), - requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset); + try { + oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; } - if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), - requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) { - this.brokerController.getPopMessageProcessor().notifyMessageArriving( - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId()); + long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( + topic, consumeGroup, + qId, ackOffset, + popTime); + if (nextOffset > -1) { + if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( + topic, consumeGroup, qId)) { + this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), + consumeGroup, topic, qId, nextOffset); + } + if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, + consumeGroup, qId, invisibleTime)) { + this.brokerController.getPopMessageProcessor().notifyMessageArriving( + topic, consumeGroup, qId); + } + } else if (nextOffset == -1) { + String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", + lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); + POP_LOGGER.warn(errorInfo); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark(errorInfo); + return; } - } else if (nextOffset == -1) { - String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", - lockKey, oldOffset, requestHeader.getOffset(), nextOffset, channel.remoteAddress()); - POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark(errorInfo); - return response; + } finally { + this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); } - } finally { - this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); + brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); + return; } - decInFlightMessageNum(requestHeader); - return response; + + ackMsg = new AckMsg(); + ackCount = 1; + } else { + // batch ack + consumeGroup = batchAck.getConsumerGroup(); + topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), ExtraInfoUtil.RETRY_TOPIC.equals(batchAck.getRetry())); + qId = batchAck.getQueueId(); + rqId = batchAck.getReviveQueueId(); + startOffset = batchAck.getStartOffset(); + ackOffset = -1; + popTime = batchAck.getPopTime(); + invisibleTime = batchAck.getInvisibleTime(); + + long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId); + long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, qId); + if (minOffset == -1 || maxOffset == -1) { + POP_LOGGER.error("Illegal topic or queue found when batch ack {}", batchAck); + return; + } + + BatchAckMsg batchAckMsg = new BatchAckMsg(); + for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) { + if (!batchAck.getBitSet().get(i)) { + continue; + } + long offset = startOffset + i; + if (offset < minOffset || offset > maxOffset) { + continue; + } + batchAckMsg.getAckOffsetList().add(offset); + } + if (batchAckMsg.getAckOffsetList().isEmpty()) { + return; + } + + ackMsg = batchAckMsg; + ackCount = batchAckMsg.getAckOffsetList().size(); } + this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount); + this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, topic, ackCount); + + ackMsg.setConsumerGroup(consumeGroup); + ackMsg.setTopic(topic); + ackMsg.setQueueId(qId); + ackMsg.setStartOffset(startOffset); + ackMsg.setAckOffset(ackOffset); + ackMsg.setPopTime(popTime); + ackMsg.setBrokerName(brokerName); + if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) { - decInFlightMessageNum(requestHeader); - return response; + brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); + return; } + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(reviveTopic); msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset)); - //msgInner.setQueueId(Integer.valueOf(extraInfo[3])); msgInner.setQueueId(rqId); - msgInner.setTags(PopAckConstants.ACK_TAG); + if (ackMsg instanceof BatchAckMsg) { + msgInner.setTags(PopAckConstants.BATCH_ACK_TAG); + msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genBatchAckUniqueId((BatchAckMsg) ackMsg)); + } else { + msgInner.setTags(PopAckConstants.ACK_TAG); + msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg)); + } msgInner.setBornTimestamp(System.currentTimeMillis()); msgInner.setBornHost(this.brokerController.getStoreHost()); msgInner.setStoreHost(this.brokerController.getStoreHost()); - msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + invisibleTime); + msgInner.setDeliverTimeMs(popTime + invisibleTime); msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK - && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT - && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT - && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { + && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT + && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT + && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { POP_LOGGER.error("put ack msg error:" + putMessageResult); } + System.out.printf("put ack to store %s", ackMsg); PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); - decInFlightMessageNum(requestHeader); - return response; + brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); } - - private void decInFlightMessageNum(AckMessageRequestHeader requestHeader) { - this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum( - requestHeader.getTopic(), - requestHeader.getConsumerGroup(), - requestHeader.getExtraInfo() - ); - } - } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index c5889f5562e..d7bc7c6946a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -538,12 +538,23 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) { return false; } - int indexOfAck = point.indexOfAck(ackMsg.getAckOffset()); - if (indexOfAck > -1) { - markBitCAS(pointWrapper.getBits(), indexOfAck); + if (ackMsg instanceof BatchAckMsg) { + for (Long ackOffset : ((BatchAckMsg) ackMsg).getAckOffsetList()) { + int indexOfAck = point.indexOfAck(ackOffset); + if (indexOfAck > -1) { + markBitCAS(pointWrapper.getBits(), indexOfAck); + } else { + POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point); + } + } } else { - POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point); - return true; + int indexOfAck = point.indexOfAck(ackMsg.getAckOffset()); + if (indexOfAck > -1) { + markBitCAS(pointWrapper.getBits(), indexOfAck); + } else { + POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point); + return true; + } } if (brokerController.getBrokerConfig().isEnablePopLog()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java index 584cc54ba82..6749af3d750 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java @@ -16,18 +16,18 @@ */ package org.apache.rocketmq.broker.processor; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; import org.apache.rocketmq.store.pop.PopCheckPoint; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + public class PopInflightMessageCounter { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -61,26 +61,24 @@ public void incrementInFlightMessageNum(String topic, String group, int queueId, }); } - public void decrementInFlightMessageNum(String topic, String group, String ckInfo) { - String[] ckInfoList = ExtraInfoUtil.split(ckInfo); - long popTime = ExtraInfoUtil.getPopTime(ckInfoList); + public void decrementInFlightMessageNum(String topic, String group, long popTime, int qId, int delta) { if (popTime < this.brokerController.getShouldStartTime()) { return; } - decrementInFlightMessageNum(topic, group, ExtraInfoUtil.getQueueId(ckInfoList)); + decrementInFlightMessageNum(topic, group, qId, delta); } public void decrementInFlightMessageNum(PopCheckPoint checkPoint) { if (checkPoint.getPopTime() < this.brokerController.getShouldStartTime()) { return; } - decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId()); + decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId(), 1); } - public void decrementInFlightMessageNum(String topic, String group, int queueId) { + private void decrementInFlightMessageNum(String topic, String group, int queueId, int delta) { topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key, queueNum) -> { queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> { - if (counter.decrementAndGet() <= 0) { + if (counter.addAndGet(-delta) <= 0) { return null; } return counter; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java index 6719df08f52..c0afb46c330 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java @@ -18,12 +18,12 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.lang.reflect.Field; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.failover.EscapeBridge; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExtBrokerInner; @@ -36,6 +36,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.BatchAck; +import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody; import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; @@ -53,15 +55,25 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; + import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class AckMessageProcessorTest { private AckMessageProcessor ackMessageProcessor; + @Mock + private PopMessageProcessor popMessageProcessor; @Spy private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); @Mock @@ -77,6 +89,9 @@ public class AckMessageProcessorTest { @Mock private Broker2Client broker2Client; + private static final long MIN_OFFSET_IN_QUEUE = 100; + private static final long MAX_OFFSET_IN_QUEUE = 999; + @Before public void init() throws IllegalAccessException, NoSuchFieldException { clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0); @@ -91,19 +106,27 @@ public void init() throws IllegalAccessException, NoSuchFieldException { brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); ConsumerData consumerData = createConsumerData(group, topic); brokerController.getConsumerManager().registerConsumer( - consumerData.getGroupName(), - clientInfo, - consumerData.getConsumeType(), - consumerData.getMessageModel(), - consumerData.getConsumeFromWhere(), - consumerData.getSubscriptionDataSet(), - false); + consumerData.getGroupName(), + clientInfo, + consumerData.getConsumeType(), + consumerData.getMessageModel(), + consumerData.getConsumeFromWhere(), + consumerData.getSubscriptionDataSet(), + false); ackMessageProcessor = new AckMessageProcessor(brokerController); + + when(messageStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(MIN_OFFSET_IN_QUEUE); + when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(MAX_OFFSET_IN_QUEUE); + + when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor); } @Test public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException { when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + PopBufferMergeService popBufferMergeService = mock(PopBufferMergeService.class); + when(popBufferMergeService.addAk(anyInt(), any())).thenReturn(false); + when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService); int queueId = 0; long queueOffset = 0; @@ -112,11 +135,11 @@ public void testProcessRequest_Success() throws RemotingCommandException, Interr int reviveQid = 0; String brokerName = "test_broker"; String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, invisibleTime, reviveQid, - topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + queueOffset; + topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + queueOffset; AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(0); - requestHeader.setOffset(0L); + requestHeader.setOffset(MIN_OFFSET_IN_QUEUE + 1); requestHeader.setConsumerGroup(group); requestHeader.setExtraInfo(extraInfo); @@ -126,4 +149,213 @@ public void testProcessRequest_Success() throws RemotingCommandException, Interr assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque()); } + + @Test + public void testProcessRequest_WrongRequestCode() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, null); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL); + assertThat(response.getRemark()).isEqualTo("AckMessageProcessor failed to process RequestCode: " + RequestCode.SEND_MESSAGE); + } + + @Test + public void testSingleAck_TopicCheck() throws RemotingCommandException { + AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); + requestHeader.setTopic("wrongTopic"); + requestHeader.setQueueId(0); + requestHeader.setOffset(0L); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST); + assertThat(response.getRemark()).contains("not exist, apply first"); + } + + @Test + public void testSingleAck_QueueCheck() throws RemotingCommandException { + { + int qId = -1; + AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(qId); + requestHeader.setOffset(0L); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL); + assertThat(response.getRemark()).contains("queueId[" + qId + "] is illegal"); + } + + { + int qId = 17; + AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(qId); + requestHeader.setOffset(0L); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL); + assertThat(response.getRemark()).contains("queueId[" + qId + "] is illegal"); + } + } + + @Test + public void testSingleAck_OffsetCheck() throws RemotingCommandException { + { + AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(0); + requestHeader.setOffset(MIN_OFFSET_IN_QUEUE - 1); + //requestHeader.setOffset(maxOffsetInQueue + 1); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); + assertThat(response.getRemark()).contains("offset is illegal"); + } + + { + AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(0); + //requestHeader.setOffset(minOffsetInQueue - 1); + requestHeader.setOffset(MAX_OFFSET_IN_QUEUE + 1); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); + assertThat(response.getRemark()).contains("offset is illegal"); + } + } + + @Test + public void testBatchAck_NoMessage() throws RemotingCommandException { + { + //reqBody == null + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); + } + + { + //reqBody.getAcks() == null + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + BatchAckMessageRequestBody reqBody = new BatchAckMessageRequestBody(); + request.setBody(reqBody.encode()); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); + } + + { + //reqBody.getAcks().isEmpty() + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + BatchAckMessageRequestBody reqBody = new BatchAckMessageRequestBody(); + reqBody.setAcks(new ArrayList<>()); + request.setBody(reqBody.encode()); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); + } + } + + @Test + public void testSingleAck_appendAck() throws RemotingCommandException { + { + // buffer addAk OK + PopBufferMergeService popBufferMergeService = mock(PopBufferMergeService.class); + when(popBufferMergeService.addAk(anyInt(), any())).thenReturn(true); + when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService); + + AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); + long ackOffset = MIN_OFFSET_IN_QUEUE + 10; + requestHeader.setTopic(topic); + requestHeader.setQueueId(0); + requestHeader.setOffset(ackOffset); + requestHeader.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP); + requestHeader.setExtraInfo("64 1666860736757 60000 4 0 broker-a 0 " + ackOffset); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + { + // buffer addAk fail + PopBufferMergeService popBufferMergeService = mock(PopBufferMergeService.class); + when(popBufferMergeService.addAk(anyInt(), any())).thenReturn(false); + when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService); + // store putMessage OK + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, null); + when(messageStore.putMessage(any())).thenReturn(putMessageResult); + + AckMessageRequestHeader requestHeader = new AckMessageRequestHeader(); + long ackOffset = MIN_OFFSET_IN_QUEUE + 10; + requestHeader.setTopic(topic); + requestHeader.setQueueId(0); + requestHeader.setOffset(ackOffset); + requestHeader.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP); + requestHeader.setExtraInfo("64 1666860736757 60000 4 0 broker-a 0 " + ackOffset); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + } + + @Test + public void testBatchAck_appendAck() throws RemotingCommandException { + { + // buffer addAk OK + PopBufferMergeService popBufferMergeService = mock(PopBufferMergeService.class); + when(popBufferMergeService.addAk(anyInt(), any())).thenReturn(true); + when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService); + + BatchAck bAck1 = new BatchAck(); + bAck1.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP); + bAck1.setTopic(topic); + bAck1.setStartOffset(MIN_OFFSET_IN_QUEUE); + bAck1.setBitSet(new BitSet()); + bAck1.getBitSet().set(1); + bAck1.setRetry("0"); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + BatchAckMessageRequestBody reqBody = new BatchAckMessageRequestBody(); + reqBody.setAcks(Collections.singletonList(bAck1)); + request.setBody(reqBody.encode()); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + { + // buffer addAk fail + PopBufferMergeService popBufferMergeService = mock(PopBufferMergeService.class); + when(popBufferMergeService.addAk(anyInt(), any())).thenReturn(false); + when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService); + // store putMessage OK + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, null); + when(messageStore.putMessage(any())).thenReturn(putMessageResult); + + BatchAck bAck1 = new BatchAck(); + bAck1.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP); + bAck1.setTopic(topic); + bAck1.setStartOffset(MIN_OFFSET_IN_QUEUE); + bAck1.setBitSet(new BitSet()); + bAck1.getBitSet().set(1); + bAck1.setRetry("0"); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + BatchAckMessageRequestBody reqBody = new BatchAckMessageRequestBody(); + reqBody.setAcks(Arrays.asList(bAck1)); + request.setBody(reqBody.encode()); + request.makeCustomHeaderToNet(); + RemotingCommand response = ackMessageProcessor.processRequest(handlerContext, request); + + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + } + } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java index 4e83ac74908..dea59fc99e6 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.broker.processor; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; import org.apache.rocketmq.store.pop.PopCheckPoint; import org.junit.Test; @@ -42,12 +41,10 @@ public void testNum() { counter.incrementInFlightMessageNum(topic, group, 0, 3); assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0)); - counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(), - 0, 0, topic, "broker", 0)); + counter.decrementInFlightMessageNum(topic, group, System.currentTimeMillis(), 0, 1); assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 0)); - counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis() - 1000, - 0, 0, topic, "broker", 0)); + counter.decrementInFlightMessageNum(topic, group, System.currentTimeMillis() - 1000, 0, 1); assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 0)); PopCheckPoint popCheckPoint = new PopCheckPoint(); @@ -59,12 +56,10 @@ public void testNum() { counter.decrementInFlightMessageNum(popCheckPoint); assertEquals(1, counter.getGroupPopInFlightMessageNum(topic, group, 0)); - counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(), - 0, 0, topic, "broker", 0)); + counter.decrementInFlightMessageNum(topic, group, System.currentTimeMillis(), 0 ,1); assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0)); - counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(), - 0, 0, topic, "broker", 0)); + counter.decrementInFlightMessageNum(topic, group, System.currentTimeMillis(), 0, 1); assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0)); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java new file mode 100644 index 00000000000..8f53c0250bf --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol; + +import com.alibaba.fastjson.parser.DefaultJSONParser; +import com.alibaba.fastjson.parser.JSONToken; +import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer; +import com.alibaba.fastjson.serializer.JSONSerializer; +import com.alibaba.fastjson.serializer.ObjectSerializer; +import com.alibaba.fastjson.serializer.SerializeWriter; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.BitSet; + +public class BitSetSerializerDeserializer implements ObjectSerializer, ObjectDeserializer { + + @Override + public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features) throws IOException { + SerializeWriter out = serializer.out; + out.writeByteArray(((BitSet) object).toByteArray()); + } + + @SuppressWarnings("unchecked") + @Override + public T deserialze(DefaultJSONParser parser, Type type, Object fieldName) { + byte[] bytes = parser.parseObject(byte[].class); + if (bytes != null) { + return (T) BitSet.valueOf(bytes); + } + return null; + } + + @Override + public int getFastMatchToken() { + return JSONToken.LITERAL_STRING; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index 9f9a64ed0e7..2f2e1d1cc57 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -88,6 +88,7 @@ public class RequestCode { public static final int POP_MESSAGE = 200050; public static final int ACK_MESSAGE = 200051; + public static final int BATCH_ACK_MESSAGE = 200151; public static final int PEEK_MESSAGE = 200052; public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053; public static final int NOTIFICATION = 200054; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java new file mode 100644 index 00000000000..82dcd8567ea --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.body; + +import com.alibaba.fastjson.annotation.JSONField; +import org.apache.rocketmq.remoting.protocol.BitSetSerializerDeserializer; + +import java.io.Serializable; +import java.util.BitSet; + +public class BatchAck implements Serializable { + @JSONField(name = "c", alternateNames = {"consumerGroup"}) + private String consumerGroup; + @JSONField(name = "t", alternateNames = {"topic"}) + private String topic; + @JSONField(name = "r", alternateNames = {"retry"}) + private String retry; // "1" if is retry topic + @JSONField(name = "so", alternateNames = {"startOffset"}) + private long startOffset; + @JSONField(name = "q", alternateNames = {"queueId"}) + private int queueId; + @JSONField(name = "rq", alternateNames = {"reviveQueueId"}) + private int reviveQueueId; + @JSONField(name = "pt", alternateNames = {"popTime"}) + private long popTime; + @JSONField(name = "it", alternateNames = {"invisibleTime"}) + private long invisibleTime; + @JSONField(name = "b", alternateNames = {"bitSet"}, serializeUsing = BitSetSerializerDeserializer.class, deserializeUsing = BitSetSerializerDeserializer.class) + private BitSet bitSet; // ack offsets bitSet + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getRetry() { + return retry; + } + + public void setRetry(String retry) { + this.retry = retry; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public int getReviveQueueId() { + return reviveQueueId; + } + + public void setReviveQueueId(int reviveQueueId) { + this.reviveQueueId = reviveQueueId; + } + + public long getPopTime() { + return popTime; + } + + public void setPopTime(long popTime) { + this.popTime = popTime; + } + + public long getInvisibleTime() { + return invisibleTime; + } + + public void setInvisibleTime(long invisibleTime) { + this.invisibleTime = invisibleTime; + } + + public BitSet getBitSet() { + return bitSet; + } + + public void setBitSet(BitSet bitSet) { + this.bitSet = bitSet; + } + + @Override + public String toString() { + return "BatchAck{" + + "consumerGroup='" + consumerGroup + '\'' + + ", topic='" + topic + '\'' + + ", retry='" + retry + '\'' + + ", startOffset=" + startOffset + + ", queueId=" + queueId + + ", reviveQueueId=" + reviveQueueId + + ", popTime=" + popTime + + ", invisibleTime=" + invisibleTime + + ", bitSet=" + bitSet + + '}'; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java new file mode 100644 index 00000000000..f0e1a8c3c8d --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.List; + +public class BatchAckMessageRequestBody extends RemotingSerializable { + private String brokerName; + private List acks; + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public List getAcks() { + return acks; + } + + public void setAcks(List acks) { + this.acks = acks; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java index 7172ba959f7..9a5fa89abab 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java @@ -26,7 +26,7 @@ public class ExtraInfoUtil { private static final String NORMAL_TOPIC = "0"; - private static final String RETRY_TOPIC = "1"; + public static final String RETRY_TOPIC = "1"; private static final String QUEUE_OFFSET = "qo"; public static String[] split(String extraInfo) { @@ -75,6 +75,17 @@ public static String getRealTopic(String[] extraInfoStrs, String topic, String c } } + public static String getRealTopic(String topic, String cid, boolean isRetry) { + return isRetry ? KeyBuilder.buildPopRetryTopic(topic, cid) : topic; + } + + public static String getRetry(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 5) { + throw new IllegalArgumentException("getRetry fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return extraInfoStrs[4]; + } + public static String getBrokerName(String[] extraInfoStrs) { if (extraInfoStrs == null || extraInfoStrs.length < 6) { throw new IllegalArgumentException("getBrokerName fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java new file mode 100644 index 00000000000..427a132d646 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.body; + +import com.alibaba.fastjson.JSON; +import org.apache.rocketmq.common.MixAll; +import org.junit.Test; + +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class BatchAckTest { + private static String topic = "myTopic"; + private static String cid = MixAll.DEFAULT_CONSUMER_GROUP; + private static long startOffset = 100; + private static int qId = 1; + private static int rqId = 2; + private static long popTime = System.currentTimeMillis(); + private static long invisibleTime = 5000; + + @Test + public void testBatchAckSerializerDeserializer() { + List ackOffsetList = Arrays.asList(startOffset + 1, startOffset + 3, startOffset + 5); + BatchAck batchAck = new BatchAck(); + batchAck.setConsumerGroup(cid); + batchAck.setTopic(topic); + batchAck.setRetry("0"); + batchAck.setStartOffset(startOffset); + batchAck.setQueueId(qId); + batchAck.setReviveQueueId(rqId); + batchAck.setPopTime(popTime); + batchAck.setInvisibleTime(invisibleTime); + batchAck.setBitSet(new BitSet()); + for (Long offset : ackOffsetList) { + batchAck.getBitSet().set((int) (offset - startOffset)); + } + String jsonStr = JSON.toJSONString(batchAck); + + BatchAck bAck = JSON.parseObject(jsonStr, BatchAck.class); + assertThat(bAck.getConsumerGroup()).isEqualTo(cid); + assertThat(bAck.getTopic()).isEqualTo(topic); + assertThat(bAck.getStartOffset()).isEqualTo(startOffset); + assertThat(bAck.getQueueId()).isEqualTo(qId); + assertThat(bAck.getReviveQueueId()).isEqualTo(rqId); + assertThat(bAck.getPopTime()).isEqualTo(popTime); + assertThat(bAck.getInvisibleTime()).isEqualTo(invisibleTime); + for (int i = 0; i < bAck.getBitSet().length(); i++) { + long ackOffset = startOffset + i; + if (ackOffsetList.contains(ackOffset)) { + assertThat(bAck.getBitSet().get(i)).isTrue(); + } else { + assertThat(bAck.getBitSet().get(i)).isFalse(); + } + } + } + + @Test + public void testWithBatchAckMessageRequestBody() { + List ackOffsetList = Arrays.asList(startOffset + 1, startOffset + 3, startOffset + 5); + BatchAck batchAck = new BatchAck(); + batchAck.setConsumerGroup(cid); + batchAck.setTopic(topic); + batchAck.setRetry("0"); + batchAck.setStartOffset(startOffset); + batchAck.setQueueId(qId); + batchAck.setReviveQueueId(rqId); + batchAck.setPopTime(popTime); + batchAck.setInvisibleTime(invisibleTime); + batchAck.setBitSet(new BitSet()); + for (Long offset : ackOffsetList) { + batchAck.getBitSet().set((int) (offset - startOffset)); + } + + BatchAckMessageRequestBody batchAckMessageRequestBody = new BatchAckMessageRequestBody(); + batchAckMessageRequestBody.setAcks(Arrays.asList(batchAck)); + byte[] bytes = batchAckMessageRequestBody.encode(); + BatchAckMessageRequestBody reqBody = BatchAckMessageRequestBody.decode(bytes, BatchAckMessageRequestBody.class); + BatchAck bAck = reqBody.getAcks().get(0); + assertThat(bAck.getConsumerGroup()).isEqualTo(cid); + assertThat(bAck.getTopic()).isEqualTo(topic); + assertThat(bAck.getStartOffset()).isEqualTo(startOffset); + assertThat(bAck.getQueueId()).isEqualTo(qId); + assertThat(bAck.getReviveQueueId()).isEqualTo(rqId); + assertThat(bAck.getPopTime()).isEqualTo(popTime); + assertThat(bAck.getInvisibleTime()).isEqualTo(invisibleTime); + for (int i = 0; i < bAck.getBitSet().length(); i++) { + long ackOffset = startOffset + i; + if (ackOffsetList.contains(ackOffset)) { + assertThat(bAck.getBitSet().get(i)).isTrue(); + } else { + assertThat(bAck.getBitSet().get(i)).isFalse(); + } + } + } +}