Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-28] light message queue(LMQ) #3694

Merged
merged 16 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
Expand All @@ -64,7 +66,9 @@
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
Expand Down Expand Up @@ -106,6 +110,7 @@
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.stats.LmqBrokerStatsManager;

public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand Down Expand Up @@ -180,18 +185,18 @@ public BrokerController(
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);

Expand All @@ -207,7 +212,8 @@ public BrokerController(
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());

this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

this.brokerFastFailure = new BrokerFastFailure(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public static String getConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}

public static String getLmqConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "lmqConsumerOffset.json";
}

public static String getSubscriptionGroupPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.longpolling;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;


public class LmqPullRequestHoldService extends PullRequestHoldService {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

public LmqPullRequestHoldService(BrokerController brokerController) {
super(brokerController);
}

@Override
public String getServiceName() {
return LmqPullRequestHoldService.class.getSimpleName();
}

@Override
public void checkHoldRequest() {
for (String key : pullRequestTable.keySet()) {
int idx = key.lastIndexOf(TOPIC_QUEUEID_SEPARATOR);
if (idx <= 0 || idx >= key.length() - 1) {
pullRequestTable.remove(key);
continue;
}
String topic = key.substring(0, idx);
int queueId = Integer.parseInt(key.substring(idx + 1));
final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
LOGGER.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
if (MixAll.isLmq(topic)) {
ManyPullRequest mpr = pullRequestTable.get(key);
if (mpr == null || mpr.getPullRequestList() == null || mpr.getPullRequestList().isEmpty()) {
pullRequestTable.remove(key);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ public synchronized List<PullRequest> cloneListAndClear() {

return null;
}

public ArrayList<PullRequest> getPullRequestList() {
return pullRequestList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@

public class PullRequestHoldService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_QUEUEID_SEPARATOR = "@";
private final BrokerController brokerController;
protected static final String TOPIC_QUEUEID_SEPARATOR = "@";
protected final BrokerController brokerController;
private final SystemClock systemClock = new SystemClock();
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);

public PullRequestHoldService(final BrokerController brokerController) {
Expand Down Expand Up @@ -93,7 +93,7 @@ public String getServiceName() {
return PullRequestHoldService.class.getSimpleName();
}

private void checkHoldRequest() {
protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@

public class ConsumerOffsetManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
protected static final String TOPIC_GROUP_SEPARATOR = "@";

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

private transient BrokerController brokerController;
protected transient BrokerController brokerController;

public ConsumerOffsetManager() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);

public LmqConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
}

@Override
public long queryOffset(final String group, final String topic, final int queueId) {
if (!MixAll.isLmq(group)) {
return super.queryOffset(group, topic, queueId);
}
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
Long offset = lmqOffsetTable.get(key);
if (offset != null) {
return offset;
}
return -1;
}

@Override
public Map<Integer, Long> queryOffset(final String group, final String topic) {
if (!MixAll.isLmq(group)) {
return super.queryOffset(group, topic);
}
Map<Integer, Long> map = new HashMap<>();
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
Long offset = lmqOffsetTable.get(key);
if (offset != null) {
map.put(0, offset);
}
return map;
}

@Override
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
if (!MixAll.isLmq(group)) {
super.commitOffset(clientHost, group, topic, queueId, offset);
return;
}
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
lmqOffsetTable.put(key, offset);
}

@Override
public String encode() {
return this.encode(false);
}

@Override
public String configFilePath() {
return BrokerPathConfigHelper.getLmqConsumerOffsetPath(brokerController.getMessageStoreConfig().getStorePathRootDir());
}

@Override
public void decode(String jsonString) {
if (jsonString != null) {
LmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class);
if (obj != null) {
super.offsetTable = obj.offsetTable;
this.lmqOffsetTable = obj.lmqOffsetTable;
}
}
}

@Override
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}

public ConcurrentHashMap<String, Long> getLmqOffsetTable() {
return lmqOffsetTable;
}

public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) {
this.lmqOffsetTable = lmqOffsetTable;
}
}
Loading