Skip to content

Commit

Permalink
Merge branch 'apache:develop' into fix_7464
Browse files Browse the repository at this point in the history
  • Loading branch information
joeCarf committed Oct 16, 2023
2 parents 4edd823 + 0f01df4 commit df40306
Show file tree
Hide file tree
Showing 109 changed files with 6,442 additions and 1,416 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ $ java -version
java version "1.8.0_121"
```

For Windows users, click [here](https://dist.apache.org/repos/dist/release/rocketmq/5.1.3/rocketmq-all-5.1.3-bin-release.zip) to download the 5.1.3 RocketMQ binary release,
For Windows users, click [here](https://dist.apache.org/repos/dist/release/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip) to download the 5.1.4 RocketMQ binary release,
unpack it to your local disk, such as `D:\rocketmq`.
For macOS and Linux users, execute following commands:

```shell
# Download release from the Apache mirror
$ wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.3/rocketmq-all-5.1.3-bin-release.zip
$ wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip

# Unpack the release
$ unzip rocketmq-all-5.1.3-bin-release.zip
$ unzip rocketmq-all-5.1.4-bin-release.zip
```

Prepare a terminal and change to the extracted `bin` directory:
```shell
$ cd rocketmq-all-5.1.3-bin-release/bin
$ cd rocketmq-all-5.1.4-bin-release/bin
```

**1) Start NameServer**
Expand Down
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ maven_install(
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
"org.apache.rocketmq:rocketmq-rocksdb:1.0.2",
],
fetch_sources = True,
repositories = [
Expand Down
2 changes: 1 addition & 1 deletion broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ java_library(
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge",
"@maven//:io_github_aliyunmq_rocketmq_rocksdb",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
"@maven//:net_java_dev_jna_jna",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,33 @@
*/
package org.apache.rocketmq.broker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
Expand Down Expand Up @@ -126,7 +152,7 @@
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
Expand All @@ -141,31 +167,6 @@
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
Expand Down Expand Up @@ -308,7 +309,7 @@ public BrokerController(
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if (isEnableRocksDBStore()) {
if (this.messageStoreConfig.isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
Expand Down Expand Up @@ -747,7 +748,12 @@ public boolean initializeMetadata() {
public boolean initializeMessageStore() {
boolean result = true;
try {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
DefaultMessageStore defaultMessageStore;
if (this.messageStoreConfig.isEnableRocksDBStore()) {
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
}

if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler =
Expand Down Expand Up @@ -944,16 +950,16 @@ private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(
new TransactionalMessageBridge(this, this.getMessageStore()));
new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}",
TransactionalMessageServiceImpl.class.getSimpleName());
TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(
AbstractTransactionalMessageCheckListener.class);
AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}",
DefaultTransactionalMessageCheckListener.class.getSimpleName());
DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
Expand Down Expand Up @@ -1807,7 +1813,7 @@ protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {

if (shutdown) {
BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
BrokerController.LOG.info("BrokerController#doRegisterBrokerAll: broker has shutdown, no need to register any more.");
return;
}
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
Expand Down Expand Up @@ -2412,8 +2418,4 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

public boolean isEnableRocksDBStore() {
return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void shutdown() {

public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress,
final Integer newMasterEpoch,
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet);
Expand All @@ -234,7 +234,7 @@ public synchronized void changeBrokerRole(final Long newMasterBrokerId, final St
}
}

public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) {
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval());
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
Expand All @@ -49,7 +49,7 @@ public boolean stop() {
@Override
protected void removeConsumerOffset(String topicAtGroup) {
try {
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset);
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.CHARSET_UTF8);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
Expand All @@ -58,7 +58,7 @@ protected void removeConsumerOffset(String topicAtGroup) {

@Override
protected void decode0(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.charset);
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
Expand Down Expand Up @@ -93,7 +93,7 @@ public synchronized void persist() {
}

private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset);
byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
wrapper.setOffsetTable(offsetMap);
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
Expand Down
Loading

0 comments on commit df40306

Please sign in to comment.