Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7074] Allow a BoundaryType to be specified when retrieving offset based on the timestamp #7082

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,15 @@ private RemotingCommand rewriteRequestForStaticTopic(SearchOffsetRequestHeader r
continue;
}
if (mappingDetail.getBname().equals(item.getBname())) {
offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
MessageStore messageStore = this.brokerController.getMessageStore();
if (messageStore instanceof DefaultMessageStore) {
// get offset with specific boundary type
offset = ((DefaultMessageStore) messageStore).getOffsetInQueueByTime(requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getTimestamp(), requestHeader.getBoundaryType());
} else {
offset = messageStore.getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that getOffsetInQueueByTime also exists in TieredMessageStore. Would it be better to abstract an interface instead of checking the type every time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I also wanted to add an interface directly at the beginning. But later I found that the BoundaryType used in TieredMessageStore belongs to package org.apache.rocketmq.tieredstore.common and the BoundaryType used in DefaultMessageStore belongs to package org.apache.rocketmq.common. So if I wanna to abstract an interface, the argument of the interface might be boundaryTypeName(String), which means that the enum class is converted to a string, or the string is converted to the enum class, each time the argument is passed and the argument is fetched.
Or maybe I should unify the two enum classes?What do you think I should do in this situation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to unify the two enumeration classes.

if (offset > 0) {
offset = item.computeStaticQueueOffsetStrictly(offset);
break;
Expand Down Expand Up @@ -1038,8 +1046,17 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
return rewriteResult;
}

long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp());
long offset = -1;

MessageStore messageStore = this.brokerController.getMessageStore();
if (messageStore instanceof DefaultMessageStore) {
// get offset with specific boundary type
offset = ((DefaultMessageStore) messageStore).getOffsetInQueueByTime(requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getTimestamp(), requestHeader.getBoundaryType());
} else {
offset = messageStore.getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp());
}

responseHeader.setOffset(offset);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
Expand Down Expand Up @@ -184,6 +185,11 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
// default return lower boundary offset when there are more than one offsets.
return searchOffset(mq, timestamp, BoundaryType.LOWER);
}

public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand All @@ -192,7 +198,8 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp,
boundaryType, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
Expand Down Expand Up @@ -1237,13 +1238,20 @@ public long searchOffset(final String addr, final String topic, final int queueI
public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
// default return lower boundary offset when there are more than one offsets.
return searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER, timeoutMillis);
}

public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp,
final BoundaryType boundaryType, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(messageQueue.getTopic());
requestHeader.setQueueId(messageQueue.getQueueId());
requestHeader.setBname(messageQueue.getBrokerName());
requestHeader.setTimestamp(timestamp);
requestHeader.setBoundaryType(boundaryType);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class RemotingCommand {
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
private static final String BOUNDARY_TYPE_CANONICAL_NAME = BoundaryType.class.getCanonicalName();
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);

Expand Down Expand Up @@ -311,6 +313,8 @@ public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCust
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
} else if (type.equals(BOUNDARY_TYPE_CANONICAL_NAME)) {
valueParsed = BoundaryType.getType(value);
} else {
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.apache.rocketmq.remoting.protocol.header;

import com.google.common.base.MoreObjects;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
Expand All @@ -33,6 +34,8 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private Long timestamp;

private BoundaryType boundaryType;

@Override
public void checkFields() throws RemotingCommandException {

Expand Down Expand Up @@ -66,12 +69,22 @@ public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

public BoundaryType getBoundaryType() {
// default return LOWER
return boundaryType == null ? BoundaryType.LOWER : boundaryType;
}

public void setBoundaryType(BoundaryType boundaryType) {
this.boundaryType = boundaryType;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("topic", topic)
.add("queueId", queueId)
.add("timestamp", timestamp)
.add("boundaryType", boundaryType.getName())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
Expand Down Expand Up @@ -1015,9 +1016,18 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue

@Override
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER);
}

public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long resultOffset = logic.getOffsetInQueueByTime(timestamp);
long resultOffset = -1;
if (logic instanceof ConsumeQueue) {
resultOffset = ((ConsumeQueue) logic).getOffsetInQueueByTime(timestamp, boundaryType);
} else {
resultOffset = logic.getOffsetInQueueByTime(timestamp);
}
// Make sure the result offset is in valid range.
resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
Expand Down Expand Up @@ -122,6 +123,14 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
}

public long searchLowerBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException {
return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.LOWER);
}

public long searchUpperBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException {
return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.UPPER);
}

@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return defaultMQAdminExtImpl.maxOffset(mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
Expand Down Expand Up @@ -1700,6 +1701,10 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
}

public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp, boundaryType);
}

@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
Expand Down Expand Up @@ -512,6 +513,30 @@ public void testSearchOffset() throws Exception {
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(TOPIC1, BROKER1_NAME, 0), System.currentTimeMillis())).isEqualTo(101L);
}

@Test
public void testSearchOffsetWithSpecificBoundaryType() throws Exception {

String namesrvAddr = "127.0.0.1:9876";
String topic = "test-topic";

DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setInstanceName(UUID.randomUUID().toString());
mqAdminExt.setNamesrvAddr(namesrvAddr);

mqAdminExt.start();
List<QueueTimeSpan> timeSpanList = mqAdminExt.queryConsumeTimeSpan(topic, null);
if (timeSpanList != null && timeSpanList.size() > 0) {
for (QueueTimeSpan timeSpan: timeSpanList) {
MessageQueue mq = timeSpan.getMessageQueue();
long maxOffset = mqAdminExt.maxOffset(mq);
long minOffset = mqAdminExt.minOffset(mq);
// if there is at least one message in queue, the maxOffset returns the queue's latest offset + 1
assertThat((maxOffset == 0 ? 0 : maxOffset - 1) == mqAdminExt.searchUpperBoundaryOffset(mq, timeSpan.getMaxTimeStamp())).isTrue();
assertThat(minOffset == mqAdminExt.searchLowerBoundaryOffset(mq, timeSpan.getMinTimeStamp())).isTrue();
}
}
}
RongtongJin marked this conversation as resolved.
Show resolved Hide resolved

@Test
public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException {
TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig");
Expand Down