Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7064] [RIP-66-2] Support KV(RocksDB) Storage for ConsumeQueue #7120

Merged
merged 87 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
09c0c8c
typo int readme[ecosystem]
fujian-zfj Nov 30, 2021
89713ba
Merge branch 'apache:develop' into develop
fujian-zfj Sep 8, 2022
8d0c4fc
Merge branch 'apache:develop' into develop
fujian-zfj Sep 13, 2022
2c9e961
Merge branch 'apache:develop' into develop
fujian-zfj Jan 10, 2023
93d6669
Merge branch 'apache:develop' into develop
fujian-zfj Feb 2, 2023
2dd16d6
Merge branch 'apache:develop' into develop
fujian-zfj Feb 14, 2023
e02c7aa
Merge branch 'apache:develop' into develop
fujian-zfj Mar 2, 2023
cb8c7b0
Merge branch 'apache:develop' into develop
fujian-zfj Mar 10, 2023
03f707f
Merge branch 'apache:develop' into develop
fujian-zfj Mar 10, 2023
6ef608f
Merge branch 'apache:develop' into develop
fujian-zfj Mar 14, 2023
818919f
Merge branch 'apache:develop' into develop
fujian-zfj Mar 21, 2023
b315381
Merge branch 'apache:develop' into develop
fujian-zfj Mar 21, 2023
5d1b050
Merge branch 'apache:develop' into develop
fujian-zfj Mar 30, 2023
de2238b
Merge branch 'apache:develop' into develop
fujian-zfj Apr 3, 2023
d22756f
Merge branch 'apache:develop' into develop
fujian-zfj Apr 25, 2023
83d8178
Merge branch 'apache:develop' into develop
fujian-zfj May 9, 2023
a4c94a9
Merge branch 'apache:develop' into develop
fujian-zfj May 18, 2023
7234a41
Merge branch 'apache:develop' into develop
fujian-zfj Jun 3, 2023
c7708a7
Merge branch 'apache:develop' into develop
fujian-zfj Jul 10, 2023
166ebbd
Merge branch 'apache:develop' into develop
fujian-zfj Jul 11, 2023
6b27cc7
Merge branch 'apache:develop' into develop
fujian-zfj Jul 23, 2023
3b0a27b
Merge branch 'apache:develop' into develop
fujian-zfj Jul 28, 2023
567e626
Merge branch 'apache:develop' into develop
fujian-zfj Aug 5, 2023
fb30962
consumequue support rocksdb
fujian-zfj Aug 5, 2023
88a0d62
fix rocksdb test
fujian-zfj Aug 5, 2023
37e3977
fix rocksdb test
fujian-zfj Aug 5, 2023
a2d5b24
remove unused method
fujian-zfj Aug 6, 2023
36418e1
split into two tables
fujian-zfj Aug 7, 2023
dcfe4ea
CqUnit in Rocksdb [phyOffset, bodySize, tagHashCode, msgStoreTime]
fujian-zfj Aug 7, 2023
70aef73
fix build.baze in tieredMessageStore
fujian-zfj Aug 7, 2023
69c1a14
skip RocksDBMessageStoreTest bazel
fujian-zfj Aug 8, 2023
2c78c9a
skip RocksDBMessageStoreTest bazel
fujian-zfj Aug 8, 2023
7682fe2
Rocksdb TimerMessageStore
fujian-zfj Aug 8, 2023
58ebef1
fix unit test bazel
fujian-zfj Aug 8, 2023
b52bfdf
fix store build.bazel
fujian-zfj Aug 8, 2023
43c1b60
fix store build.bazel
fujian-zfj Aug 8, 2023
8341613
optimize
fujian-zfj Aug 8, 2023
508ec5b
optimize
fujian-zfj Aug 8, 2023
1c866f1
polish
fujian-zfj Aug 9, 2023
d17a56d
build bytebuffer pair inner
fujian-zfj Aug 9, 2023
bc7af05
remove unused code
fujian-zfj Aug 9, 2023
5266c2e
DataConverter.CHARSET_UTF8
fujian-zfj Aug 10, 2023
3c68561
fix comment
fujian-zfj Aug 10, 2023
2b93bfe
fix comment
fujian-zfj Aug 10, 2023
8321eab
Merge branch 'apache:develop' into develop
fujian-zfj Aug 10, 2023
609b865
merge master
fujian-zfj Aug 10, 2023
00f0acd
rebuild test
fujian-zfj Aug 10, 2023
c23ce3c
rebuild test
fujian-zfj Aug 11, 2023
fd6a8c8
optimize
fujian-zfj Aug 15, 2023
76fba27
rebuild test
fujian-zfj Aug 15, 2023
b98758e
Merge branch 'apache:develop' into develop
fujian-zfj Aug 17, 2023
109e954
merge
fujian-zfj Aug 17, 2023
6347162
polish
fujian-zfj Aug 21, 2023
83fb78f
polish
fujian-zfj Aug 21, 2023
a5b10f5
rebuild test
fujian-zfj Aug 23, 2023
c0a22e0
Merge branch 'apache:develop' into develop
fujian-zfj Aug 23, 2023
eb92b78
Merge branch 'develop' into develop_consumequeue_rocksdb
fujian-zfj Aug 23, 2023
d88b6c4
merge develop
fujian-zfj Aug 23, 2023
bae07ee
rebuild test
fujian-zfj Aug 23, 2023
fb4702d
rebuild test
fujian-zfj Aug 24, 2023
29f6b5c
Merge branch 'apache:develop' into develop
fujian-zfj Aug 29, 2023
58feb0d
merge
fujian-zfj Aug 29, 2023
cb7cd9e
fix getConsumeQueue not find cq
fujian-zfj Aug 29, 2023
520da4f
fix test
fujian-zfj Aug 29, 2023
9bfb7f8
rocksdb new version
fujian-zfj Aug 29, 2023
bebfb22
rebuild test
fujian-zfj Aug 29, 2023
9349205
Merge branch 'apache:develop' into develop
fujian-zfj Aug 31, 2023
55b2fc2
mere
fujian-zfj Sep 4, 2023
c21e78f
fix bug
fujian-zfj Sep 6, 2023
347b57d
Merge branch 'apache:develop' into develop
fujian-zfj Sep 8, 2023
20b9bd3
Merge branch 'pr_7120_7' into new-amqp-dev
RongtongJin Sep 11, 2023
7770c9a
Resolve conflicts after merging develop
RongtongJin Sep 11, 2023
fb1f9e7
Merge remote-tracking branch 'apache/develop' into new-amqp-dev
RongtongJin Sep 11, 2023
e95919a
fix updateCqOffset
fujian-zfj Oct 6, 2023
a12aff0
Merge branch 'develop_consumequeue_rocksdb' of github.com:fujian-zfj/…
fujian-zfj Oct 6, 2023
b0810ba
fix recoverAbnormally
fujian-zfj Oct 8, 2023
ca5b41b
fix recoverAbnormally
fujian-zfj Oct 8, 2023
b6b9814
polish
fujian-zfj Oct 8, 2023
2263c12
polish
fujian-zfj Oct 8, 2023
22b81b7
Merge branch 'apache:develop' into develop
fujian-zfj Oct 10, 2023
1aa1b63
Merge branch 'develop' into develop_consumequeue_rocksdb
fujian-zfj Oct 10, 2023
e91794d
polish
fujian-zfj Oct 11, 2023
ec5c319
Merge remote-tracking branch 'apache/develop' into develop_consumeque…
RongtongJin Oct 13, 2023
d0a0ff6
remove exception in cleanunusedTopic
fujian-zfj Oct 13, 2023
2c4e9f0
Merge branch 'develop_consumequeue_rocksdb' of github.com:fujian-zfj/…
fujian-zfj Oct 13, 2023
cca7e2c
remove exception in cleanunusedTopic
fujian-zfj Oct 13, 2023
2c9f88e
remove exception in cleanunusedTopic
fujian-zfj Oct 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ maven_install(
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
"org.apache.rocketmq:rocketmq-rocksdb:1.0.2",
],
fetch_sources = True,
repositories = [
Expand Down
2 changes: 1 addition & 1 deletion broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ java_library(
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge",
"@maven//:io_github_aliyunmq_rocketmq_rocksdb",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
"@maven//:net_java_dev_jna_jna",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,33 @@
*/
package org.apache.rocketmq.broker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
Expand Down Expand Up @@ -126,7 +152,7 @@
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
Expand All @@ -141,31 +167,6 @@
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
Expand Down Expand Up @@ -308,7 +309,7 @@ public BrokerController(
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if (isEnableRocksDBStore()) {
if (this.messageStoreConfig.isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
Expand Down Expand Up @@ -747,7 +748,12 @@ public boolean initializeMetadata() {
public boolean initializeMessageStore() {
boolean result = true;
try {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
DefaultMessageStore defaultMessageStore;
if (this.messageStoreConfig.isEnableRocksDBStore()) {
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
}

if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler =
Expand Down Expand Up @@ -944,16 +950,16 @@ private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(
new TransactionalMessageBridge(this, this.getMessageStore()));
new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}",
TransactionalMessageServiceImpl.class.getSimpleName());
TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(
AbstractTransactionalMessageCheckListener.class);
AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}",
DefaultTransactionalMessageCheckListener.class.getSimpleName());
DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
Expand Down Expand Up @@ -2412,8 +2418,4 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

public boolean isEnableRocksDBStore() {
return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void shutdown() {

public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress,
final Integer newMasterEpoch,
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet);
Expand All @@ -234,7 +234,7 @@ public synchronized void changeBrokerRole(final Long newMasterBrokerId, final St
}
}

public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) {
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval());
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
Expand All @@ -49,7 +49,7 @@ public boolean stop() {
@Override
protected void removeConsumerOffset(String topicAtGroup) {
try {
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset);
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.CHARSET_UTF8);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
Expand All @@ -58,7 +58,7 @@ protected void removeConsumerOffset(String topicAtGroup) {

@Override
protected void decode0(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.charset);
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
Expand Down Expand Up @@ -93,7 +93,7 @@ public synchronized void persist() {
}

private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset);
byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
wrapper.setOffsetTable(offsetMap);
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(rqId);
if (ackMsg instanceof BatchAckMsg) {
msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,18 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,

final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
// delete pop retry topics first
for (String group : groups) {
final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
deleteTopicInBroker(popRetryTopic);
try {
for (String group : groups) {
final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
deleteTopicInBroker(popRetryTopic);
}
}
// delete topic
deleteTopicInBroker(topic);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
// delete topic
deleteTopicInBroker(topic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
Expand Down Expand Up @@ -2081,7 +2085,11 @@ private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx,
public RemotingCommand cleanExpiredConsumeQueue() {
LOGGER.info("AdminBrokerProcessor#cleanExpiredConsumeQueue: start.");
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
brokerController.getMessageStore().cleanExpiredConsumerQueue();
try {
brokerController.getMessageStore().cleanExpiredConsumerQueue();
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
LOGGER.info("AdminBrokerProcessor#cleanExpiredConsumeQueue: end.");
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down Expand Up @@ -2781,7 +2789,11 @@ private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,

final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
if (replicasManager != null) {
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
try {
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
} catch (Exception e) {
throw new RemotingCommandException(e.getMessage());
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private void ackOrigin(final ChangeInvisibleTimeRequestHeader requestHeader, Str
}

msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(rqId);
msgInner.setTags(PopAckConstants.ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down Expand Up @@ -216,7 +216,7 @@ private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader
ck.addDiff(0);
ck.setBrokerName(brokerName);

msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(reviveQid);
msgInner.setTags(PopAckConstants.CK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgI
ackMsg.setQueueId(point.getQueueId());
ackMsg.setPopTime(point.getPopTime());
msgInner.setTopic(popMessageProcessor.reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(pointWrapper.getReviveQueueId());
msgInner.setTags(PopAckConstants.ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down Expand Up @@ -673,7 +673,7 @@ private boolean putBatchAckToStore(final PopCheckPointWrapper pointWrapper, fina
batchAckMsg.setQueueId(point.getQueueId());
batchAckMsg.setPopTime(point.getPopTime());
msgInner.setTopic(popMessageProcessor.reviveTopic);
msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(pointWrapper.getReviveQueueId());
msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(reviveQid);
msgInner.setTags(PopAckConstants.CK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
}
for (MessageExt messageExt : messageExts) {
if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset);
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ck, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
Expand All @@ -371,7 +371,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
firstRt = point.getReviveTime();
}
} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset);
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
Expand All @@ -395,7 +395,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
}
}
} else if (PopAckConstants.BATCH_ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset);
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={}, find batch ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
Expand Down
Loading
Loading