Skip to content

Commit

Permalink
[ISSUE apache#7064] [RIP-66-1] Support KV(RocksDB) Storage for Metada…
Browse files Browse the repository at this point in the history
…ta (apache#7092)

* typo int readme[ecosystem]

* rocksdb metadata

* add unit test

* fix testOffsetPersistInMemory

* fix unit test

* fix unit test

* remove unused import

* move RocksDBOffsetSerialize to broker moudle

* Fix bazel build scripts

Signed-off-by: Li Zhanhui <[email protected]>

* Flag QueryMsgByKeyIT as flaky as it fails at frequency: 5 out of 32

Signed-off-by: Li Zhanhui <[email protected]>

* change public to private of some inner method

---------

Signed-off-by: Li Zhanhui <[email protected]>
Co-authored-by: Li Zhanhui <[email protected]>
  • Loading branch information
fujian-zfj and lizhanhui authored Aug 4, 2023
1 parent c73d8ee commit 3a6ef04
Show file tree
Hide file tree
Showing 35 changed files with 2,473 additions and 86 deletions.
1 change: 1 addition & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ maven_install(
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
],
fetch_sources = True,
repositories = [
Expand Down
3 changes: 3 additions & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ java_library(
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge",
"@maven//:io_github_aliyunmq_rocketmq_rocksdb",
"@maven//:net_java_dev_jna_jna",
],
)

Expand Down Expand Up @@ -81,6 +83,7 @@ java_library(
"@maven//:org_apache_commons_commons_lang3",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:org_powermock_powermock_core",
"@maven//:io_opentelemetry_opentelemetry_api",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
Expand All @@ -66,8 +68,12 @@
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
Expand Down Expand Up @@ -120,6 +126,7 @@
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
Expand Down Expand Up @@ -301,9 +308,16 @@ public BrokerController(
this.messageStoreConfig = messageStoreConfig;
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
if (isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
} else {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
}
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.peekMessageProcessor = new PeekMessageProcessor(this);
Expand All @@ -324,7 +338,6 @@ public BrokerController(
this.popInflightMessageCounter = new PopInflightMessageCounter(this);
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.scheduleMessageService = new ScheduleMessageService(this);
this.coldDataPullRequestHoldService = new ColdDataPullRequestHoldService(this);
this.coldDataCgCtrService = new ColdDataCgCtrService(this);
Expand Down Expand Up @@ -1383,8 +1396,6 @@ protected void shutdownBasicService() {
this.adminBrokerExecutor.shutdown();
}

this.consumerOffsetManager.persist();

if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
Expand Down Expand Up @@ -1449,8 +1460,20 @@ protected void shutdownBasicService() {
shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);

this.topicConfigManager.persist();
this.subscriptionGroupManager.persist();
if (this.topicConfigManager != null) {
this.topicConfigManager.persist();
this.topicConfigManager.stop();
}

if (this.subscriptionGroupManager != null) {
this.subscriptionGroupManager.persist();
this.subscriptionGroupManager.stop();
}

if (this.consumerOffsetManager != null) {
this.consumerOffsetManager.persist();
this.consumerOffsetManager.stop();
}

for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
if (brokerAttachedPlugin != null) {
Expand Down Expand Up @@ -2375,4 +2398,8 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

public boolean isEnableRocksDBStore() {
return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.broker.offset;

import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -26,6 +25,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Strings;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
Expand All @@ -37,12 +39,12 @@
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class ConsumerOffsetManager extends ConfigManager {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public static final String TOPIC_GROUP_SEPARATOR = "@";

private DataVersion dataVersion = new DataVersion();

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<>(512);

private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable =
Expand All @@ -62,6 +64,10 @@ public ConsumerOffsetManager(BrokerController brokerController) {
this.brokerController = brokerController;
}

protected void removeConsumerOffset(String topicAtGroup) {

}

public void cleanOffset(String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Expand All @@ -71,6 +77,7 @@ public void cleanOffset(String group) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("Clean group's offset, {}, {}", topicAtGroup, next.getValue());
}
}
Expand All @@ -86,6 +93,7 @@ public void cleanOffsetByTopic(String topic) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && topic.equals(arrays[0])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("Clean topic's offset, {}, {}", topicAtGroup, next.getValue());
}
}
Expand All @@ -105,6 +113,7 @@ public void scanUnsubscribedTopic() {
if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
&& this.offsetBehindMuchThanData(topic, next.getValue())) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("remove topic offset, {}", topicAtGroup);
}
}
Expand Down Expand Up @@ -313,8 +322,10 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
for (String group : filterGroups.split(",")) {
Iterator<String> it = topicGroups.iterator();
while (it.hasNext()) {
if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
String topicAtGroup = it.next();
if (group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
}
}
}
Expand Down Expand Up @@ -371,6 +382,7 @@ public void removeOffset(final String group) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("clean group offset {}", topicAtGroup);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval());
}

@Override
public boolean load() {
return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
}

@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
}

@Override
protected void removeConsumerOffset(String topicAtGroup) {
try {
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
}
}

@Override
protected void decode0(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.charset);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}

@Override
public String configFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
}

@Override
public synchronized void persist() {
WriteBatch writeBatch = new WriteBatch();
try {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> iterator = this.offsetTable.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, ConcurrentMap<Integer, Long>> entry = iterator.next();
putWriteBatch(writeBatch, entry.getKey(), entry.getValue());

if (writeBatch.getDataSize() >= 4 * 1024) {
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
}
}
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
this.rocksDBConfigManager.flushWAL();
} catch (Exception e) {
LOG.error("consumer offset persist Failed", e);
} finally {
writeBatch.close();
}
}

private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset);
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
wrapper.setOffsetTable(offsetMap);
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
writeBatch.put(keyBytes, valueBytes);
}
}
Loading

0 comments on commit 3a6ef04

Please sign in to comment.